基于Flink,Kafka实现海量数据的实时处理
in Flink with 0 comment

基于Flink,Kafka实现海量数据的实时处理

in Flink with 0 comment

实时分析

Kafka简介

他是一个MQ,作用:削峰平谷

ActiveQM(6k)、RabbitMQ(1.2w) 适用与业务系统(对事务要求极高)

ZeroMQ(25-50w)、Kafka(25-50w) 适用于大数据(对速度和吞吐量要求高,海量数据中,数据可能丢失,但对结果影响微乎其微)

Kafka为发布/订阅模式。

Kafka已经成为大数据业界主流。

Kafka重要概念

发布订阅: 支持多消费者同时消费数据,数据被长时间保存,并不会因为被消费而删除,这样就能够保证所有数据可以被重复消费.多用户同时读取同一个数据互不影响.数据会以时间维度来保存.规定保质期,保质期内可以反复消费,一旦保质期过去数据将会被清除.

Kafka集群: 可以单击运行也可以支持分布式运行.可以构建在所有基础上: 本地,云,容器,虚拟机…

Broker: 指的是Kafka集群中的每一个节点的名字.一般从0开始排列.要求名字必须是正整数,eg: 0 1 2 3 4 5…

Topic: 主题,是用于保证多个业务需求在运行中,数据不会发生交叉,保证每个业务中的数据不会被其他业务的消费者消费.同时生产者也不会错误的把自己的数据发送到其他业务的队列中.业务之间互相隔离.

Partition: 在Topic内部还会对数据进行进一步的划分,如果数据还有更细粒度的分类比如,jt(cart , order, search…多个细分类) 此时可以由这些不同的键将数据发送到不同的分区中.实现细分类之间的数据也互不影响.这样的设计也使kafka实现了可伸缩的特性,单个业务的增加或者减少不会影响到其他的服务.如果没有细分类.可以以分桶的方式将数据分别存放,以达到负载均衡的目的.是kafka高吞吐的保障.

leader\follower: 在分区内部kafka会以用户指定的量对分区进行备份,默认备份3份.其中原本的分区为leader 复制出的分区文件为follower,在运行过程中,leader负责数据的读写,follower只负责同步leader的数据,当leader所在的broker宕机,两个follower在zk的监督下选举,选出新的leader保证高可用.

offset: 在消费者客户端保存,用来记录消费者组的消费进度.保证消费不会重复.offset是可以人为控制的.可以重置为0(将所有现有的数据重新消费一遍) 可以设置为len(只消费以后出现的最新数据) 甚至可以修改offset更新的步长,比如每两个数据消费一次.

consumergroup: 在kafka消费者端,所有的消费者都会被定义为消费者组内的成员.如果消费者需要消费所有数据,那么自成一组,如果多个消费者负载均衡合作消费同一份数据,那么就需要将这些消费者编入同一个消费者组.消费者客户端以消费者组的维度保存并更新offset.保证消费不会重复.

Flink简介

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink设计为在所有常见的集群环境中运行,以内存速度和任何规模执行计算。

先来看一下Flink与同类产品的对比.

img

Flink详解

Flink的组成

img

JobManager:协调分布式执行。他们安排任务,协调检查点,协调故障恢复等。

总是至少有一个工作经理。高可用性设置将有多个JobManager,其中一个始终是领导者,而其他则是待机者。

TaskManagers:执行任务(或者更具体地说,子任务)的数据流,以及缓冲器和交换数据流。必须始终至少有一个TaskManager。

Flink抽象级别

img

Flink程序的基本构成

Source:数据源

Transformation:转化

Sink:输出

时间

在flink中定义了对数据描述的三种时间策略

Event time:事件时间,是数据产生时间

Ingestion time:接收时间,是数据接入source的瞬时时间

Processing time:处理时间,是每个执行基于时间的操作的机器节点的本地时间。

窗口

Time window:根据时间设定窗口大小

Tumbling window:滚动窗口

Sliding window:滑动窗口

Count window:根据数据条数设定窗口大小

Flink的优势

Flink的具体优势有以下几点:

同时支持高吞吐、低延迟、高性能

Flink是目前开源社区中唯一一套集高吞吐、低延迟、高性能三者于一身的分布式流式数据处理框架。像Apache Spark也只能兼顾高吞吐和高性能特性,主要因为在Spark Streaming流式计算中无法做到低延迟保障;而流式计算框架Apache Storm只能支持低延迟和高性能特性,但是无法满足高吞吐的要求。而满足高吞吐、低延迟、高性能这三个目标对分布式流式计算框架来说是非常重要的。

支持事件时间(Event Time)概念

在流式计算领域中,窗口计算的地位举足轻重,但目前大多数框架窗口计算采用的都是系统时间(Process Time),也是事件传输到计算框架处理时,系统主机的当前时间。Flink能够支持基于事件时间(Event Time)语义进行窗口计算,也就是使用事件产生的时间,这种基于事件驱动的机制使得事件即使乱序到达,流系统也能够计算出精确的结果,保持了事件原本产生时的时序性,尽可能避免网络传输或硬件系统的影响。

支持有状态计算

Flink在1.4版本中实现了状态管理,所谓状态就是在流式计算过程中将算子的中间结果数据保存在内存或者文件系统中,等下一个事件进入算子后可以从之前的状态中获取中间结果中计算当前的结果,从而无须每次都基于全部的原始数据来统计结果,这种方式极大地提升了系统的性能,并降低了数据计算过程的资源消耗。对于数据量大且运算逻辑非常复杂的流式计算场景,有状态计算发挥了非常重要的作用。

支持高度灵活的窗口(windows)操作

在流处理应用中,数据是连续不断的,需要通过窗口的方式对流数据进行一定范围的聚合计算,例如统计在过去的1分钟内有多少用户点击某一网页,在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行再计算。Flink将窗口划分为基于Time、Count、Session,以及Data-driven等类型的窗口操作,窗口可以用灵活的触发条件定制化来达到对复杂的流传输模式的支持,用户可以定义不同的窗口触发机制来满足不同的需求。

基于轻量级分布式快照(Snapshot)实现的容错

Flink能够分布式运行在上千个节点上,将一个大型计算任务的流程拆解成小的计算过程,然后将tesk分布到并行节点上进行处理。在任务执行过程中,能够自动发现事件处理过程中的错误而导致数据不一致的问题,比如:节点宕机、网路传输问题,或是由于用户因为升级或修复问题而导致计算服务重启等。在这些情况下,通过基于分布式快照技术的Checkpoints,将执行过程中的状态信息进行持久化存储,一旦任务出现异常停止,Flink就能够从Checkpoints中进行任务的自动恢复,以确保数据在处理过程中的一致性。

基于JVM实现独立的内存管理

内存管理是所有计算框架需要重点考虑的部分,尤其对于计算量比较大的计算场景,数据在内存中该如何进行管理显得至关重要。针对内存管理,Flink实现了自身管理内存的机制,尽可能减少JVM GC对系统的影响。另外,Flink通过序列化/反序列化方法将所有的数据对象转换成二进制在内存中存储,降低数据存储的大小的同时,能够更加有效地对内存空间进行利用,降低GC带来的性能下降或任务异常的风险,因此Flink较其他分布式处理的框架会显得更加稳定,不会因为JVM GC等问题而影响整个应用的运行。

Save Points(保存点)

对于7*24小时运行的流式应用,数据源源不断地接入,在一段时间内应用的终止有可能导致数据的丢失或者计算结果的不准确,例如进行集群版本的升级、停机运维操作等操作。值得一提的是,Flink通过Save Points技术将任务执行的快照保存在存储介质上,当任务重启的时候可以直接从事先保存的Save Points恢复原有的计算状态,使得任务继续按照停机之前的状态运行,Save Points技术可以让用户更好地管理和运维实时流式应用。

zookeeper的安装

安装ZooKeeper

/home/app目录下上传解压包并解压

修改配置文件zoo.cfg,注意:以下中文不要粘到配置文件中,只是为了对照学习方便。

tickTime=2000                       
#tickTime心跳时间,
clientPort=2181                     
#访问端口
dataDir=/home/app/zookeeper-3.4.8/tmp       
#设置日志路径
server.1=hadoop01:2888:3888      
#集群最少3个节点,可按机器名
server.2=hadoop02:2888:3888      
#2888指follower连leader端口
server.3=hadoop03:2888:3888      
#3888指定选举的端口

根目录下创建tmp文件夹,并在其中创建文件myid编辑1,2,3
操作命令,均在bin目录下执行

./zkServer.sh start          
#启动ZK服务
./zkServer.sh start-foreground   
#日志启动方式
./zkServer.sh stop           
#停止ZK服务
./zkServer.sh restart        
#重启ZK服务
./zkServer.sh status         
#查看ZK状态

启动zookeeper

在hadoop01 02 03 三台节点上分别执行

./zkServer.sh start

执行下边命令检测zk集群是否启动成功

./zkServer.sh status

img

此时,zk启动成功

Kafka安装

创建目录

上传解压

修改配置文件

cd /home/app/kafka_2.13-2.4.1/config
vi server.properties
broker.id=0 #当前server编号
log.dirs=/home/app/kafka_2.13-2.4.1/tmp/kafka-logs #日志存储目录
zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181

启动

cd /home/app/kafka_2.13-2.4.1
bin/kafka-server-start.sh config/server.properties &

img

创建主题

创建主题flux

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic flux

参数说明:

–create: 要创建主题

–replication-factor:副本个数

–partitions:主题分区存储,分区的个数

–topic:主题名(唯一,不能重复)

可以使用 --list 命令查看所有topic

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flux

消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flux --from-beginning

参数说明:

–from-beginning 是表示从头开始消费消息

If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.

消息已经永久保存在kafka集群了,从头消费或是从当前时间消费都可以。

Flume整合kafka

修改flume配置文件

flume的conf目录下执行

vi flume.properties
#命名Agent a1的组件
a1.sources  =  r1
a1.sinks  =  k1 k2
a1.channels  =  c1 c2
#描述/配置Source
a1.sources.r1.type  =  avro
a1.sources.r1.bind  =  0.0.0.0
a1.sources.r1.port  =  22222
a1.sources.r1.interceptors = t1
a1.sources.r1.interceptors.t1.type = timestamp
#描述Sink
a1.sinks.k1.type  =  hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flux/reportTime=%Y-%m-%d
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k2.type  =  org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic  =  flux
a1.sinks.k2.kafka.bootstrap.servers  =  hadoop01:9092
#描述内存Channel
a1.channels.c1.type  =  memory
a1.channels.c1.capacity  =  1000
a1.channels.c1.transactionCapacity  =  100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
#为Channle绑定Source和Sink
a1.sources.r1.channels  =  c1 c2
a1.sinks.k1.channel  =  c1
a1.sinks.k2.channel  =  c2

注意:添加或者修改黄色的部分,缺少将无法正常运行。

启动flume

在flume的conf目录下执行:

../bin/flume-ng agent -c ./ -f ./flume.properties -n a1 -Dflume.root.logger=INFO,console &

启动jtlogserver测试,可以看到HDFS有新的文件产生,Kafka的消费者控制台有打印生产的日志。

Kafka整合flink

创建一个maven工程(java)

img

导入pom文件

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.10.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-base -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>

<artifactId>flink-avro</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.12</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
</dependencies>

测试

package cn.tedu.logdatastream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class DataStreamTest {
    public static void main(String[] args) throws Exception {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
       source.print();
       env.execute("DataStreamTest");
    }
}

整合kafka

Properties properties = new Properties();

properties.setProperty("bootstrap.servers", "hadoop01:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "hadoop01:2181"); //0.8以上版本不需要
properties.setProperty("group.id", "test");
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));

业务逻辑实现

package cn.tedu.logdatastream;
import java.util.Properties;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import cn.tedu.pojo.UserLog;
public class FlinkDataStreamForLog {
    public static void main(String[] args) throws Exception {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       Properties props = new Properties();
       props.setProperty("bootstrap.servers", "hadoop01:9092");
       props.setProperty("group.id", "test");
       DataStream<String> source = env.addSource(new FlinkKafkaConsumer<String>("flux", new SimpleStringSchema(), props));
       source.map(new MapFunction<String, UserLog>() {
           @Override
           public UserLog map(String line) throws Exception {
              UserLog userLog = new UserLog();
              String[] values = line.split("\\|");
              userLog.setUrl(values[0]);
              userLog.setUrlname(values[1]);
              userLog.setUvid(values[13]);
              String[] ss = values[14].split("_");
              userLog.setSsid(ss[0]);
              userLog.setSscount(ss[1]);
              userLog.setSstime(ss[2]);
              userLog.setCip(values[15]);
              return userLog;
           }
       }).print().setParallelism(1);
       env.execute("FlinkDataStreamForLog");
    }
}

JDBC输出结果到MySQL

package cn.tedu.datastream;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JDBCTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.fromElements("1|刘沛霞|12");
        source.map(x -> {
            String[] s = x.split("\\|");
            LogInfo log = new LogInfo();
            log.setId(Integer.parseInt(s[0]));
            log.setName(s[1]);
            log.setAge(Integer.parseInt(s[2]));
            return log;
        }).addSink(JdbcSink.sink("insert into stream_log values (?,?,?)",
                        (ps, x) -> {
                            ps.setInt(1,x.getId() );
                            ps.setString(2, x.getName());
                            ps.setInt(3, x.getAge());
                        }
//                new JdbcStatementBuilder<LogInfo>() {
//                    @Override
//                    public void accept(PreparedStatement ps, LogInfo l) throws SQLException {
//                        ps.setInt(1,l.getId());
//                        ps.setString(2,l.getName());
//                        ps.setInt(3,l.getAge());
//                    }
//                }
                , new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/jtlog")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("root")
                        .build()
        ));
        env.execute();
    }
}

测试

准备六台虚拟机 512M内存。hadoop1~6

配置静态IP 121~126

链接并保存Xshell

拍摄快照