Flink流处理WordCount案例实现

package com.ibacon.flink.wc;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author zhangxin
 * @Description 无界流处理wc
 * @createTime 2023年01月03日 14:06:00
 */
public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //传外部参数的形式
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        String host = parameterTool.get("host");
        Integer port = parameterTool.getInt("port");


        DataStreamSource<String> streamSource = env.socketTextStream(host,port);


        streamSource.flatMap((FlatMapFunction<String, Tuple2<String, Long>>) (value, out) -> {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word,1L));
            }
        })
                .returns(Types.TUPLE(Types.STRING,Types.LONG))    //指明类型,解决泛型擦除问题
                .keyBy(0)
                .sum(1)
                .print();

        env.execute();

    }
}

部署模式及集群模式

部署模式分为会话模式、单作业模式、应用模式三种,通常用前两种

集群模式分为独立集群、Yarn集群、K8S模式等,独立基本不用,K8S没学过,所以这里重点写Yarn集群

Yarn集群模式下也三种部署模式,这里只记录前两种,应用模式只是命令改个名字而已

hadoop的环境变量要加上export HADOOP_CLASSPATH=`hadoop classpath` 这一行,但是会出现一个问题,使用myhadoop.sh脚本启停以及xsync脚本分发环境变量都会报:找不到hadoop命令这个提示

解决办法:

思路1:将`hadoop classpath`前加上绝对路径即可(已测试成功)

export HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath`

或者(未测试,仅为思路):

export HADOOP_CLASSPATH=$(hadoop classpath)

思路2:未刷新 source /etc/profile或者没有重连客户端

image-20230103175126074

可以看到这时再开启Yarn会话初始化集群就成功了:

image-20230103180126730

image-20230103180205488

点击任务id进去后,点击ApplicationMaster:

image-20230103180315677

即可看到初始化的Flink集群:

image-20230103180356018

因为是yarn模式 所以资源都是提交任务之后再分配 而不是独立模式那样提前指定好

向Yarn-Session提交作业(自己编写的Flink工程已打包至虚拟机的flink安装目录下),需要单独开一个hadoop102的窗口启动nc -lk 8888

bin/flink run -c com.ibacon.flink.wc.StreamWordCount ./Flink-1.0-SNAPSHOT.jar --host hadoop102 --port 8888 

此时没有-p参数指定并行度,默认并行度是1,效果如下:

image-20230103181340969

可以看到yarn会话模式下不允许用户自己上传任务jar包:

image-20230103181755099

停止任务,直接上ui界面cancel(懒得用命令行),cancel掉之后 申请下来的资源暂时还在:

image-20230103184451922

命令行方式:

bin/flink cancel JobID

JobID:

image-20230103194757249

资源过一会就被释放掉了:

image-20230103184544472

查看任务列表:

bin/flink list

image-20230103195146725

kill掉会话任务:

yarn application -kill application_1672739386736_0001

image-20230103202618272

image-20230103202639238

查看历史命令技巧

重新提交测试任务,偷懒用

history | grep flink

image-20230103184939386

一行命令搞定:

bin/flink run  -d -t yarn-per-job -c com.ibacon.flink.wc.StreamWordCount -p 2 ./Flink-1.0-SNAPSHOT.jar --host hadoop102 --port 8888

image-20230103195630437

查看作业:bin/flink list命令 默认找会话模式的任务,需要指定以下参数:

bin/flink list -t yarn-per-job -Dyarn.application.id=application_1672739386736_0002

image-20230103200324846

取消作业:

bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_1672739386736_0002 <jobId>

bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_1672739386736_0002 49297394b9e359548a2ba34d5da58c84

image-20230103200709145

可以看到该yarn应用状态的改变,由此说明yarn集群模式下的单应用模式,就是为了提交的作业而生,作业任务一旦停止,该yarn应用也就殉情了(bushi)

image-20230103200803315

运行时架构

Flink的作业提交和任务处理系统示意图

image-20230104195030660

作业提交流程(抽象)

image-20230104195113823

(1) 一般情况下,由客户端(App)通过分发器提供的REST接口,将作业提交给JobManager。

(2)由分发器启动JobMaster,并将作业(包含JobGraph)提交给JobMaster。

(3)JobMaster将JobGraph解析为可执行的ExecutionGraph,得到所需的资源数量,然后向资源管理器请求资源(slots)。

(4)资源管理器判断当前是否由足够的可用资源;如果没有,启动新的TaskManager。

(5)TaskManager启动之后,向ResourceManager注册自己的可用任务槽(slots)。

(6)资源管理器通知TaskManager为新的作业提供slots。

(7)TaskManager连接到对应的JobMaster,提供slots。

(8)JobMaster将需要执行的任务分发给TaskManager。

(9)TaskManager执行任务,互相之间可以交换数据。

独立模式作业提交流程

image-20230104195556970

4.不会启动TaskManager ,因为独立模式下,这玩意是指定好了的,如果资源够,直接向它要资源

YARN集群-会话模式

1.启动一个会话,创建一个Flink集群的空壳子,此时没有提交作业,只有ResourceManager和Dispatcher在运行

image-20230104200118802

2.根据需要,向Yarn申请资源,动态启动TaskManager给予作业运行所需的资源

image-20230104200225035

YARN集群-单作业模式

不预先启动Flink集群,提交作业才启动JobManager,省去了Dispatcher

image-20230104200447608

重要概念

数据流图、并行度、算子链、作业图与执行图、任务与任务槽

清楚:

  • 并行度的四种设置方法
  • 算子链合并的条件
  • 任务槽共享原理

DataStream API

源算子

提前准备一个POJO类模拟用户行为(必须符合Flink要求,不然会被视为泛型类型,当作黑盒)

Flink对POJO类型的要求如下:

  • 类是公共的(public)和独立的(standalone,也就是说没有非静态的内部类);
  • 类有一个公共的无参构造方法;
  • 类中的所有字段是public且非final的;或者有一个公共的getter和setter方法,这些方法需要符合Java bean的命名规范。
package com.ibacon.flink.api.pojo;

/**
 * @author zhangxin
 * @Description 用户行为模拟对象
 * @createTime 2023年01月04日 15:36:00
 */
public class MyEvent {

    public String user;
    public String url;
    public Long ts;

    public MyEvent() {
    }

    public MyEvent(String user, String url, Long ts) {
        this.user = user;
        this.url = url;
        this.ts = ts;
    }

    @Override
    public String toString() {
        return "MyEvent{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", ts=" + ts +
                '}';
    }
}

自定义Source

package com.ibacon.flink.api.source;

import com.ibacon.flink.api.pojo.MyEvent;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

/**
 * @author zhangxin
 * @Description 自定义数据源
 * @createTime 2023年01月04日 15:34:00
 */
public class MySource implements SourceFunction<MyEvent> {
    private Boolean running = true;

    @Override
    public void run(SourceContext<MyEvent> sourceContext) throws Exception {
        Random random = new Random();
        String[] users = {"Mary", "Bob", "ibacon"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};
        while (running) {
            sourceContext.collect(
                    new MyEvent(
                            users[random.nextInt(users.length)],
                            urls[random.nextInt(urls.length)],
                            Calendar.getInstance().getTimeInMillis()
                    )
            );

            Thread.sleep(500);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

测试:

package com.ibacon.flink.api.source;

import com.ibacon.flink.api.pojo.MyEvent;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author zhangxin
 * @Description 自定义数据源测试
 * @createTime 2023年01月04日 15:43:00
 */
public class MySourceTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<MyEvent> streamSource = env.addSource(new MySource());
        streamSource.print("MySource");
        env.execute();
    }
}

image-20230104201440595

KafkaSource:

先加入flink官方提供的kafka连接器依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

编写测试类

package com.ibacon.flink.api.source;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * @author zhangxin
 * @Description kafka数据源
 * @createTime 2023年01月04日 15:10:00
 */
public class KafkaSourceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "hadoop102:9092");

        DataStreamSource<String> kafkaStream = env.addSource(
                new FlinkKafkaConsumer<>("topic_log", new SimpleStringSchema(), prop)
        );

        kafkaStream.print("kafka");

        env.execute();
    }
}

很明显需要启动一个kafka生产者以供测试(zk,kafka进程)

kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic topic_log

启动测试类,观察有没有消费到数据

image-20230104201954568

转换算子

基本转换算子

package com.ibacon.flink.api.transformation;

import com.ibacon.flink.api.pojo.MyEvent;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author zhangxin
 * @Description map、filter、flatmap 基本转换算子
 * @createTime 2023年01月04日 17:24:00
 */
public class BasicOperator {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<MyEvent> stream = env.fromElements(
                new MyEvent("Mary", "./home", 1000L),
                new MyEvent("Bob", "./cart", 2000L)
        );

        SingleOutputStreamOperator<String> stream1 = stream.map(x -> x.user)
                .filter(y -> y.equals("Bob"));


        stream1.print("map->filter->");

        SingleOutputStreamOperator<String> stream2 = stream.flatMap(
                (MyEvent value, Collector<String> out) -> {
            if (value.user.equals("Mary")) {
                out.collect(value.user);
            } else if (value.user.equals("Bob")) {
                out.collect(value.user);
                out.collect(value.url);
            }
        }).returns(Types.STRING);

        stream2.print("flatmap->");

        env.execute();

    }
}

map+filter的功能 flatmap一个人就能搞定

image-20230104202415717

聚合算子

为了说明keyBy的效果(按指定的键算出hashcode,对并行度取模),设置并行度为4

按键分区keyBy

package com.ibacon.flink.api.transformation;

import com.ibacon.flink.api.pojo.MyEvent;
import com.ibacon.flink.api.source.MySource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author zhangxin
 * @Description 简单聚合算子
 * @createTime 2023年01月04日 18:07:00
 */
public class AggregationOperator {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStreamSource<MyEvent> streamSource = env.addSource(new MySource());

//        streamSource.keyBy(x -> x.user).maxBy("ts").print("maxBy--");
//        streamSource.keyBy(x -> x.user).max("ts").print("max--");
//        streamSource.keyBy(x -> x.user).minBy("ts").print("minBy--");
//        streamSource.keyBy(x -> x.user).min("ts").print("min--");
        streamSource.keyBy(x -> x.user).print("keyBy--");


        env.execute();


    }
}

image-20230104202752962

简单聚合

以下一次测试max maxBy min minBy,并说明区别

简单聚合算子中,maxBy和max的区别:

max只取一个字段的最大值,其他字段取跟第一次一样的值,maxBy取该字段最大值所在的一整行的值

min和minBy区别类似

如图所示

image-20230104203420487

归约聚合reduce

一个需求:统计每个用户最近一次访问时间戳,以及之前访问过的所有路径

package com.ibacon.flink.api.transformation;

import com.ibacon.flink.api.pojo.MyEvent;
import com.ibacon.flink.api.source.MySource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author zhangxin
 * @Description 归约聚合
 * @createTime 2023年01月04日 18:24:00
 */
public class ReduceOperator {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<MyEvent> streamSource = env.addSource(new MySource());

        //统计每个用户最近一次访问时间戳,以及之前访问过的所有路径
        streamSource.keyBy(x->x.user)
                .reduce(
                        (val1,val2)-> new MyEvent(val1.user,val1.url+","+val2.url, val2.ts)
                )
                .print();
        env.execute();
    }
}

image-20230104204227313

用户自定义函数

还没学,但是UDF函数基本都差不多

物理分区算子

不算严格意义上的算子,只是官方这么翻译

输出算子

Q.E.D.