Flink入门程序
in Flink with 0 comment

Flink入门程序

in Flink with 0 comment

Flink环境准备

开发工具及环境要求

首先我们需要在环境中搭建 Flink 运行环境,总共可以分为下面这几步:

IDE最好使用IntelliJ IDEA (eclipse存在插件不兼容的风险)

唯一的要求是使用 Maven3.0.4 和安装 Java 8.x(或更高版本)。

Maven依赖坐标

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-core</artifactId>
  <version>1.9.0</version>
  </dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>1.9.0</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.9.0</version>
  <scope>provided</scope>
</dependency>

Flink入门程序

双击桌面的 IDEA 程序,启动之后,点击 File -> New -> Project 创建一个新的 Maven 工程 FlinkLearning:

image-1655711464446

image-1655711469990

创建好之后点击左上角 File > Settings 中,将 Maven 的配置文件修改为 /usr/share/maven/conf/settings.xml,配置之后的 Maven 中心仓库为阿里云,加载依赖会快很多:

image-1655711486073

接下来在工程的总目录 FlinkLearning 右键点击 Add Framework Support,为工程勾选添加 Scala 支持:

image-1655711492991

在工程 src/main 目录中创建 scala 文件夹,然后右键,选择 Mark Directory as,并将其标记为 Sources Root。

image-1655711501167

在 scala 目录里创建 com.shiyanlou.wc 包,并分别创建 BatchWordCount 和 StreamWordCount 两个 Scala Object,分别代表 Flink 批处理和 Flink 流处理。

image-1655711509478

至此,我们的准备工作已经完成,接下来正式进入编码阶段。

Flink批处理

package cn.tedu.flinktest;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWC {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2.读取源文件
        DataSource<String> textFile = env.readTextFile("E:\\words.txt");
        //3.读取当前行,并进行切割
        textFile.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] values = s.toLowerCase().split(" ");
                for (String value: values){
                    if (value.length()>0){
                        collector.collect(new Tuple2<String, Integer>(value,1));
                    }
                }
            }
        })
                //4.分组
                .groupBy(0)
                //5.求和
                .sum(1)
                //6.打印(sink)
                .print();
    }
}

Flink流式处理

package cn.tedu.flinktest;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class StreamWC {
    public static void main(String[] args) throws Exception {
        //1.从args中获取端口等参数
        int port = 0;
        try{
            ParameterTool tool = ParameterTool.fromArgs(args);
            port = tool.getInt("port");
        }catch(Exception e){
            System.err.println("未定义端口,使用默认9999");
           port = 9999;
        }
        //2.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //3.获取数据源
        DataStreamSource<String> source = env.socketTextStream("localhost", port);
        //4.对每行数据进行切割
        source.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] values = s.toLowerCase().split(" ");
                for (String value: values){
                    if (value.length()>0){
                        collector.collect(new Tuple2<String, Integer>(value,1));
                    }
                }
            }
        })
                //5.分组
                .keyBy(0)
                //6.定时刷新结果
                .timeWindow(Time.seconds(5))
                //7.求和
                .sum(1)
                //8.打印(sink)
                .print()
                //9.设置并行数
                .setParallelism(1);
        //10.提交作业
        env.execute("StreamWC");
    }
}

总结开发过程

Flink程序不管是批处理还是流式数据处理看起来都像转换数据集合的常规程序。每个程序都包含相同的基本部分:

获得execution environment

有三种方式:

getExecutionEnvironment()
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles) //(不常用)

加载/创建初始数据source

批处理:DataStream text = env.readTextFile(“file:///path/to/file”);

流式处理: DataStreamSource source = env.socketTextStream(“localhost”, port);

等等(详见source章节)

指定对此数据的转换transformation

根据业务需求编写处理逻辑。此部分为后续重点讲解内容。

指定将计算结果放在何处sink

writeAsText(String path)
print()

触发程序执行execute

Flink不会直接将部分代码逐行执行,而是采用懒加载方式,只有最终调用execute()方法时才会执行相关作业,这样能够保证flink可以胜任更加复杂的应用程序。

env.execute()