介绍
项目代码下载
可以参照源码进行学习:
wget https://labfile.oss.aliyuncs.com/courses/3055/spring-boot-mq-kafka.zip
unzip spring-boot-mq-kafka.zip
Kafka 简介
Kafka 是一个高吞吐量的分布式发布订阅消息系统,解耦了消息生产者和消息消费者。与大数据框架 Spark 配合使用,可以实现数据的实时流处理,目前做大数据相关的项目,都会用到 Kafka 消息系统。
Kafka 特点
Kafka 是一个分布式的流处理平台。它具有以下特点:
- 支持消息的发布和订阅,类似于 RabbitMQ、ActiveMQ 等消息队列。
- 支持数据实时处理。
- 能保证消息的可靠性投递。
- 支持消息的持久化存储,并通过多副本分布式的存储方案来保证消息的容错。
- 高吞吐率,单 Broker 可以轻松处理数千个分区以及每秒百万级的消息量。
相关术语名词
-
Broker
Kafka 集群包含一个或多个服务器,这种服务器被称为 Broker 。
-
Topic
发布到 Kafka 集群上的消息都属于某一个分类,而这个分类,也就被称为 Topic。Topic 存储在 Broker 中。
-
Partition
Partition 是一个有序的、不可变的消息序列,每个 Topic 可以划分为多个 Partition,而每一个 Partition 可以有多个 Replica(副本),一条消息只能发送到一个 Partition 上。
-
Producer
消息生产者,负责发布消息到 Broker 的 Topic 上
-
Consumer
消息消费者,从 Broker 的 Topic 读取消息
Kafka 工作原理
Kafka 设计为集群运行,从而能够实现很强的可扩展性。通过将 Topic 在集群的所有实例上划分为多个 Partition ,它能够具有更强的可扩展性。RabbitMQ 主要处理 Exchange 中的队列,而 Kafka 仅使用 Topic 实现消息的发布/订阅。
Kafka Topic 会复制到集群的多个 Broker 上。集群中的每个节点都会担任一个或多个 Topic 的首领(Leader),负责该主题的数据并将其复制到集群中的其它节点上。
更进一步来讲,每个 Topic 可以划分为多个 Partition 。在这种情况下,集群中的每个节点是某个 Topic 一个或多个分区的 Leader,但并不是整个主题的首领。主题的责任会在所有节点间进行拆分。下图阐述了它是如何运行的。
在 WebIDE 中安装 Kafka
安装 Kafka
要想运行带有 Kafka 的 Spring Boot 应用,首先就得启动 Kafka 服务,虽然实验楼的 WebIDE 环境中默认安装了 Kafka 的,但这里还是演示一下如何安装 Kafka。
运行下面命令:
cd /opt
ls
可以看到实验楼的环境中安装得有 Scala 2.11 编译的 1.0.0 版本的 Kafka。
但是先不管它,我们自己来重新安装 Kafka。
下载 Kafka 软件包到当前目录中:
wget https://labfile.oss.aliyuncs.com/courses/3055/kafka_2.13-2.6.0.tgz
之后再进行解压:
sudo tar -zxvf kafka_2.13-2.6.0.tgz
对配置文件做一些改动,运行以下命令切换到制定文件夹中:
cd ./kafka_2.13-2.6.0/config
查看该目录下的 server.properties
文件:
sudo vim server.properties
需要修改下图中红框部分信息:
更改成如下形式:
启动 Kafka
要想启动 Kafka,需要先启动 zookeeper ,因为 Kafka 集群使用 zookeeper 软件进行元数据信息的同步。在生产环境中,安装 Kafka 之前,需要先安装 zookeeper。基于实验的目的,本次实验不再单独安装 zookeeper 软件,而是使用 Kafka 自带的 zookeeper 软件,从功能上讲,没有区别。
在当前路径启动 zookeeper :
sudo ../bin/zookeeper-server-start.sh ./zookeeper.properties
启动成功后如下图所示:
这里有个小技巧,在命令前加
nohup
,尾部加&
,这样命令就会以 Linux 后台进程运行,关闭当前命令行终端也无影响。例如nohup ../bin/zookeeper-server-start.sh ./zookeeper.properties &
,同学们可以自行尝试,也包括下面启动 Kafka 服务。
然后重新打开一个终端。在 WebIDE 上方菜单栏选择 Terminal ,下拉后选择 New Terminal,在新的控制台启动 Kafka 服务:
sudo /opt/kafka_2.13-2.6.0/bin/kafka-server-start.sh /opt/kafka_2.13-2.6.0/config/server.properties
启动成功后如下所示:
将 Kafka 整合进 Spring Boot
新建工程
之前的 Terminal 不要关闭,重新打开一个 Terminal,在终端中输入以下内容,创建一个 Maven 工程。
mvn archetype:generate \
-DgroupId=com.example.mq.kafka \
-DartifactId=spring-boot-mq-kafka \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DinteractiveMode=false
将项目结构改成如下结构:
├── pom.xml
└── src
└── main
├── java
│ └── com
│ └── example
│ └── mq
│ └── kafka
│ ├── KafkaDemoApplication.java
│ ├── config
│ │ └── KafkaConfig.java
│ ├── constants
│ │ └── KafkaConsts.java
│ ├── controller
│ │ └── MessageProducer.java
│ └── handler
│ └── MessageHandler.java
└── resources
└── application.yml
添加 Kafka 到 Spring Boot 中
与 RabbitMQ 不同,Spring Boot 并没有针对 Kafka 的 starter,所以增加的依赖就不能像 RabbitMQ 一样直接引入 AMQP 的依赖。
引入 Kafka 需要添加依赖如下所示:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
最终在 pom.xml
中添加依赖如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example.mq.kafka</groupId>
<artifactId>spring-boot-mq-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-demo</name>
<description>kafka demo project for Spring Boot</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
接下来,需要配置一些属性。一般来说,KafkaTemplate 默认使用 localhost 上的监听 9092 端口的 Kafka 代理,所以需要在 application.yml
中添加如下属性:
server:
port: 8080
servlet:
context-path: /example
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
# 设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
retries: 0
#每批次发送消息的数量
batch-size: 16384
# producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。
buffer-memory: 33554432
# key序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: spring-boot-example
# 手动提交
enable-auto-commit: false
# Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】
auto-offset-reset: latest
# key的解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value的解码方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
session.timeout.ms: 60000
listener:
# 使记录在控制台显示
log-container-config: true
concurrency: 5
# 手动提交
ack-mode: manual_immediate
在上面的配置中,可以注意到 spring.kafka.bootstrap-servers 是复数形式,也就是说,我们是可以提供集群中的 Kafka 服务器的,例如:
spring:
kafka:
bootstrap-servers:
- localhost:9092
- localhost:9093
- localhost:9094
创建常量池
在 com.example.mq.kafka.constants
下建立一个常量池 KafkaConsts.java
,规定 Kafka 集群的默认 Partition
数目和 Topic
的名称:
package com.example.mq.kafka.constants;
public interface KafkaConsts {
/**
* 默认 Partition 数目
*/
Integer DEFAULT_PARTITION_NUM = 3;
/**
* Topic 名称
*/
String TOPIC_TEST = "test";
}
消息发送控制类
和 RabbitMQ 类似,Kafka 同样是使用 KafkaTemplate
来发送消息,这里实现一个以 URL 的方式发送消息的消息发送控制类,其实也就是我们通常所说的生产者。 kafkaTemplate
类发送消息的方法如下:
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
可以发现,与 RabbitMQ 发送消息的区别还是有一些,这里没有了 convertAndSend()
方法。这是因为, KafkaTemplate 是通过泛型类型化的,在发送消息的时候,它能够直接处理领域类型。这样的话,所有的 send()
方法都完成了 convertAndSend()
的任务。
同时, send()
和 sendDefault()
的参数 RabbitMQ 有很大的差异。在使用 Kafka 发送消息的时候,我们可以使用如下参数设置消息该如何进行发送:
- 消息要发送到的主题(send()方法的必选参数)
- 主题要写入的分区(可选)
- 记录上要发送的 key(可选)
- 时间戳(可选,默认为 System.currentTimeMillis())
- 消息内容(必选)
主题和消息内容是其中最重要的两个参数。分区和 key 对于如何使用 KafkaTemplate 几乎没有影响,只是作为额外的信息提供给 send()
和 sendDefault()
。在实验中,我们只关心将消息内容发送到给定的主题,不用担心分区和 key 的问题。
在 com.example.mq.kafka.controller
下新建控制类
package com.example.mq.kafka.controller;
import com.example.mq.kafka.constants.KafkaConsts;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/messageSend")
public String Producer(String Message) {
// 指定发送消息的 Topic 固定,只自行编写发送消息的内容
kafkaTemplate.send(KafkaConsts.TOPIC_TEST, Message);
return "Message sent successfully!";
}
}
消息接收类
在我们的实验中,想要使用 @KafkaListenr
注解来作为消息的监听器,但是这种机制需要一个 @Configuration
类上的 @EnableKafka
注解和一个监听器容器工厂,该工厂用于配置底层的 ConcurrentMessageListenerContainer
。默认情况下,需要一个名为 kafkaListenerContainerFactory
的 bean。下面的示例展示了如何使用 ConcurrentMessageListenerContainer
。
package com.example.mq.kafka.handler;
import com.example.mq.kafka.constants.KafkaConsts;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MessageHandler {
@KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "ackContainerFactory")
public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
try {
String message = (String) record.value();
log.info("Messages are received: {}", message);
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
// 手动提交 offset
acknowledgment.acknowledge();
}
}
}
注意:要设置容器属性,必须在工厂上使用
getContainerProperties()
方法。它被用作注入到容器中的实际属性的模板。
创建配置类
KafkaTemplate 类配置
可以看到消息发送控制类,也就是常说的生产者主要是使用了 KafkaTemplate
类来进行消息发送,相应地需对其作一些配置。首先需要配置一个实例化生产者的工厂 producerFactory()
,并在 KafkaTemplate
的构造函数中提供。这种方法的实现代码如下:
/**
* 工厂模式实例化生产者的方法
*/
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/**
* 生产者的配置属性
*/
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
@KafkaListener 注释的方法配置
对应消息发送,消息接收同样需要一些配置。从之前的代码可以看到我们是使用了 @KafkaListener
注解来使消息接收类接收消息。配置方式如下所示:
/**
* 主要用于为 @KafkaListener 注释的方法构建容器
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
// 实例化一个容器
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
// 指定要使用的工厂
factory.setConsumerFactory(consumerFactory());
// 指定容器并发数
factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
// 如果要在该端点设置监听器,设置为true
factory.setBatchListener(true);
// 配置容器,设置消费者等待记录的块的最大时间。
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
/**
* 实例化消费者的方法
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
// 返回一个使用提供的配置构建的工厂
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
/**
* 使用ACK机制确认的消费
*/
@Bean("ackContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> ackContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
return factory;
}
配置类编写
结合上面的消息,在 com.example.mq.kafka.config
下新建 KafkaConfig.java
文件,添加如下内容:
package com.example.mq.kafka.config;
import com.example.mq.kafka.constants.KafkaConsts;
import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ContainerProperties;
@Configuration
@EnableConfigurationProperties({KafkaProperties.class})
@EnableKafka
@AllArgsConstructor
public class KafkaConfig {
private final KafkaProperties kafkaProperties;
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
@Bean("ackContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> ackContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
return factory;
}
}
在我们的实验代码中, 可以发现和前面讲解例子相比,少了一个属性配置方法,使用了 @EnableConfigurationProperties({KafkaProperties.class})
注解,加载了在 application.yml
文件中配置的生产者和消费者的属性。
修改消息启动类
在 com.example.mq.kafka
下修改之前的 App.java
为 KafkaDemoApplication.java
,并添加以下内容:
package com.example.mq.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaDemoApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
}
实验测试
输入一下命令测试程序:
cd spring-boot-mq-kafka
mvn spring-boot:run
之后,在出现的 URL 后面添加 /example/messageSend?Message= kafka
, 敲下回车发送消息,成功后会出现如下提示:
运行成功后可以找到如下信息,证明 Kafka 监听器获取到了发送的消息:
本文由 liyunfei 创作,采用 知识共享署名4.0
国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Jun 29,2022