RabbitMQ 使用说明
安装
在Docker 中安装 MabbitMQ.
docker run -d --name RabbieMQ -p 5672:5672 -p 15672:15672 -v rabbitmq-plugin:/plugins -e RABBITMQ_DEFAULT_USER=tzming -e RABBITMQ_DEFAULT_PASS=tzminglove rabbitmq:3.13.7-management
运行原理概述
从上图得知,RabbitMQ 的主要运行原理如下:
- 1.生产者,即消息发布者;消费者,即消息接收者
- 2.生产者通过连接 MQ 服务器,并创建一个 Channel ,通过 Channel 来发布消息,消息会到达 MQ 服务器的 ExChange ,即交换机
- 3.ExChange 是用于定义这些消息,应该以何种方式分发给消费者进行消费的,关于多种分发机制,可以在下面的章节中讲述。
- 4.ExChange 通过把消息发到绑定的 Queue 队列中,等待消费者进来取消息进行消费。
- 5.消费者通过连接 MQ 服务器,并创建一个 Channel 来接收由 MQ 服务器发来的消息,消息者收到消息并进行消费。
- 6.完成整个生产者和消费者的过程。
Work Queue 工作队列模式
工作队列模式简单的说,就是一个生产者对应多个消费者的队列模式,且消息是竞争关系,谁抢着就算谁的,消息只会被消费一次,是MQ中最简单的工作模式
通过多个消费者竞争消费同一条队列的消息,以提高处理一些工作量比较多的生产服务情况上。
在上图中,生产者 P 直接给 MQ 服务器 的条列 Queue 中发送消息,而消费者 C1 和 C2 对消息进行消费,图中没有表明使用 ExChange 交换机,但实际上这种模式也存在交换机的,只是 MQ 服务器内部预留了一个 Default ExChange 默认交换机。当我们不声明交换机时,MQ 则会默认把消息发送到 Default ExChange 交换机并创建绑定相关 Queue 队列。
Publish Subscribe 发布订阅模式
发布订阅模式与直接的工作模式更复杂一些,它有几种类型,所以这里必须要声明自定义的交换机进行消息的分发。
这里需要注意的是,ExChange 交换机并不具备存储消息的能力,当交换机没有绑定队列,或交换机消息无法匹配所绑定的队列名称时,该消息将会被交换机所丢弃。
Fanout 类型
- 广播类型,会将消息发送给所有绑定到交换机的队列中,如果一个交换机中有多个队列,那么每个队列都会有这么一条消息
- 但如果队列中有多个消费者,那监听同一队列的消费者依然是竞争关系
- 没有 Routing key 直接绑定队列
Direct 类型
- 定向类型,会把消息交给符合指定的 routing key 的队列中。也就是队列多了一个具名制,会对队列进行命名,消息只会分发给符合命名规则的队列中。
- 通过 Routing key 绑定队列,消息发送到绑定的队列上
- 一个交换机绑定一个队列:定点发送
- 一个交换机绑定多个队列:广播发送
- 通过 Routing key 绑定队列,消息发送到绑定的队列上
Topic 通配符类型
- 类似 Direct 类型,会把消息交给符合指定的 routing pattern 的队列中区别在于,Direct 类型的队列命名是固定的,而 Topic 通配符类型的队列是通配符类型,可以有多种情况命中名称。
- 针对 Routing key 使用通配符
Routing 路由模式
路由模式是使用一个交换机,且交换机当前是direct类型的,并通过绑定多个队列,而每个队列声明路由名称,当生产者发送消息时,会在消息中标记该消息允许名称为xx的队列接收,此时声明了该名称的队列将会收到生产者的消息,同时一个队列可以同时声明多个不同的名称。
消费者方面,只需要绑定特定的队列,即可。
Topics 主题模式
主题模式其实和Direct 类型 的 路由模式差不多,只不过名称上可以有通配符进行匹配名称,同时路由器的模式需要设置为 Topic 类型。
通配符有如下情况:
- # : 匹配 0 个词,或多个词,如 #.error 可以匹配 ".error","goods.error","order.goods.error"
- * : 匹配一个词,如 *.error 可以匹西 "goods.error"。如“*.*”可匹配类似 " a.b "这样的路由键。
下图可以说明使用通配符如何定义队列名称:
- 使用 usa.# 时,可以使 usa.news 和 usa.weather 消息进入红色队列
- 使用 #.news 时,可以使 usa.news 和 europe.news 消息进入橙色队列
- 使用 #.weather 时,可以使 usa.weather 和 europe.weather 消息进入绿色队列
- 使用 europe.# 时,可以使 europe.news 和 europe.weather 消息进入蓝色队列
RPC
RPC 为远程过程调用,本质上是同步调用,和我们使用 OpenFeign 调用远程接口一样,所以这不是典型的消息队列工作方式。
整合 SpringBoot
引入依赖
SpringBoot 官方已经推出了针对 RabbitMQ 的依赖,直接使用这个依赖就可以。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
创建消费者监听器
对于消费者而言,SpringBoot 中使用的是监听器,通过创建一个类,用于作为消费者监听来自生产者的数据的时候,进行自定义操作。
基础信息有:
- 监听的交换机名称:消费者需要设置监听的交换机,一个MQ服务器有很多交换机,我们需要明确指出这个监听器要监听哪个交换机的消息
- 路由键:通常我们使用的比较多的都是 Topic 或 Direct 路由模式,这两种模式可以更好的区分任务队列,所以我们需要声明这个监听器要处理哪个路由键带来的消息
- 队列名称:监听器主要就是监听和获取来自队列中的消息,所以我们必须清楚我们要监听的是哪个队列的消息
配置 RabbitMQ 服务器
rabbitmq:
host: 192.168.101.101
port: 5672
username: tzming
password: tzminglove
virtual-host: /
使用注解注册监听处理方法
RabbbitTemplate 框加中提供了一个专用来标记监听消息消费的注册 @RabbitListener
- 注解提供绑定功能
- bindings : 对队列进行绑定参数值,使用 @QueueBinding 来创建一个绑定规则
- value: 绑定的队列,使用 @Queue 进行创建
- @Queue : value值为 队列名称
- @Queue : durable 值为是否持久化存储消息,使用 "true" 或 "false" 来表示,不使用 Boolean
- exchange: 绑定的交换机,使用 @Exchange 进行创建
- key: 绑定的路由键,直接使用 String 值
- value: 绑定的队列,使用 @Queue 进行创建
- bindings : 对队列进行绑定参数值,使用 @QueueBinding 来创建一个绑定规则
监听方法中,需要声明三个形参
- String dataString : 如果你知道你发送的消息是 String 类型的,可以在接收端事先定义 String 类型。类型可以根据消息数据来变
- Message message : 消息对象,封装了消息数据的对象
- Channel channel : 频道对象
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = Queue_Name,durable = "true"),
exchange = @Exchange(ExChange_Direct),
key = {Routing_Key}
)
)
public void processListener(String data, Message message, Channel channel){
}
在消费者中定义这些,是因为如果MQ中没有对应的交换机和队列时,会自动创建。如果MQ中本身就有的话,我们可以简单的写 queue 来声明监听队列就可以了。
@RabbitListener(queues = Queue_Name) // 只声明了监听队列,但这样需要MQ服务器中已设置好交换机绑定等信息
public void processListener(String data, Message message, Channel channel){}
使用 RabbitTemplate 发送消息
因为我们只需要交换机名称和 key 就可以找到对应的队列,(其实队列名就是 routing key ,只不过队列可以声明多个routing key)
String data = "Hello RabbitMQ";
// 发送消息到 MQ 服务器
rabbitTemplate.convertAndSend(ExChange_Direct, Routing_Key, data);
消息可靠性投递
我们对MQ发送消息时,消费者通过MQ获取到消息后进行消费,但也不保证整个过程都顺利进行,比如
- 问题一:消息没有发送到消息队列上
- 在生产端进行确认,对MQ队列进行确认
- 问题二:消息成功存入队列,但是消息MQ服务器宕机了,原本保存在内存中的消息也丢失了
- 开启持久化存储功能,让消息保存在硬盘中
- 问题三:消息成功存入队列,但是消费者服务宕机了,或抛异常了
- 消费端消费成功,给服务器返回ACK确认信息,然后消息队列再删除该消息
- 消费端消费失败,给服务器返回NACK信息,MQ服务器把消息设置为待消费状态
配置文件开启交换机确认
开启后,生产者发送的消息会开启确认
spring:
rabbitmq:
# 交换机确认
publisher-confirm-type: correlated
配置文件开启队列确认
开启后,消费者在消费后消息会开启确认
spring:
rabbitmq:
# 队列确认
publisher-returns: true
代码中配置交换机确认回调方法
在开启交换机确认机制后,我们需要构造一个用于回调交换机确认信息的方法,这个方法需要实现来自 RabbitTemplate.ConfirmCallback 的接口 confirm
/**
* 交换机确认机制回调函数
* @param correlationData
* @param ack 是否成功发送消息到交换机中,true 表示成功发送到交换机
* @param cause 当发送失败时的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
}
设置后,不是注入到 Bean,而是把这个回调方法设置到 RabbitTemplate 中。
// 把回调放到 RabbitTemplate 中
rabbitTemplate.setConfirmCallback(this);
代码中配置队列确认回调方法
在开启交换机确认机制后,我们需要构造一个用于回调交换机确认信息的方法,这个方法需要实现来自 RabbitTemplate.ReturnsCallback 的接口 returnedMessage 接口
/**
* 队列确认机制回调函数,只有失败了才会回调
* @param returnedMessage 包含如下
* Message message: 消息以及消息相关数据
* int replyCode : 应答码,类似于 HTTP状态码
* String replyText: 应答码说明
* String exchange: 交换机名称
* String routingKey : 路由键名称
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
returnedMessage.getMessage().getBody(); // 获取消息
returnedMessage.getExchange(); // 交换机名称
returnedMessage.getReplyCode(); // 应答码
returnedMessage.getReplyText(); // 应答码说明
returnedMessage.getRoutingKey(); // 路由键名称
}
设置后,不是注入到 Bean,而是把这个回调方法设置到 RabbitTemplate 中。
rabbitTemplate.setReturnsCallback(this);
合并设置
合并两个方法的实现,可以设置到同一个类中实现方法,因为我们要事先把 returnedMessage 回调 和 confirm 回调设置给 RabbitTemplate,所以我们可以新建一个方法,使用 @PostConstruct 标记在类创建后,执行 RabbitTemplate 设置工作:
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 在初始化阶段,把 confirm 和 returnedMessage 回调方法设置到 RabbitTemplate 中
*/
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
消息可靠性消费
对于消费者是否消费成功的问题,RabbitMQ 默认使用了全自动应答机制,即消息是否被成功消费,由消费者自动处理,但是我们可以对消费可靠性做自定义行为,即手动发送 ACK 或 NACK 应答。
1.首先我们要关闭RabbitMQ 的自动ACK应答功能:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
2.我们在Listener 接收消息方法中,对消息做ACK和NACK应答。
我们可以在处理消息过过程中使用 try-catch 来判断代码是否成功执行(消费成功)
使用 Channel 中的对象发关 ACK 或 NACK 应答。
channel.basicAck();
其中 basicAck 方法需要提交两个参数,分别如下:
- long deliveryTag
- boolean multiple
deliveryTag 值
说明:在生产者发布消息到MQ服务器时,为了识别每个消息的独立性,或者叫标识,通常都会生成携带属于这个消息的“ID”值,这个值由8位组成,如下图
当我们对MQ服务器提交ACK应答时,我们需要提交这个标识,让MQ服务器知道,是哪一条消息被成功消费了。
存在标识的作用:消费端把消息处理结果 ACK、NACK、Reject等返回给Broker之后,Broker需要对对应的消息圾行后续操作,例如删除消息,重新排队或标记为死信等等。
Broker就必须知道它现在要操作的消息具体是哪一条。
deliveryTag 值如果使用广播方式广播到多个队列里时,同一个消息不同队列,deliveryTag值也不会相同。
获取 deliveryTag 值可通过 message 对象获取
// 获取deliveryTag值
long deliveryTag = message.getMessageProperties().getDeliveryTag();
multiple值
multiple 是指是否对这个 deliveryTag 之前的值都批量处理,比如当你处理到 00000005 值的消息并成功时,是否把 01到04的消息也一并设置为ACK.
这里的目的可能是,你都处理到05消息了,那01到04可能不会再接收
当 multiple 为 true 时,表示会把当前消息之前的所有消息都批量为 ACK
当 multiple 为 false 时,表示只会把当前的消息设置为ACK(推荐)
考虑到,一个队列中可能有多个消费者,有一些队列中的消息并非只有当前的监听者在处理,如果设置为 true,有可能会把别的消息者消息一并删除,影响其它消费者消费数据。所以应当设置为 false
代码示例:
/**
* MQ服务器中无交换机和队列,会根据注解自动创建绑定
* 也可以使用这个,但是不能自动创建绑定 @RabbitListener(queues = Queue_Name)
* @param data 收到的消息
* @param message 包装了消息的对象
* @param channel 频道,既发送这条消息的管道
* @throws IOException
*/
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = Queue_Name,durable = "true"),
exchange = @Exchange(ExChange_Direct),
key = {Routing_Key}
)
)
public void processListener(String data, Message message, Channel channel) throws IOException {
// 获取deliveryTag值
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("对消息进行消费:" + data);
// 当消费消息成功后,给服务器返回 ACK 应答
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
/**
* 如果消费数据时发生错误,会抛出异常,这时我们可以有以下选择
* 1.返回 NACK 应答,并放回队列
* 2.返回 NACK 应答,不放回队列,消息丢弃
* 我们两样都做,在第一次失败时,应当放回队列重试。如果重试后依然失败,则丢弃消息
*/
// 获取当前消息是否是重回队列消息
Boolean redelivered = message.getMessageProperties().getRedelivered();
/**
* 当 redelivered 是 false 时,表示当前消息是第一次失败,应当重放
* 当 redelivered 是 true 时,表示当前消息已经重放过依然失败,所以丢弃
* 因为 redelivered=false时,requeue应当为 true.
* 当 redelivered=true时,requeue应当为 false
* 可以使用 !redelivered 来简化逻辑
*/
channel.basicNack(deliveryTag, false, !redelivered);
}
}
关于 Reject 应答
- Reject 应答也会拒绝消息的,也会发送NACK应答给MQ服务器,但是Reject不能控制 multiple.
- channel.basicReject(deliveryTag消息标识, redelivered是否放回队列)
消费端限流
在MQ服务器看来,如果MQ服务器在一个时刻中有100个任务,遇到消费者端时,消费者端是一次过瞬间把100个任务都接下来,并一一处理
在MQ服务器中可以看出来,在一瞬间中,所有消息都被取走了,然后等待ACK.如下图
这样的缺点是,万一这个消费者的处理能力不够强,但偏偏要一次把所有任务都揽过来做,影响了整体的消息处理效率。
我们应该让消费端每处理一次消息时,只往MQ服务器取一条消息,处理完后再获取一条。
创建配置即可:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每一次只取一个消息回来处理,处理完后再去MQ服务器取消息
设置为1则说明一次只拿一条消息进行消费
消息超时
对于一些消息来说,有时效性,当这个消息在一定时间过后还没有被取走,就应该被删除
对于超时时间来说,有两个层面来设定
- 队列超时
- 在队列层面设定消息的过期时间,并不是队列的过期时间,意思是这个队中的消息全部使用同一个过期时间
- 消息本身
- 给具体的某个消息设定过期时间
- 如果两个层面都设置了,那么以最短的时间为主
1.对于队列超时,可以通过引入 RabbitAdmin 对象来创建,并在创建一个 Queue 对象中设置超时属性:
@Autowared
private RabbitAdmin rabbitAdmin;
/**
* 创建一个队列
* 我们可以在创建时把 x-message-ttl 超时属性值加上去,该队列的所有消息都会在对应的时间段内产生过期
* name: 队列名
* durable: 是否持久化
* exclusive: 排他性访问,当消费者断开链接时,队列将被删除
* autoDelete: 队列曾经有过消费者,当所有消费者断开连接时,队列自动删除
* arguments: 使用 Map 来存储队列的一些属性,key 为属性名,value 为属性值
* (1)x-message-ttl:消息的过期时间,单位:毫秒;
* (2)x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
* (3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
* (4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
* (5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
* (6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
* (7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
* (8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
* (9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
* (10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
* (11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。
*/
Queue queue = new Queue("queue.timeout", true, false, false, Map.of("x-message-ttl", 60000));
/**
* 加入队列中
*/
rabbitAdmin.declareQueue(queue);
2.对于消息中的超时,我们可以创建一个消息后置处理器,即消息被生成出来,但还没有被发送出去之前的操作,我们可以把消息设置为有超时时间
我们需要实现消息后置处理器的接口:MessagePostProcessor
String data = "Hello RabbitMQ";
MessagePostProcessor processor = new MessagePostProcessor() {
/**
* 在消息创建完成但没有发送出去之前,手动设置消息的超时时间
*/
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000"); // 5000 毫秒
return message;
}
};
// 发送消息到 MQ 服务器
rabbitTemplate.convertAndSend(ExChange_Direct, Routing_Key, data, processor);
死信和死信队列
消息超时后删除的说法并不准确,应该是会被调到死信队列中。
死信队列也是一个普通队列,只是可以在普通队列中设置另一个队列为死信队列,当普通消息出现问题就会被传到死信队列中。
能进入死信队列的有如下三种情况:
- 拒绝:消费者拒接消息,NACK/Reject 应答,且不再把消息重新放回原队列中去的,即 requeue=false
- 溢出:队列中消息数量到达限制。比如队列最大只能存储10条消息,且现在已经到达10条消息,此时如果再发送一条消息进来,会根据先进先出原则,队列中最早的消息会变成死信
- 超时:消息到达超时时间未被消费
死信的处理方式大致有下面三种:
- 丢弃:对不重要的消息直接丢弃,不做处理
- 入库:把死信写入数据库,日后处理
- 监听:消息变成死信后进入死信队列,我们专门设置消费端监听死信队列,做后续处理(通常采用)
代码示例:创建一个普通队列的消费者,和一个死信队列的消费者
- 普通
- 普通交换机:exchange.normal
- 普通队列:queue.normal.order
- 路由键:routing.normal
- 即当生产者发送消息到 exchange.normal 交换机的 routing.normal 键时,队列 queue.normal.order 会收到消息
- 死信
- 死信交换机:exchange.dead
- 死信队列:queue.dead.letter.order
- 路由键:routing.dead
- 绑定
- 普通队列 queue.normal.order 绑定【exchange.normal】交换机的 【routing.normal】
- 普通队列 queue.normal.order 绑定【x-dead-letter-exchange】属性为 exchange.dead
- 普通队列 queue.normal.order 绑定【x-dead-letter-routing-key】属性为 routing.dead
- 当普通队列 queue.normal.order消息出现错误时,错误的消息会被转入死信队列 queue.dead.letter.order中
public static final String Queue_Dead = "queue.dead.letter.order";
/**
* 处理死信队列的
* @param data
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = Queue_Dead)
public void doDeadLetter(String data, Message message, Channel channel) throws IOException {
System.out.println("处理死信:" + data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
至于如何使用代码创建死信队列,可以使用 RabbitAdmin 进行创建,并添加参数。
延迟队列
延迟队列的意思是,当我们的生产者发送消息到MQ服务器之后,不会马上把消息传给消费者处理,而是等到一定时间之后,消费者才能收到消息。
延迟队列有两种方素:
- 使用死信队列的原理,即普通队列设置消息过期时间,然后过期后派发到死信队列中,这个过期时间就是延迟时间。
- 缺点是需要两个队列合作,而且消费者需要处理的不是普通队列,而是死信队列。
- 使用插件达到延迟队列功能。
- 使用插件“rabbitmq_delayed_message_exchange”,下载地址
本次使用插件来完成延迟队列功能。
步骤
1.下载插件:
放到RabbitMQ 服务器的 Plugins 文件夹中,如果在Docker中需要放到对应的映射盘文件夹上。
2.RabbitMQ服务安装插件
在命令行中输入命令以安装插件:
rabbitmq-plusins enable rabbitmq_delayed_message_exchange
*可以通过rabbitmq-plusins list 来获取已安装的插件。
安装完成后,重启RabbitMQ 服务
3.创建延迟队列:
在后台中,创建交换机时,会多出一个选项“x-delay-message”,如下图
我们选择“x-delay-message”。
当我们选择“x-delay-message”后,我们的交换机就不能定义派发模式了(比如direct模式),所以我们需要在参数中定义我们的交换机使用哪种模式,其参数名为
x-delayed-type 为 direct(或其它模式)
4.正常创建队列即可。
5.生产者中对消息定义延期时间:
使用 前置处理器,来设定消息的延期时间。
String data = "Hello RabbitMQ";
MessagePostProcessor processor = message -> {
/**
* 消息设置属性,使用 setHeader 来设置
* 使用延迟插件专用的属性 x-delay 来声明该消息会在多久到达队列
*/
message.getMessageProperties().setHeader("x-delay","10000");
return message;
};
// 发送消息到 MQ 服务器
rabbitTemplate.convertAndSend(ExChange_Direct, Routing_Key, data, processor);
注意:使用延期插件,不管队列是否成功发送到消费者,returnedMessage 回调会一直执行
事务消息
如果在一个方法中,有两个以上的消息发送,但在方法的执行中途发生异常了,正常情况下,MQ服务器是会收到异常之前的消息
但如果一个方法中所发送的消息不能只单独存在:即B消息如果发送不成功,那么发送成功的A消息也应该被回滚。
可以使用 @Transactional 注解来做事务控制,在方法上加 @Transactional 后,如果方法存在异常,所有操作都将回滚。
并且需要
- 1.提供 RabbitTransactionManager 对象
- 2.在 RabbitTemplate 中设置 setChannelTransacted 为 true 才行。
// 需要设置 ChannelTransacted 为 true
// 这段代码可以放在 @PostConstruct 标记的方法中
rabbitTemplate.setChannelTransacted(true);
/**
* 创建一个事务管理器对象
* @param cachingConnectionFactory
* @return
*/
@Bean
public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory cachingConnectionFactory){
return new RabbitTransactionManager(cachingConnectionFactory);
}
惰性队列
惰性队列,指的是,当我们的队列要求持久化时,我们的队列数据在什么时候写入到硬盘中的行为。如下图,我们创建的队列通常都为 Durable 即持久化开启。
所以惰性队列必须是持久化的前提下的问题。
两者的区别:
- 非惰性:无必要不会存入硬盘,除非不得不存入硬盘时,例如日常运行不存入硬盘,但当服务即将关闭时,MQ才会把队列数据存进硬盘
- 惰性:在服务器空闲时,尽可能的存入硬盘,MQ服务器会在合适的时候,比如相对空闲时,或队列数据量过多时,就会自动把队列数据存入硬盘,以防些队列数据在内存中堆积过多。
基于@Bean 声明惰性队列
@Bean
piblic Queue lazyQueue(){
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启x-queue-mode为lazy
.build();
}
基于 @RabbitListener 声明惰性队列
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value = "lazy")
))
public void listenLazyQueue(String name){
log.info("接收到 lazy.queue 的消息:{}", msg);
}
优先级队列
基于队列的先进先出的特性,通常来说,先入队的先投递
优先级是指,设置优先级后,优先级高的消息更大几率会先投递。
关键参数:x-max-priority
- x-max-priority 是一个优先级指标,它是定义消息的优先级规定,如果 x-max-priority 设置为 0,则消息中设置的优先级也会无效。默认值为0
这个值只是作为一个最高优先级参考,不是设置了10就是10,是消息能设置的最高优先级为10
官网建议在1~5之间设置消息的优先级,优先级越高,分配的CPU资源就越高
在发送代码的后置处理器中定义消息的优先级:数值不能超过x-max-priority的值,否则会无效。
数值越大,优先级越高。
MessagePostProcessor processor = message -> {
/**
* 设置该消息的优先级
*
*/
message.getMessageProperties().setPriority(1);
return message;
};












共有 0 条评论