Spring Boot集成Kafka
in JavaDevelop with 0 comment

Spring Boot集成Kafka

in JavaDevelop with 0 comment

介绍

项目代码下载

可以参照源码进行学习:

wget https://labfile.oss.aliyuncs.com/courses/3055/spring-boot-mq-kafka.zip
unzip spring-boot-mq-kafka.zip

Kafka 简介

Kafka 是一个高吞吐量的分布式发布订阅消息系统,解耦了消息生产者和消息消费者。与大数据框架 Spark 配合使用,可以实现数据的实时流处理,目前做大数据相关的项目,都会用到 Kafka 消息系统。

Kafka 特点

Kafka 是一个分布式的流处理平台。它具有以下特点:

相关术语名词

Kafka 工作原理

Kafka 设计为集群运行,从而能够实现很强的可扩展性。通过将 Topic 在集群的所有实例上划分为多个 Partition ,它能够具有更强的可扩展性。RabbitMQ 主要处理 Exchange 中的队列,而 Kafka 仅使用 Topic 实现消息的发布/订阅。

Kafka Topic 会复制到集群的多个 Broker 上。集群中的每个节点都会担任一个或多个 Topic 的首领(Leader),负责该主题的数据并将其复制到集群中的其它节点上。

更进一步来讲,每个 Topic 可以划分为多个 Partition 。在这种情况下,集群中的每个节点是某个 Topic 一个或多个分区的 Leader,但并不是整个主题的首领。主题的责任会在所有节点间进行拆分。下图阐述了它是如何运行的。

image-1655346280189

在 WebIDE 中安装 Kafka

安装 Kafka

要想运行带有 Kafka 的 Spring Boot 应用,首先就得启动 Kafka 服务,虽然实验楼的 WebIDE 环境中默认安装了 Kafka 的,但这里还是演示一下如何安装 Kafka。

运行下面命令:

cd /opt
ls

可以看到实验楼的环境中安装得有 Scala 2.11 编译的 1.0.0 版本的 Kafka。

image-1655346299497

但是先不管它,我们自己来重新安装 Kafka。

下载 Kafka 软件包到当前目录中:

wget https://labfile.oss.aliyuncs.com/courses/3055/kafka_2.13-2.6.0.tgz

之后再进行解压:

sudo tar -zxvf kafka_2.13-2.6.0.tgz

对配置文件做一些改动,运行以下命令切换到制定文件夹中:

cd ./kafka_2.13-2.6.0/config

查看该目录下的 server.properties 文件:

sudo vim server.properties

需要修改下图中红框部分信息:

image-1655346309939

更改成如下形式:

image-1655346316428

启动 Kafka

要想启动 Kafka,需要先启动 zookeeper ,因为 Kafka 集群使用 zookeeper 软件进行元数据信息的同步。在生产环境中,安装 Kafka 之前,需要先安装 zookeeper。基于实验的目的,本次实验不再单独安装 zookeeper 软件,而是使用 Kafka 自带的 zookeeper 软件,从功能上讲,没有区别。

在当前路径启动 zookeeper :

sudo ../bin/zookeeper-server-start.sh  ./zookeeper.properties

启动成功后如下图所示:

image-1655346324767

这里有个小技巧,在命令前加 nohup ,尾部加 & ,这样命令就会以 Linux 后台进程运行,关闭当前命令行终端也无影响。例如 nohup ../bin/zookeeper-server-start.sh ./zookeeper.properties &,同学们可以自行尝试,也包括下面启动 Kafka 服务。

然后重新打开一个终端。在 WebIDE 上方菜单栏选择 Terminal ,下拉后选择 New Terminal,在新的控制台启动 Kafka 服务:

sudo /opt/kafka_2.13-2.6.0/bin/kafka-server-start.sh /opt/kafka_2.13-2.6.0/config/server.properties

启动成功后如下所示:

image-1655346333303

将 Kafka 整合进 Spring Boot

新建工程

之前的 Terminal 不要关闭,重新打开一个 Terminal,在终端中输入以下内容,创建一个 Maven 工程。

mvn archetype:generate \
  -DgroupId=com.example.mq.kafka \
  -DartifactId=spring-boot-mq-kafka \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false

将项目结构改成如下结构:

├── pom.xml
└── src
    └── main
        ├── java
        │   └── com
        │       └── example
        │           └── mq
        │               └── kafka
        │                   ├── KafkaDemoApplication.java
        │                   ├── config
        │                   │   └── KafkaConfig.java
        │                   ├── constants
        │                   │   └── KafkaConsts.java
        │                   ├── controller
        │                   │   └── MessageProducer.java
        │                   └── handler
        │                       └── MessageHandler.java
        └── resources
            └── application.yml

添加 Kafka 到 Spring Boot 中

与 RabbitMQ 不同,Spring Boot 并没有针对 Kafka 的 starter,所以增加的依赖就不能像 RabbitMQ 一样直接引入 AMQP 的依赖。

引入 Kafka 需要添加依赖如下所示:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

最终在 pom.xml 中添加依赖如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.4.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>

  <groupId>com.example.mq.kafka</groupId>
  <artifactId>spring-boot-mq-kafka</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>kafka-demo</name>
  <description>kafka demo project for Spring Boot</description>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

</project>

接下来,需要配置一些属性。一般来说,KafkaTemplate 默认使用 localhost 上的监听 9092 端口的 Kafka 代理,所以需要在 application.yml 中添加如下属性:

server:
  port: 8080
  servlet:
    context-path: /example
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      # 设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
      retries: 0
      #每批次发送消息的数量
      batch-size: 16384
      # producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。
      buffer-memory: 33554432
      # key序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: spring-boot-example
      # 手动提交
      enable-auto-commit: false
      # Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】
      auto-offset-reset: latest
      # key的解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value的解码方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 60000
    listener:
      # 使记录在控制台显示
      log-container-config: true
      concurrency: 5
      # 手动提交
      ack-mode: manual_immediate

在上面的配置中,可以注意到 spring.kafka.bootstrap-servers 是复数形式,也就是说,我们是可以提供集群中的 Kafka 服务器的,例如:

spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
      - localhost:9093
      - localhost:9094

创建常量池

com.example.mq.kafka.constants 下建立一个常量池 KafkaConsts.java ,规定 Kafka 集群的默认 Partition 数目和 Topic 的名称:

package com.example.mq.kafka.constants;

public interface KafkaConsts {
    /**
     * 默认 Partition 数目
     */
    Integer DEFAULT_PARTITION_NUM = 3;

    /**
     * Topic 名称
     */
    String TOPIC_TEST = "test";
}

消息发送控制类

和 RabbitMQ 类似,Kafka 同样是使用 KafkaTemplate 来发送消息,这里实现一个以 URL 的方式发送消息的消息发送控制类,其实也就是我们通常所说的生产者。 kafkaTemplate 类发送消息的方法如下:

ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);

ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

可以发现,与 RabbitMQ 发送消息的区别还是有一些,这里没有了 convertAndSend() 方法。这是因为, KafkaTemplate 是通过泛型类型化的,在发送消息的时候,它能够直接处理领域类型。这样的话,所有的 send() 方法都完成了 convertAndSend() 的任务。

同时, send()sendDefault() 的参数 RabbitMQ 有很大的差异。在使用 Kafka 发送消息的时候,我们可以使用如下参数设置消息该如何进行发送:

主题和消息内容是其中最重要的两个参数。分区和 key 对于如何使用 KafkaTemplate 几乎没有影响,只是作为额外的信息提供给 send()sendDefault() 。在实验中,我们只关心将消息内容发送到给定的主题,不用担心分区和 key 的问题。

com.example.mq.kafka.controller 下新建控制类

package com.example.mq.kafka.controller;

import com.example.mq.kafka.constants.KafkaConsts;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/messageSend")
    public String Producer(String Message) {
        // 指定发送消息的 Topic 固定,只自行编写发送消息的内容
        kafkaTemplate.send(KafkaConsts.TOPIC_TEST, Message);
        return "Message sent successfully!";
    }
}

消息接收类

在我们的实验中,想要使用 @KafkaListenr 注解来作为消息的监听器,但是这种机制需要一个 @Configuration 类上的 @EnableKafka 注解和一个监听器容器工厂,该工厂用于配置底层的 ConcurrentMessageListenerContainer 。默认情况下,需要一个名为 kafkaListenerContainerFactory 的 bean。下面的示例展示了如何使用 ConcurrentMessageListenerContainer

package com.example.mq.kafka.handler;

import com.example.mq.kafka.constants.KafkaConsts;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageHandler {
    @KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "ackContainerFactory")
    public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
        try {
            String message = (String) record.value();
            log.info("Messages are received: {}", message);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            // 手动提交 offset
            acknowledgment.acknowledge();
        }
    }
}

注意:要设置容器属性,必须在工厂上使用 getContainerProperties() 方法。它被用作注入到容器中的实际属性的模板。

创建配置类

KafkaTemplate 类配置

可以看到消息发送控制类,也就是常说的生产者主要是使用了 KafkaTemplate 类来进行消息发送,相应地需对其作一些配置。首先需要配置一个实例化生产者的工厂 producerFactory() ,并在 KafkaTemplate 的构造函数中提供。这种方法的实现代码如下:

  /**
  * 工厂模式实例化生产者的方法
  */
  @Bean
  public ProducerFactory<Integer, String> producerFactory() {
      return new DefaultKafkaProducerFactory<>(producerConfigs());
  }

  /**
  * 生产者的配置属性
  */
  @Bean
  public Map<String, Object> producerConfigs() {
      Map<String, Object> props = new HashMap<>();
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
      return props;
  }

  @Bean
  public KafkaTemplate<Integer, String> kafkaTemplate() {
      return new KafkaTemplate<Integer, String>(producerFactory());
  }

@KafkaListener 注释的方法配置

对应消息发送,消息接收同样需要一些配置。从之前的代码可以看到我们是使用了 @KafkaListener 注解来使消息接收类接收消息。配置方式如下所示:

  /**
    * 主要用于为 @KafkaListener 注释的方法构建容器
    */
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
      // 实例化一个容器
      ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
      // 指定要使用的工厂
      factory.setConsumerFactory(consumerFactory());
      // 指定容器并发数
      factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
      // 如果要在该端点设置监听器,设置为true
      factory.setBatchListener(true);
      // 配置容器,设置消费者等待记录的块的最大时间。
      factory.getContainerProperties().setPollTimeout(3000);
      return factory;
  }

  /**
    * 实例化消费者的方法
    */
  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
      // 返回一个使用提供的配置构建的工厂
      return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
  }


  /**
    * 使用ACK机制确认的消费
    */
  @Bean("ackContainerFactory")
  public ConcurrentKafkaListenerContainerFactory<String, String> ackContainerFactory() {
      ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(consumerFactory());
      factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
      factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
      return factory;
  }

配置类编写

结合上面的消息,在 com.example.mq.kafka.config 下新建 KafkaConfig.java 文件,添加如下内容:

package com.example.mq.kafka.config;

import com.example.mq.kafka.constants.KafkaConsts;
import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
@EnableConfigurationProperties({KafkaProperties.class})
@EnableKafka
@AllArgsConstructor
public class KafkaConfig {

    private final KafkaProperties kafkaProperties;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }

    @Bean("ackContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> ackContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
        return factory;
    }
}

在我们的实验代码中, 可以发现和前面讲解例子相比,少了一个属性配置方法,使用了 @EnableConfigurationProperties({KafkaProperties.class}) 注解,加载了在 application.yml 文件中配置的生产者和消费者的属性。

修改消息启动类

com.example.mq.kafka 下修改之前的 App.javaKafkaDemoApplication.java ,并添加以下内容:

package com.example.mq.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }

}

实验测试

输入一下命令测试程序:

cd spring-boot-mq-kafka
mvn spring-boot:run

之后,在出现的 URL 后面添加 /example/messageSend?Message= kafka, 敲下回车发送消息,成功后会出现如下提示:

image-1655346364683

运行成功后可以找到如下信息,证明 Kafka 监听器获取到了发送的消息:

image-1655346357520