Spark对接Kafka
in Spark with 0 comment

Spark对接Kafka

in Spark with 0 comment

技术介绍及其在项目中的运用

SparkStreaming 简介

SparkStreaming 是基于 Spark 核心 API 的拓展,用于实时数据流的处理,具有低延迟、高吞吐和基于内存计算等特点。通过设置的批处理时间间隔生成的一批 RDD 来生成 Dstream,用于进行数据的处理。项目中使用 Kafka 当做 SparkStreaming 的数据来源。

image-1655723527087

Spark-shell 简介

Spark-shell 可以直接通过命令窗口学习 API,使用 Scala 或者 Python 语言编写数据处理分析逻辑代码进行交互式学习。实际是通过后台调用 Spark-submit 脚本来提交应用程序。项目中主要使用 Scala 语言在 Spark-shell 中进行逻辑代码编写。

预期成果

在 Sparkstreming 程序运行窗口中,对从 kafka 的 demo 主题中获取到的数据按 3 秒一次的刷新时间进行数据展示。

image-1655723533683

环境说明

本系统使用 Scala 版本为 2.11、Spark 版本为 2.4.4(Scala2.11 编写版)、Kafka 1.0.0。

通过查看提供的虚拟环境,各组件版本都符合要求。Kafka 可以使用高版本,但是不可以使用低版本,因为 Spark 2.4.4 与 Kafka 的连接组件,不支持 Kafka 1.0.0 以下版本。其它组件如果使用过高或者过低的版本极有可能出现不兼容的问题,所以尽量保持一致。

SparkStreaming 程序编写

Kafka 启动

需要先将 Zookeeper 与 Kafka 启动,因为 SparkStreaming 需要对 Kafka 进行对接。Flume 可以等 SparkStreaming 程序启动后再开始进行数据采集。参考上次实验,启动命令如下:

$ zkServer.sh start
$ kafka-server-start.sh /opt/kafka_2.11-1.0.0/config/server.properties

修改 Spark-shell 启动默认配置

由于项目中的数据来源为 Kafka,需要对获取的数据进行序列化。序列化方式是通过配置 SparkConf 对象来实现的,而 Spark-shell 在启动时已经默认创建好 sc 对象,所以无法通过传递 SparkConf 对象构建 sc 对象。

因此需要修改 Spark 文件夹中 conf 目录下的 spark-defaults.conf 文件,配置启动时默认使用的序列化方式。因为 conf 目录下只有 spark-defaults.conf.template 文件,所以需要复制模板文件再进行修改。

复制配置模板,创建 spark-default.conf 文件。

cp /opt/spark-2.4.4-bin-hadoop2.7/conf/spark-defaults.conf.template /opt/spark-2.4.4-bin-hadoop2.7/conf/spark-defaults.conf

修改 spark-default.conf 文件。

vim /opt/spark-2.4.4-bin-hadoop2.7/conf/spark-defaults.conf

将 spark-default.conf 文件第 25 行中的注释符号去除,即可设置启动时默认使用 Kryo 的序列化方式。Kryo 具有序列化速度快、易编码和易用性等优点。

image-1655723544915

引入连接 Kafka 所需的 jar 包

下载 spark-streaming-kafka-0-10_2.11-2.4.4.jar 和 kafka-clients-2.0.0.jar 两个 jar 包,放到 /opt/spark-2.4.4-bin-hadoop2.7/jars 目录下,用于连接和操作 Kafka。

cd /home/shiyanlou/Desktop

# 从课程资源中将 spark-streaming-kafka 拉到桌面上
wget https://labfile.oss.aliyuncs.com/courses/2629/spark-streaming-kafka-0-10_2.11-2.4.4.jar

# 从课程资源中将 kafka-clients 拉到桌面上
wget https://labfile.oss.aliyuncs.com/courses/2629/kafka-clients-2.0.0.jar

# 移动目录
cp spark-streaming-kafka-0-10_2.11-2.4.4.jar kafka-clients-2.0.0.jar /opt/spark-2.4.4-bin-hadoop2.7/jars

通过 Spark-shell 创建 SparkStreaming 程序

启动 Spark-shell,下述操作都在此终端完成即可。

image-1655723553095

创建用户日志信息样例类,代码如下所示,学员们自己编写代码时可以省去注释部分。

case class UserBehavior(
                         user_id: String,  //买家 id
                         item_id: String,  //商品 id
                         cat_id: String,  //商品类别 id
                         merchant_id: String,  //卖家 id
                         brand_id: String,  //品牌 id
                         month: String,  //月份
                         day: String,  //天数
                         act: String,  //行为,取值范围 {0,1,2,3},0 表示点击,1 表示加入购物车,2 表示购买,3 表示关注商品
                         age_range: String, //买家年龄分段:1 表示年龄 <18,2 表示年龄在[18,24],3 表示年龄在 [25,29],4 表示年龄在 [30,34],5 表示年龄在 [35,39],6 表示年龄在 [40,49],7 和 8 表示年龄 >=50,0 和 NULL 则表示未知
                         gender: String, //性别:0 表示女性,1 表示男性,2 和 NULL 表示未知
                         province: String //收货地址省份
                       )

Spark-shell 中用户日志信息类创建成功效果如下图所示。

注意:在 Spark-shell 中进行多行输入时候尽量使用 paste 命令,以便于代码块整体运行。使用方法为输入 :paste 开启粘贴模式,可以选择粘贴或者手动编写代码块,完成后按 Crtl+D 退出粘贴模式,代码块自动运行。

image-1655723563669

导入程序中所要使用的拓展类包。

//Java 工具配置类
import java.util.Properties
//Spark 广播类(在数据处理二中需用到)
import org.apache.spark.broadcast.Broadcast
//SparkContext 类
import org.apache.spark.SparkContext
//连接 Kafka 所需的序列化类
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
//log4j
import org.apache.log4j.{Level, Logger}
//SparkConf
import org.apache.spark.SparkConf
//Kafka 主题订阅
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
//kafka 工具类(0.8 和 0.10 版本不同,此处为 0.10 版本的工具类导入)
import org.apache.spark.streaming.kafka010.KafkaUtils
//分区方式
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
//StreamingContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

Spark-shell 中导入成功后每行下方都会出现导入成功提示,学员自己操作时可省略注释。效果如下图所示。

image-1655723573254

创建 StreamingContext 对象,我们可以把检查点 checkpoint 设立在本地文件夹中,也可设立在 Hadoop 集群中。

先打开一个终端创建用于存放检查点的本地文件夹,并配置好权限。

递归创建检查点文件夹。

sudo mkdir -p /bigdata/spark/checkpoint

image-1655723581623

配置检查点文件夹及父目录的权限,用于程序中检查点的数据存放。

sudo chmod -R 777 /bigdata

image-1655723588656

通过使用详细信息查询命令,可以看到相关目录权限已经改变。当然,直接设置权限为 777 在实际开发中是不规范的,此处只是为了项目中创建目录方便。

如果出现下图的创建错误提示信息,说明层级目录中某些目录权限不够。

image-1655723597327

创建 StreamingContext 对象代码。

val ssc = new StreamingContext(sc,Seconds(3))
    //设置数据检查点
    //需要赋予 bigdata 以下所有目录权限,否则会因权限不足无法创建 checkpoint 文件夹
    ssc.checkpoint("/bigdata/spark/checkpoint")

    //kafka 中创建 demo 主题,用于存放 flume 采集发送过来的源数据
    val topics = Set("demo")

    val kafkaParams = Map[String, Object](
       //配置 kafka 节点
      "bootstrap.servers" -> "localhost:9092",
       //设置 key 的序列化方式为 StringDeserializer
      "key.deserializer" -> classOf[StringDeserializer],
       ////设置 value 的序列化方式为 StringDeserializer
      "value.deserializer" -> classOf[StringDeserializer],
       //设置消费组
       //一个消费组中可以有多个消费者,所订阅的主题中的消息会被多个消费者分享
       //所以在运行此程序前,该消费组中不可有其他的消费者在进行消费
       //避免造成消费数据不完整
      "group.id" -> "g1",
       //偏移量设置为最新
      "auto.offset.reset" -> "latest",
       //关闭任务自动提交
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

Spark-shell 中创建 StreamingContext 对象成功后,代码块下面会显示相关信息,包括 StreamingContext 对象信息,连接的 Kafka 主题信息,Kafka 中各个参数的详细信息,效果如下图所示。

image-1655723606719

创建 Kafka 数据源的 Dstream。

目前,创建连接 Kafka 数据源的 Dstream 有两种方式,分别为 createDstream 和 createDirectDstream,一般来说第二种常用于开发中,两者的区别在各类文章中都有提及,学员们可自行学习。

由于此项目中 SparkStreaming 与 Kafka 的集成使用了 0.10 版本,而在 0.8 之后的版本,第一种创建 Dstream 的方式已经被弃用了,所以只能使用第二种 direct 创建模式。

//以 direct 模式创建 kafka 数据源的 Dstream
val data = KafkaUtils.createDirectStream(
  //StreamingContext 对象
  ssc,
  //采用一致性方式,将分区大致均匀分配到所有 Executor 上去
  PreferConsistent,
  //配置 kafka 参数,订阅对应的 demo 主题
  Subscribe[String, String](topics, kafkaParams))

Spark-shell 中创建 Dstream 后,会出现 DirectKafkaInputstream 的提示消息,说明已经创建好与 Kafka 的连接,同时会出现一些警告,这个可以先不用处理,不会影响后期项目运行,效果如下图所示。

image-1655723614523

数据预处理,需要先将数据流进行简单的 map,将流中对象的值都取出。之后需要将原数据中某些字段进行判断赋值,以便于后期数据处理,如性别和年龄段字段,需要提前去除空值,以防出现异常。同时将得到的数据都封装为一个个的用户日志信息类的对象,形成一个个有机的整体,方便取值。

val result = data.map(_.value()).map(
      line=>{
        val record=line.split(",")
          //年龄段字段为 0 或者 null,统一重新赋值为 9,方便过滤
        if(record(8).equals("0")||record(8)==null)record(8)="9"
          //性别字段为 2 或者 null,统一重新赋值为 3,方便过滤
        if(record(9).equals("2")||record(9)==null)record(9)="3"
        UserBehavior(
          record(0), //买家 id
          record(1), //商品 id
          record(2), //商品类别 id
          record(3), //卖家 id
          record(4), //品牌 id
          record(5), //月份
          record(6), //日数
          record(7), //行为
          record(8), //年龄段
          record(9), //性别
          record(10) //省份
        )
      })

Spark-shell 中数据预处理完成后,信息提示已经转化为 UserBehavior 的 Dstream,效果如下图所示。

image-1655723626597

对以上获取到的所有用户日志信息在命令窗口中进行打印。

val test=result.print()

image-1655723651064

启动 SparkStreaming 程序。

ssc.start()

Spark-shell 中 SparkStreaming 程序启动后,每隔三秒便会在命令窗打印一次信息,由于当前未有数据传入 Kafka,所以一直没有内容显示,效果如下图所示。

image-1655723662832

启动 Flume 开始进行数据采集。参考上次实验,启动命令如下:

$ flume-ng agent -f /home/shiyanlou/Desktop/kafkaSink.flm -n a1 -c /opt/apache-flume1.6.0-bin/conf
$ flume-ng avro-client -H localhost -p 4141 -F /home/shiyanlou/Desktop/test.csv

测试结果

启动完成的 SparkStreaming 程序,每隔 3 秒从 Kafka 的 demo 主题中获取一遍数据进行消费,并实时刷新打印在命令窗口中。获取的数据都以 UserBehavior 对象的形式进行打印。

image-1655723674058

spark-shell 完整代码

下面列出在 spark-shell 中执行的完整代码,供大家参考对比:

case class UserBehavior(
                         user_id: String,  //买家 id
                         item_id: String,  //商品 id
                         cat_id: String,  //商品类别 id
                         merchant_id: String,  //卖家 id
                         brand_id: String,  //品牌 id
                         month: String,  //月份
                         day: String,  //天数
                         act: String,  //行为,取值范围 {0,1,2,3},0 表示点击,1 表示加入购物车,2 表示购买,3 表示关注商品
                         age_range: String, //买家年龄分段:1 表示年龄 <18,2 表示年龄在[18,24],3 表示年龄在 [25,29],4 表示年龄在 [30,34],5 表示年龄在 [35,39],6 表示年龄在 [40,49],7 和 8 表示年龄 >=50,0 和 NULL 则表示未知
                         gender: String, //性别:0 表示女性,1 表示男性,2 和 NULL 表示未知
                         province: String //收货地址省份
                       )

//Java 工具配置类
import java.util.Properties
//Spark 广播类(在数据处理二中需用到)
import org.apache.spark.broadcast.Broadcast
//SparkContext 类
import org.apache.spark.SparkContext
//连接 Kafka 所需的序列化类
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
//log4j
import org.apache.log4j.{Level, Logger}
//SparkConf
import org.apache.spark.SparkConf
//Kafka 主题订阅
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
//kafka 工具类(0.8 和 0.10 版本不同,此处为 0.10 版本的工具类导入)
import org.apache.spark.streaming.kafka010.KafkaUtils
//分区方式
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
//StreamingContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc,Seconds(3))
    //设置数据检查点
    //需要赋予 bigdata 以下所有目录权限,否则会因权限不足无法创建 checkpoint 文件夹
    ssc.checkpoint("/bigdata/spark/checkpoint")

    //kafka 中创建 demo 主题,用于存放 flume 采集发送过来的源数据
    val topics = Set("demo")

    val kafkaParams = Map[String, Object](
       //配置 kafka 节点
      "bootstrap.servers" -> "localhost:9092",
       //设置 key 的序列化方式为 StringDeserializer
      "key.deserializer" -> classOf[StringDeserializer],
       ////设置 value 的序列化方式为 StringDeserializer
      "value.deserializer" -> classOf[StringDeserializer],
       //设置消费组
       //一个消费组中可以有多个消费者,所订阅的主题中的消息会被多个消费者分享
       //所以在运行此程序前,该消费组中不可有其他的消费者在进行消费
       //避免造成消费数据不完整
      "group.id" -> "g1",
       //偏移量设置为最新
      "auto.offset.reset" -> "latest",
       //关闭任务自动提交
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

//以 direct 模式创建 kafka 数据源的 Dstream
val data = KafkaUtils.createDirectStream(
    //StreamingContext 对象
    ssc,
    //采用一致性方式,将分区大致均匀分配到所有 Executor 上去
    PreferConsistent,
    //配置 kafka 参数,订阅对应的 demo 主题
    Subscribe[String, String](topics, kafkaParams))

val result = data.map(_.value()).map(
      line=>{
        val record=line.split(",")
          //年龄段字段为 0 或者 null,统一重新赋值为 9,方便过滤
        if(record(8).equals("0")||record(8)==null)record(8)="9"
          //性别字段为 2 或者 null,统一重新赋值为 3,方便过滤
        if(record(9).equals("2")||record(9)==null)record(9)="3"
        UserBehavior(
          record(0), //买家 id
          record(1), //商品 id
          record(2), //商品类别 id
          record(3), //卖家 id
          record(4), //品牌 id
          record(5), //月份
          record(6), //日数
          record(7), //行为
          record(8), //年龄段
          record(9), //性别
          record(10) //省份
        )
      })

val test=result.print()

ssc.start()