消息队列
软件下载
Dubbo远程调用的性能问题
Dubbo调用在微服务项目中普遍存在
这些Dubbo调用都是同步的
"同步"指:A(消费者)调用B(生产者)的服务A在发起调用后,在B返回之前只能等待
直到B返回结果后A才能运行
Dubbo消费者发送调用后进入阻塞状态,这个状态表示改线程仍占用内存资源,但是什么动作都不做
如果生产者运行耗时较久,消费者就一直等待,如果消费者利用这个时间,那么可以处理更多请求,业务整体效率
实际情况下,Dubbo有些必要的返回值必须等待,但是不必要等待的服务返回值,我们可以不等待去做别的事情
这种情况下我们就要使用消息队列
什么是消息队列
消息队列(Message Queue)简称MQ
消息队列是采用"异步(两个微服务项目并不需要同时完成请求)"的方式来传递数据完成业务操作流程的业务处理方式
消息队列的特征
常见面试题:消息队列的特征
- 利用异步的特性,提高服务器的运行效率,减少因为远程调用出现的线程等待\阻塞
- 削峰填谷:在并发峰值超过当前系统处理能力时,我们将没处理的信息保存在消息队列中,在后面出现的较闲的时间中去处理,直到所有数据依次处理完成,能够防止在并发峰值时短时间大量请求而导致的系统不稳定
- 消息队列的延时:因为是异步执行,请求的发起者并不知道消息何时能处理完,如果业务不能接收这种延迟,就不要使用消息队列
常见消息队列
- Kafka:性能好\功能弱:适合大数据量,高并发的情况,大数据领域使用较多
- RabbitMQ:功能强\性能一般:适合发送需求复杂的消息队列,java业务中使用较多
- RocketMQ:阿里的
- ActiveMQ:前几年流行的,老项目可能用到
常见面试题:消息队列异常处理
如果我们真的将上面生成订单业务里,减少库存的操作从正常流程中剥离到消息队列
那么如果库存减少过程中发送异常,就不能由Seata接收了,因为异步的处理无法和Seata通信
意思是如果使用了消息队列,队列中处理数据过程发送异常,那么就要用特殊的方法处理问题
处理方式就是手写代码进行回滚,一般情况就是在stock,模块再向order模块发送消息,order模块接收到消息后进行进一步处理
如果order模块进一步处理时又发生异常,我们可以再向一个实现设置好的消息队列中发送消息
这个消息队列没有处理者,我们称之为"死信队列",一个正常运行的程序,会定期有人处理死信队列信息
Kafka
什么是Kafka
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。Kafka最初是由LinkedIn开发,并随后于2011年初开源。
kafka软件结构
Kafka Cluster(Kafka集群)
Partition(分片)
Producer:消息的发送方,也就是消息的来源,Kafka中的生产者
order就是消息的发送方
Consumer:消息的接收方,也是消息的目标,Kafka中的消费者
stock就是消息的接收方法
Topic:话题或主题的意思,消息的收发双方要依据同一个话题名称,才不会将信息错发给别人
Record:消息记录,就是生产者和消费者传递的信息内容,保存在指定的Topic中
Kafka的特征与优势
Kafka作为消息队列,它和其他同类产品相比,突出的特点就是性能强大
Kafka将消息队列中的信息保存在硬盘中
Kafka对硬盘的读取规则进行优化后,效率能够接近内存
硬盘的优化规则主要依靠"顺序读写,零拷贝,日志压缩等技术"
Kafka处理队列中数据的默认设置:
-
Kafka队列信息能够一直向硬盘中保存(理论上没有大小限制)
-
Kafka默认队列中的信息保存7天,可以配置这个时间,缩短这个时间可以减少Kafka的磁盘消耗
Kafka的安装和配置
最好将我们kafka软件的解压位置设置在一个根目录
然后路径不要有空格和中文
我们要创建一个空目录用于保存Kafka运行过程中产生的数据
本次创建名称为data的空目录
下面开始配置启动信息
先到G:\kafka\config下配置有文件zookeeper.properties
找到dataDir属性修改如下
dataDir=G:/data
注意G盘和data文件夹名称,匹配自己电脑的真实路径和文件夹名称
还要修改server.properties配置文件
log.dirs=G:/data
修改注意事项和上面相同
Zookeeper简介
我们在启动kafka前必须先启动Zookeeper
zoo:动物园
keeper:园长
可以引申为管理动物的人
早期,每个服务器系统中的软件在安装后运行都需要一些配置
那么软件多了,配置会比较复杂
我们使用Zookeeper之后,可以创建一个新的管理各种软件配置的文件管理系统
在Zookeeper中,可以修改服务器系统中的所有软件配置
长此以往,很多软件就删除了自己写配置文件的功能,而直接从Zookeeper中获取
Kafka就是需要将配置编写在Zookeeper中的软件之一
Kafka的启动
启动Zookeeper
进入路径G:\kafka\bin\windows
输入cmd进入dos命令行
G:\kafka\bin\windows>zookeeper-server-start.bat ..\..\config\zookeeper.properties
启动kafka
总体方式一样,输入不同指令
G:\kafka\bin\windows>kafka-server-start.bat ..\..\config\server.properties
附录
Mac系统启动Kafka服务命令(参考):
# 进入Kafka文件夹
cd Documents/kafka_2.13-2.4.1/bin/
# 动Zookeeper服务
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
# 启动Kafka服务
./kafka-server-start.sh -daemon ../config/server.properties
Mac系统关闭Kafka服务命令(参考):
# 关闭Kafka服务
./kafka-server-stop.sh
# 启动Zookeeper服务
./zookeeper-server-stop.sh
在启动kafka时有一个常见错误
wmic不是内部或外部命令
这样的提示,需要安装wmic命令,安装方式参考
https://zhidao.baidu.com/question/295061710.html
Kafka使用Demo
不要关闭Zookeeper和Kafka的dos窗口
我们再csmall项目中编写一个简单的Demo学习Kafka的使用
在csmall-cart-webapi模块中
添加依赖
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
修改yml文件配置
spring:
kafka:
# 定义kafka的位置
bootstrap-servers: localhost:9092
# 话题的分组名称,是必须配置的
# 为了区分当前项目和其他项目使用的,防止了不同项目相同话题的干扰或错乱
# 本质是在话题名称前添加项目名称为前缀来防止的
consumer:
group-id: csmall
SpringBoot启动类中添加注解
@SpringBootApplication
@EnableDubbo
// 启动kafka的功能
@EnableKafka
// 为了测试kafka,我们可以周期性的发送消息到消息队列
// 使用SpringBoot自带的调度工具即可
@EnableScheduling
public class CsmallCartWebapiApplication {
public static void main(String[] args) {
SpringApplication.run(CsmallCartWebapiApplication.class, args);
}
}
下面我们就可以实现周期性的向kafka发送消息并接收的操作了
编写消息的发送
cart-webapi包下创建kafka包
包中创建Producer类来发送消息
// 我们需要周期性的向Kafka发送消息
// 需要将具备SpringBoot调度功能的类保存到Spring容器才行
@Component
public class Producer {
// 能够实现将消息发送到Kafka的对象
// 只要Kafka配置正确,这个对象会自动保存到Spring容器中,我们直接装配即可
// KafkaTemplate<[话题名称的类型],[传递消息的类型]>
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
// 每隔10秒向Kafka发送信息
int i=1;
// fixedRate是周期运行,单位毫秒 10000ms就是10秒
@Scheduled(fixedRate = 10000)
// 这个方法只要启动SpringBoot项目就会按上面设置的时间运行
public void sendMessage(){
// 实例化一个Cart类型对象,用于发送消息
Cart cart=new Cart();
cart.setId(i++);
cart.setCommodityCode("PC100");
cart.setPrice(RandomUtils.nextInt(100)+200);
cart.setCount(RandomUtils.nextInt(5)+1);
cart.setUserId("UU100");
// 将cart对象转换为json格式字符串
Gson gson=new Gson();
// 执行转换
String json=gson.toJson(cart);
System.out.println("本次发送的消息为:"+json);
// 执行发送
// send([话题名称],[发送的消息]),需要遵循上面kafkaTemplate声明的泛型类型
kafkaTemplate.send("myCart",json);
}
}
创建一个叫Consumer的类来接收消息
// 因为Kafka接收消息是自动的,所以这个类也必须交由Spring容器管理0
@Component
public class Consumer {
// SpringKafka框架接收Kafka中的消息使用监听机制
// SpringKafka框架提供一个监听器,专门负责关注指定的话题名称
// 只要该话题名称中有消息,会自动获取该消息,并调用下面方法
@KafkaListener(topics = "myCart")
// 上面注解和下面方法关联,方法的参数就是接收到的消息
public void received(ConsumerRecord<String,String> record){
// 方法参数类型必须是ConsumerRecord
// ConsumerRecord<[话题名称类型],[消息类型]>
// 获取消息内容
String json=record.value();
// 要想在java中使用,需要转换为java对象
Gson gson=new Gson();
// 将json转换为java对象,需要提供转换目标类型的反射
Cart cart=gson.fromJson(json,Cart.class);
System.out.println("接收到对象为:"+cart);
}
}
本文由 liyunfei 创作,采用 知识共享署名4.0
国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Oct 4,2022