RabbitMQ
下载软件
RabbitMQ是Erlang语言开发的,所以要先安装Erlang语言的运行环境
下载Erlang的官方路径
https://erlang.org/download/otp_versions_tree.html
安装的话就是双击
不要安装在中文路径和有空格的路径下!!!
下载RabbitMQ
https://www.rabbitmq.com/install-windows.html
安装也是双击即可
不要安装在中文路径和有空格的路径下!!!
RabbitMQ的结构
和Kafka不同,Kafka是使用话题名称来收发信息,结构简单
RabbitMQ是使用交换机\路由key指定要发送消息的队列
消息的发送者发送消息时,需要指定交换机和路由key名称
消息的接收方接收消息时,只需要指定队列的名称
在编写代码上,相比于Kafka,每个业务要编写一个配置类
这个配置类中要绑定交换机和路由key的关系,以及路由Key和队列的关系
配置Erlang的环境变量
要想运行RabbitMQ必须保证系统有Erlang的环境变量
配置Erlang环境变量
把安装Erlang的bin目录配置在环境变量Path的属性中
启动RabbitMQ
找到RabbitMQ的安装目录
可能是:
G:\pgm\rabbit\rabbitmq_server-3.10.1\sbin
具体路径根据自己的情况寻找
地址栏运行cmd
输入启动指令如下
G:\pgm\rabbit\rabbitmq_server-3.10.1\sbin>rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins enable rabbitmq_management
结果如下
运行完成后
可以在Window任务管理器中的服务选项卡里找到RabbitMQ的服务(Ctrl+Shift+ESC)
另外的验证方法:
打开浏览器访问http://localhost:15672
登录界面用户名密码
guest
guest
登录成功后看到RabbitMQ运行的状态
如果启动失败,需要重新安装
参考路径如下
https://baijiahao.baidu.com/s?id=1720472084636520996&wfr=spider&for=pc
利用RabbitMQ完成消息的收发
csmall-stock-webapi项目中测试RabbitMQ
可以利用之前我们使用Quartz实现的每隔一段时间输出当前日期信息的方法改为发送消息
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml文件配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
交换机\路由Key\队列的配置类
RabbitMQ要求我们再java代码级别设置交换机\路由Key\队列的关系
我们再quartz包下,创建config包
包中创建配置信息类
// SpringBoot整合RabbitMQ之后
// 这些配置信息要保存在Spring容器中,所以这些配置也要交给SpringBoot管理
@Configuration
public class RabbitMQConfig {
// 声明需要使用的交换机\路由Key\队列的名称
public static final String STOCK_EX="stock_ex";
public static final String STOCK_ROUT="stock_rout";
public static final String STOCK_QUEUE="stock_queue";
// 声明交换机,需要几个声明几个,这里就一个
// 方法中实例化交换机对象,确定名称,保存到Spring容器
@Bean
public DirectExchange stockDirectExchange(){
return new DirectExchange(STOCK_EX);
}
// 声明队列,需要几个声明几个,这里就一个
// 方法中实例化队列对象,确定名称,保存到Spring容器
@Bean
public Queue stockQueue(){
return new Queue(STOCK_QUEUE);
}
// 声明路由Key(交换机和队列的关系),需要几个声明几个,这里就一个
// 方法中实例化路由Key对象,确定名称,保存到Spring容器
@Bean
public Binding stockBinding(){
return BindingBuilder.bind(stockQueue()).to(stockDirectExchange())
.with(STOCK_ROUT);
}
}
RabbitMQ发送消息
我们再QuartzJob类中输出时间的代码后继续编写代码
实现RabbitMQ消息的发送
public class QuartzJob implements Job {
// RabbitTemplate就是amqp框架提供的发送消息的对象
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
//输出当前时间
System.out.println("--------------"+ LocalDateTime.now() +"---------------");
// 先简单的发送一个字符串
rabbitTemplate.convertAndSend(RabbitMQConfig.STOCK_EX,
RabbitMQConfig.STOCK_ROUT,"接收到减少库存的消息");
}
}
我们可以通过修改QuartzConfig类中的Cron表达式修改调用的周期
CronScheduleBuilder cronScheduleBuilder=
CronScheduleBuilder.cronSchedule("0/10 * * * * ?");
接收RabbitMQ的消息
quartz包下再创建一个新的类用于接收信息
RabbitMQConsumer代码如下
// 这个对象也是需要交由Spring容器管理的,才能实现监听Spring容器中保存的队列的效果
@Component
// 和Kafka不同的是Kafka在一个方法上声明监听器
// 而RabbitMQ是在类上声明,监听具体的队列名称
@RabbitListener(queues = {RabbitMQConfig.STOCK_QUEUE})
public class RabbitMQConsumer {
// 监听了类,但是运行代码的一定是个方法
// 框架要求这个类中只允许一个方法包含下面这个注解
// 表示这个方法是处理消息的方法
// 方法的参数就是消息的值
@RabbitHandler
public void process(String str){
System.out.println("接收到的消息为:"+str);
}
}
启动Nacos\RabbitMQ\Seata
启动stock-webapi
根据Cron表达式,消息会在0/10/20/30/40/50秒数时运行
测试成功表示一切正常
本文由 liyunfei 创作,采用 知识共享署名4.0
国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Jul 11,2022