Flink流处理WordCount案例实现
package com.ibacon.flink.wc;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author zhangxin
* @Description 无界流处理wc
* @createTime 2023年01月03日 14:06:00
*/
public class StreamWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//传外部参数的形式
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String host = parameterTool.get("host");
Integer port = parameterTool.getInt("port");
DataStreamSource<String> streamSource = env.socketTextStream(host,port);
streamSource.flatMap((FlatMapFunction<String, Tuple2<String, Long>>) (value, out) -> {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word,1L));
}
})
.returns(Types.TUPLE(Types.STRING,Types.LONG)) //指明类型,解决泛型擦除问题
.keyBy(0)
.sum(1)
.print();
env.execute();
}
}
部署模式及集群模式
部署模式分为会话模式、单作业模式、应用模式三种,通常用前两种
集群模式分为独立集群、Yarn集群、K8S模式等,独立基本不用,K8S没学过,所以这里重点写Yarn集群
Yarn集群模式下也三种部署模式,这里只记录前两种,应用模式只是命令改个名字而已
Flink on YARN集群会话模式部署的环境准备及测试
hadoop的环境变量要加上export HADOOP_CLASSPATH=`hadoop classpath`
这一行,但是会出现一个问题,使用myhadoop.sh脚本启停以及xsync脚本分发环境变量都会报:找不到hadoop命令这个提示
解决办法:
思路1:将`hadoop classpath`前加上绝对路径即可(已测试成功)
export HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath`
或者(未测试,仅为思路):
export HADOOP_CLASSPATH=$(hadoop classpath)
思路2:未刷新 source /etc/profile
或者没有重连客户端
可以看到这时再开启Yarn会话初始化集群就成功了:
点击任务id进去后,点击ApplicationMaster:
即可看到初始化的Flink集群:
因为是yarn模式 所以资源都是提交任务之后再分配 而不是独立模式那样提前指定好
向Yarn-Session提交作业(自己编写的Flink工程已打包至虚拟机的flink安装目录下),需要单独开一个hadoop102的窗口启动nc -lk 8888
bin/flink run -c com.ibacon.flink.wc.StreamWordCount ./Flink-1.0-SNAPSHOT.jar --host hadoop102 --port 8888
此时没有-p参数指定并行度,默认并行度是1,效果如下:
可以看到yarn会话模式下不允许用户自己上传任务jar包:
停止任务,直接上ui界面cancel(懒得用命令行),cancel掉之后 申请下来的资源暂时还在:
命令行方式:
bin/flink cancel JobID
JobID:
资源过一会就被释放掉了:
查看任务列表:
bin/flink list
kill掉会话任务:
yarn application -kill application_1672739386736_0001
查看历史命令技巧
重新提交测试任务,偷懒用
history | grep flink
Flink on YARN集群单作业模式部署的测试
一行命令搞定:
bin/flink run -d -t yarn-per-job -c com.ibacon.flink.wc.StreamWordCount -p 2 ./Flink-1.0-SNAPSHOT.jar --host hadoop102 --port 8888
查看作业:bin/flink list命令 默认找会话模式的任务,需要指定以下参数:
bin/flink list -t yarn-per-job -Dyarn.application.id=application_1672739386736_0002
取消作业:
bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_1672739386736_0002 <jobId>
bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_1672739386736_0002 49297394b9e359548a2ba34d5da58c84
可以看到该yarn应用状态的改变,由此说明yarn集群模式下的单应用模式,就是为了提交的作业而生,作业任务一旦停止,该yarn应用也就殉情了(bushi)
运行时架构
Flink的作业提交和任务处理系统示意图:
作业提交流程(抽象):
(1) 一般情况下,由客户端(App)通过分发器提供的REST接口,将作业提交给JobManager。
(2)由分发器启动JobMaster,并将作业(包含JobGraph)提交给JobMaster。
(3)JobMaster将JobGraph解析为可执行的ExecutionGraph,得到所需的资源数量,然后向资源管理器请求资源(slots)。
(4)资源管理器判断当前是否由足够的可用资源;如果没有,启动新的TaskManager。
(5)TaskManager启动之后,向ResourceManager注册自己的可用任务槽(slots)。
(6)资源管理器通知TaskManager为新的作业提供slots。
(7)TaskManager连接到对应的JobMaster,提供slots。
(8)JobMaster将需要执行的任务分发给TaskManager。
(9)TaskManager执行任务,互相之间可以交换数据。
独立模式作业提交流程:
4.不会启动TaskManager ,因为独立模式下,这玩意是指定好了的,如果资源够,直接向它要资源
YARN集群-会话模式
1.启动一个会话,创建一个Flink集群的空壳子,此时没有提交作业,只有ResourceManager和Dispatcher在运行
2.根据需要,向Yarn申请资源,动态启动TaskManager给予作业运行所需的资源
YARN集群-单作业模式
不预先启动Flink集群,提交作业才启动JobManager,省去了Dispatcher
重要概念
数据流图、并行度、算子链、作业图与执行图、任务与任务槽
清楚:
- 并行度的四种设置方法
- 算子链合并的条件
- 任务槽共享原理
DataStream API
源算子
提前准备一个POJO类模拟用户行为(必须符合Flink要求,不然会被视为泛型类型,当作黑盒)
Flink对POJO类型的要求如下:
- 类是公共的(public)和独立的(standalone,也就是说没有非静态的内部类);
- 类有一个公共的无参构造方法;
- 类中的所有字段是public且非final的;或者有一个公共的getter和setter方法,这些方法需要符合Java bean的命名规范。
package com.ibacon.flink.api.pojo;
/**
* @author zhangxin
* @Description 用户行为模拟对象
* @createTime 2023年01月04日 15:36:00
*/
public class MyEvent {
public String user;
public String url;
public Long ts;
public MyEvent() {
}
public MyEvent(String user, String url, Long ts) {
this.user = user;
this.url = url;
this.ts = ts;
}
@Override
public String toString() {
return "MyEvent{" +
"user='" + user + '\'' +
", url='" + url + '\'' +
", ts=" + ts +
'}';
}
}
自定义Source:
package com.ibacon.flink.api.source;
import com.ibacon.flink.api.pojo.MyEvent;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Calendar;
import java.util.Random;
/**
* @author zhangxin
* @Description 自定义数据源
* @createTime 2023年01月04日 15:34:00
*/
public class MySource implements SourceFunction<MyEvent> {
private Boolean running = true;
@Override
public void run(SourceContext<MyEvent> sourceContext) throws Exception {
Random random = new Random();
String[] users = {"Mary", "Bob", "ibacon"};
String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};
while (running) {
sourceContext.collect(
new MyEvent(
users[random.nextInt(users.length)],
urls[random.nextInt(urls.length)],
Calendar.getInstance().getTimeInMillis()
)
);
Thread.sleep(500);
}
}
@Override
public void cancel() {
running = false;
}
}
测试:
package com.ibacon.flink.api.source;
import com.ibacon.flink.api.pojo.MyEvent;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author zhangxin
* @Description 自定义数据源测试
* @createTime 2023年01月04日 15:43:00
*/
public class MySourceTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<MyEvent> streamSource = env.addSource(new MySource());
streamSource.print("MySource");
env.execute();
}
}
KafkaSource:
先加入flink官方提供的kafka连接器依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
编写测试类
package com.ibacon.flink.api.source;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
/**
* @author zhangxin
* @Description kafka数据源
* @createTime 2023年01月04日 15:10:00
*/
public class KafkaSourceTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "hadoop102:9092");
DataStreamSource<String> kafkaStream = env.addSource(
new FlinkKafkaConsumer<>("topic_log", new SimpleStringSchema(), prop)
);
kafkaStream.print("kafka");
env.execute();
}
}
很明显需要启动一个kafka生产者以供测试(zk,kafka进程)
kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic topic_log
启动测试类,观察有没有消费到数据
转换算子
基本转换算子
package com.ibacon.flink.api.transformation;
import com.ibacon.flink.api.pojo.MyEvent;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @author zhangxin
* @Description map、filter、flatmap 基本转换算子
* @createTime 2023年01月04日 17:24:00
*/
public class BasicOperator {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<MyEvent> stream = env.fromElements(
new MyEvent("Mary", "./home", 1000L),
new MyEvent("Bob", "./cart", 2000L)
);
SingleOutputStreamOperator<String> stream1 = stream.map(x -> x.user)
.filter(y -> y.equals("Bob"));
stream1.print("map->filter->");
SingleOutputStreamOperator<String> stream2 = stream.flatMap(
(MyEvent value, Collector<String> out) -> {
if (value.user.equals("Mary")) {
out.collect(value.user);
} else if (value.user.equals("Bob")) {
out.collect(value.user);
out.collect(value.url);
}
}).returns(Types.STRING);
stream2.print("flatmap->");
env.execute();
}
}
map+filter的功能 flatmap一个人就能搞定
聚合算子
为了说明keyBy的效果(按指定的键算出hashcode,对并行度取模),设置并行度为4
按键分区keyBy:
package com.ibacon.flink.api.transformation;
import com.ibacon.flink.api.pojo.MyEvent;
import com.ibacon.flink.api.source.MySource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author zhangxin
* @Description 简单聚合算子
* @createTime 2023年01月04日 18:07:00
*/
public class AggregationOperator {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
DataStreamSource<MyEvent> streamSource = env.addSource(new MySource());
// streamSource.keyBy(x -> x.user).maxBy("ts").print("maxBy--");
// streamSource.keyBy(x -> x.user).max("ts").print("max--");
// streamSource.keyBy(x -> x.user).minBy("ts").print("minBy--");
// streamSource.keyBy(x -> x.user).min("ts").print("min--");
streamSource.keyBy(x -> x.user).print("keyBy--");
env.execute();
}
}
简单聚合:
以下一次测试max maxBy min minBy,并说明区别
简单聚合算子中,maxBy和max的区别:
max只取一个字段的最大值,其他字段取跟第一次一样的值,maxBy取该字段最大值所在的一整行的值
min和minBy区别类似
如图所示
归约聚合reduce:
一个需求:统计每个用户最近一次访问时间戳,以及之前访问过的所有路径
package com.ibacon.flink.api.transformation;
import com.ibacon.flink.api.pojo.MyEvent;
import com.ibacon.flink.api.source.MySource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author zhangxin
* @Description 归约聚合
* @createTime 2023年01月04日 18:24:00
*/
public class ReduceOperator {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<MyEvent> streamSource = env.addSource(new MySource());
//统计每个用户最近一次访问时间戳,以及之前访问过的所有路径
streamSource.keyBy(x->x.user)
.reduce(
(val1,val2)-> new MyEvent(val1.user,val1.url+","+val2.url, val2.ts)
)
.print();
env.execute();
}
}
用户自定义函数
还没学,但是UDF函数基本都差不多
物理分区算子
不算严格意义上的算子,只是官方这么翻译
输出算子
Q.E.D.