RabbitMQ 使用说明

安装

在Docker 中安装 MabbitMQ.

docker run -d --name RabbieMQ -p 5672:5672 -p 15672:15672 -v rabbitmq-plugin:/plugins -e RABBITMQ_DEFAULT_USER=tzming -e RABBITMQ_DEFAULT_PASS=tzminglove rabbitmq:3.13.7-management

 

 

运行原理概述

从上图得知,RabbitMQ 的主要运行原理如下:

  • 1.生产者,即消息发布者;消费者,即消息接收者
  • 2.生产者通过连接 MQ 服务器,并创建一个 Channel ,通过 Channel 来发布消息,消息会到达 MQ 服务器的 ExChange ,即交换机
  • 3.ExChange 是用于定义这些消息,应该以何种方式分发给消费者进行消费的,关于多种分发机制,可以在下面的章节中讲述。
  • 4.ExChange 通过把消息发到绑定的 Queue 队列中,等待消费者进来取消息进行消费。
  • 5.消费者通过连接 MQ 服务器,并创建一个 Channel 来接收由 MQ 服务器发来的消息,消息者收到消息并进行消费。
  • 6.完成整个生产者和消费者的过程。

 

Work Queue 工作队列模式

工作队列模式简单的说,就是一个生产者对应多个消费者的队列模式,且消息是竞争关系,谁抢着就算谁的,消息只会被消费一次,是MQ中最简单的工作模式

通过多个消费者竞争消费同一条队列的消息,以提高处理一些工作量比较多的生产服务情况上。

在上图中,生产者 P 直接给 MQ 服务器 的条列 Queue 中发送消息,而消费者 C1 和 C2 对消息进行消费,图中没有表明使用 ExChange 交换机,但实际上这种模式也存在交换机的,只是 MQ 服务器内部预留了一个 Default ExChange 默认交换机。当我们不声明交换机时,MQ 则会默认把消息发送到 Default ExChange 交换机并创建绑定相关 Queue 队列。

 

Publish Subscribe 发布订阅模式

发布订阅模式与直接的工作模式更复杂一些,它有几种类型,所以这里必须要声明自定义的交换机进行消息的分发。

这里需要注意的是,ExChange 交换机并不具备存储消息的能力,当交换机没有绑定队列,或交换机消息无法匹配所绑定的队列名称时,该消息将会被交换机所丢弃。

Fanout 类型

  • 广播类型,会将消息发送给所有绑定到交换机的队列中,如果一个交换机中有多个队列,那么每个队列都会有这么一条消息
  • 但如果队列中有多个消费者,那监听同一队列的消费者依然是竞争关系
  • 没有 Routing key 直接绑定队列

Direct 类型

  • 定向类型,会把消息交给符合指定的 routing key 的队列中。也就是队列多了一个具名制,会对队列进行命名,消息只会分发给符合命名规则的队列中。
    • 通过 Routing key 绑定队列,消息发送到绑定的队列上
      • 一个交换机绑定一个队列:定点发送
      • 一个交换机绑定多个队列:广播发送

Topic 通配符类型

  • 类似 Direct 类型,会把消息交给符合指定的 routing pattern 的队列中区别在于,Direct 类型的队列命名是固定的,而 Topic 通配符类型的队列是通配符类型,可以有多种情况命中名称。
  • 针对 Routing key 使用通配符

 

Routing 路由模式

路由模式是使用一个交换机,且交换机当前是direct类型的,并通过绑定多个队列,而每个队列声明路由名称,当生产者发送消息时,会在消息中标记该消息允许名称为xx的队列接收,此时声明了该名称的队列将会收到生产者的消息,同时一个队列可以同时声明多个不同的名称。

消费者方面,只需要绑定特定的队列,即可。

 

Topics 主题模式

主题模式其实和Direct 类型 的 路由模式差不多,只不过名称上可以有通配符进行匹配名称,同时路由器的模式需要设置为 Topic 类型。

通配符有如下情况:

  • # : 匹配 0 个词,或多个词,如 #.error 可以匹配 ".error","goods.error","order.goods.error"
  • * :  匹配一个词,如 *.error 可以匹西 "goods.error"。如“*.*”可匹配类似 " a.b "这样的路由键。

 

下图可以说明使用通配符如何定义队列名称:

  • 使用 usa.# 时,可以使 usa.news 和 usa.weather 消息进入红色队列
  • 使用 #.news 时,可以使 usa.news 和 europe.news 消息进入橙色队列
  • 使用 #.weather 时,可以使 usa.weather 和 europe.weather 消息进入绿色队列
  • 使用 europe.# 时,可以使 europe.news 和 europe.weather 消息进入蓝色队列

 

RPC

RPC 为远程过程调用,本质上是同步调用,和我们使用 OpenFeign 调用远程接口一样,所以这不是典型的消息队列工作方式。

 

整合 SpringBoot

引入依赖

SpringBoot 官方已经推出了针对 RabbitMQ 的依赖,直接使用这个依赖就可以。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

 

创建消费者监听器

对于消费者而言,SpringBoot 中使用的是监听器,通过创建一个类,用于作为消费者监听来自生产者的数据的时候,进行自定义操作。

基础信息有:

  • 监听的交换机名称:消费者需要设置监听的交换机,一个MQ服务器有很多交换机,我们需要明确指出这个监听器要监听哪个交换机的消息
  • 路由键:通常我们使用的比较多的都是 Topic 或 Direct 路由模式,这两种模式可以更好的区分任务队列,所以我们需要声明这个监听器要处理哪个路由键带来的消息
  • 队列名称:监听器主要就是监听和获取来自队列中的消息,所以我们必须清楚我们要监听的是哪个队列的消息

 

配置 RabbitMQ 服务器

  rabbitmq:
    host: 192.168.101.101
    port: 5672
    username: tzming
    password: tzminglove
    virtual-host: /

 

 

使用注解注册监听处理方法

RabbbitTemplate 框加中提供了一个专用来标记监听消息消费的注册 @RabbitListener

  • 注解提供绑定功能
    • bindings : 对队列进行绑定参数值,使用 @QueueBinding 来创建一个绑定规则
      • value: 绑定的队列,使用 @Queue 进行创建
        • @Queue : value值为 队列名称
        • @Queue : durable 值为是否持久化存储消息,使用 "true" 或 "false" 来表示,不使用 Boolean
      • exchange: 绑定的交换机,使用 @Exchange 进行创建
      • key: 绑定的路由键,直接使用 String 值

监听方法中,需要声明三个形参

  • String dataString : 如果你知道你发送的消息是 String 类型的,可以在接收端事先定义 String 类型。类型可以根据消息数据来变
  • Message message : 消息对象,封装了消息数据的对象
  • Channel channel : 频道对象
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = Queue_Name,durable = "true"),
                    exchange = @Exchange(ExChange_Direct),
                    key = {Routing_Key}
            )
    )
    public void processListener(String data, Message message, Channel channel){

    }

在消费者中定义这些,是因为如果MQ中没有对应的交换机和队列时,会自动创建。如果MQ中本身就有的话,我们可以简单的写 queue 来声明监听队列就可以了。

    @RabbitListener(queues = Queue_Name) // 只声明了监听队列,但这样需要MQ服务器中已设置好交换机绑定等信息
    public void processListener(String data, Message message, Channel channel){}

 

使用 RabbitTemplate 发送消息

因为我们只需要交换机名称和 key 就可以找到对应的队列,(其实队列名就是 routing key ,只不过队列可以声明多个routing key)

String data = "Hello RabbitMQ";
// 发送消息到 MQ 服务器
rabbitTemplate.convertAndSend(ExChange_Direct, Routing_Key, data);

 

 

消息可靠性投递

我们对MQ发送消息时,消费者通过MQ获取到消息后进行消费,但也不保证整个过程都顺利进行,比如

  • 问题一:消息没有发送到消息队列上
    • 在生产端进行确认,对MQ队列进行确认
  • 问题二:消息成功存入队列,但是消息MQ服务器宕机了,原本保存在内存中的消息也丢失了
    • 开启持久化存储功能,让消息保存在硬盘中
  • 问题三:消息成功存入队列,但是消费者服务宕机了,或抛异常了
    • 消费端消费成功,给服务器返回ACK确认信息,然后消息队列再删除该消息
    • 消费端消费失败,给服务器返回NACK信息,MQ服务器把消息设置为待消费状态

 

配置文件开启交换机确认

开启后,生产者发送的消息会开启确认

spring:
    rabbitmq:
       # 交换机确认
        publisher-confirm-type: correlated

 

配置文件开启队列确认

开启后,消费者在消费后消息会开启确认

spring:
    rabbitmq:
       # 队列确认
        publisher-returns: true

 

代码中配置交换机确认回调方法

在开启交换机确认机制后,我们需要构造一个用于回调交换机确认信息的方法,这个方法需要实现来自 RabbitTemplate.ConfirmCallback 的接口 confirm

    /**
     * 交换机确认机制回调函数
     * @param correlationData
     * @param ack 是否成功发送消息到交换机中,true 表示成功发送到交换机
     * @param cause 当发送失败时的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

    }

设置后,不是注入到 Bean,而是把这个回调方法设置到 RabbitTemplate 中。

// 把回调放到 RabbitTemplate 中
rabbitTemplate.setConfirmCallback(this);

 

代码中配置队列确认回调方法

在开启交换机确认机制后,我们需要构造一个用于回调交换机确认信息的方法,这个方法需要实现来自 RabbitTemplate.ReturnsCallback 的接口 returnedMessage 接口

    /**
     * 队列确认机制回调函数,只有失败了才会回调
     * @param returnedMessage 包含如下
     *                        Message message: 消息以及消息相关数据
     *                        int replyCode : 应答码,类似于 HTTP状态码
     *                        String replyText: 应答码说明
     *                        String exchange: 交换机名称
     *                        String routingKey : 路由键名称
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        returnedMessage.getMessage().getBody(); // 获取消息
        returnedMessage.getExchange(); // 交换机名称
        returnedMessage.getReplyCode(); // 应答码
        returnedMessage.getReplyText(); // 应答码说明
        returnedMessage.getRoutingKey(); // 路由键名称
    }

设置后,不是注入到 Bean,而是把这个回调方法设置到 RabbitTemplate 中。

rabbitTemplate.setReturnsCallback(this);

 

合并设置

合并两个方法的实现,可以设置到同一个类中实现方法,因为我们要事先把 returnedMessage 回调 和 confirm 回调设置给 RabbitTemplate,所以我们可以新建一个方法,使用 @PostConstruct 标记在类创建后,执行 RabbitTemplate 设置工作:

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 在初始化阶段,把 confirm 和 returnedMessage 回调方法设置到 RabbitTemplate 中
     */
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

 

消息可靠性消费

对于消费者是否消费成功的问题,RabbitMQ 默认使用了全自动应答机制,即消息是否被成功消费,由消费者自动处理,但是我们可以对消费可靠性做自定义行为,即手动发送 ACK 或 NACK 应答。

1.首先我们要关闭RabbitMQ 的自动ACK应答功能:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

2.我们在Listener 接收消息方法中,对消息做ACK和NACK应答。

我们可以在处理消息过过程中使用 try-catch 来判断代码是否成功执行(消费成功)

使用 Channel 中的对象发关 ACK 或 NACK 应答。

channel.basicAck();

其中 basicAck 方法需要提交两个参数,分别如下:

  • long deliveryTag
    • boolean multiple

    deliveryTag 值

    说明:在生产者发布消息到MQ服务器时,为了识别每个消息的独立性,或者叫标识,通常都会生成携带属于这个消息的“ID”值,这个值由8位组成,如下图

    当我们对MQ服务器提交ACK应答时,我们需要提交这个标识,让MQ服务器知道,是哪一条消息被成功消费了。

    存在标识的作用:消费端把消息处理结果 ACK、NACK、Reject等返回给Broker之后,Broker需要对对应的消息圾行后续操作,例如删除消息,重新排队或标记为死信等等。

    Broker就必须知道它现在要操作的消息具体是哪一条。

    deliveryTag 值如果使用广播方式广播到多个队列里时,同一个消息不同队列,deliveryTag值也不会相同。

     

    获取 deliveryTag 值可通过 message 对象获取

            // 获取deliveryTag值
            long deliveryTag = message.getMessageProperties().getDeliveryTag();

     

     

    multiple值

    multiple 是指是否对这个 deliveryTag 之前的值都批量处理,比如当你处理到 00000005 值的消息并成功时,是否把 01到04的消息也一并设置为ACK.

    这里的目的可能是,你都处理到05消息了,那01到04可能不会再接收

    当 multiple 为 true 时,表示会把当前消息之前的所有消息都批量为 ACK

    当 multiple 为 false 时,表示只会把当前的消息设置为ACK(推荐)

    考虑到,一个队列中可能有多个消费者,有一些队列中的消息并非只有当前的监听者在处理,如果设置为 true,有可能会把别的消息者消息一并删除,影响其它消费者消费数据。所以应当设置为 false

     

    代码示例:

    /**
         * MQ服务器中无交换机和队列,会根据注解自动创建绑定
         * 也可以使用这个,但是不能自动创建绑定 @RabbitListener(queues = Queue_Name)
         * @param data 收到的消息
         * @param message 包装了消息的对象
         * @param channel 频道,既发送这条消息的管道
         * @throws IOException
         */
        @RabbitListener(
                bindings = @QueueBinding(
                        value = @Queue(value = Queue_Name,durable = "true"),
                        exchange = @Exchange(ExChange_Direct),
                        key = {Routing_Key}
                )
        )
        public void processListener(String data, Message message, Channel channel) throws IOException {
    
            // 获取deliveryTag值
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
    
            try {
                System.out.println("对消息进行消费:" + data);
                // 当消费消息成功后,给服务器返回 ACK 应答
                channel.basicAck(deliveryTag, false);
    
            } catch (IOException e) {
                /**
                 * 如果消费数据时发生错误,会抛出异常,这时我们可以有以下选择
                 * 1.返回 NACK 应答,并放回队列
                 * 2.返回 NACK 应答,不放回队列,消息丢弃
                 * 我们两样都做,在第一次失败时,应当放回队列重试。如果重试后依然失败,则丢弃消息
                 */
    
                // 获取当前消息是否是重回队列消息
                Boolean redelivered = message.getMessageProperties().getRedelivered();
                /**
                 * 当 redelivered 是 false 时,表示当前消息是第一次失败,应当重放
                 * 当 redelivered 是 true 时,表示当前消息已经重放过依然失败,所以丢弃
                 * 因为 redelivered=false时,requeue应当为 true.
                 * 当 redelivered=true时,requeue应当为 false
                 * 可以使用 !redelivered 来简化逻辑
                 */
                channel.basicNack(deliveryTag, false, !redelivered);
            }
    
        }

     

    关于 Reject 应答

    • Reject 应答也会拒绝消息的,也会发送NACK应答给MQ服务器,但是Reject不能控制 multiple.
    • channel.basicReject(deliveryTag消息标识, redelivered是否放回队列)

     

    消费端限流

    在MQ服务器看来,如果MQ服务器在一个时刻中有100个任务,遇到消费者端时,消费者端是一次过瞬间把100个任务都接下来,并一一处理

    在MQ服务器中可以看出来,在一瞬间中,所有消息都被取走了,然后等待ACK.如下图

    这样的缺点是,万一这个消费者的处理能力不够强,但偏偏要一次把所有任务都揽过来做,影响了整体的消息处理效率。

    我们应该让消费端每处理一次消息时,只往MQ服务器取一条消息,处理完后再获取一条。

    创建配置即可:

    spring:
      rabbitmq:
        listener:
          simple:
            prefetch: 1 # 每一次只取一个消息回来处理,处理完后再去MQ服务器取消息

    设置为1则说明一次只拿一条消息进行消费

     

     

    消息超时

    对于一些消息来说,有时效性,当这个消息在一定时间过后还没有被取走,就应该被删除

    对于超时时间来说,有两个层面来设定

    • 队列超时
      • 在队列层面设定消息的过期时间,并不是队列的过期时间,意思是这个队中的消息全部使用同一个过期时间
    • 消息本身
      • 给具体的某个消息设定过期时间
    • 如果两个层面都设置了,那么以最短的时间为主

    1.对于队列超时,可以通过引入 RabbitAdmin 对象来创建,并在创建一个 Queue 对象中设置超时属性:

    @Autowared
    private RabbitAdmin rabbitAdmin;
    
            /**
             * 创建一个队列
             * 我们可以在创建时把 x-message-ttl 超时属性值加上去,该队列的所有消息都会在对应的时间段内产生过期
             * name: 队列名
             * durable: 是否持久化
             * exclusive: 排他性访问,当消费者断开链接时,队列将被删除
             * autoDelete: 队列曾经有过消费者,当所有消费者断开连接时,队列自动删除
             * arguments: 使用 Map 来存储队列的一些属性,key 为属性名,value 为属性值
             * (1)x-message-ttl:消息的过期时间,单位:毫秒;
             * (2)x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
             * (3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
             * (4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
             * (5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
             * (6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
             * (7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
             * (8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
             * (9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
             * (10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
             * (11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。
             */
            Queue queue = new Queue("queue.timeout", true, false, false, Map.of("x-message-ttl", 60000));
    
            /**
             * 加入队列中
             */
            rabbitAdmin.declareQueue(queue);

     

     

    2.对于消息中的超时,我们可以创建一个消息后置处理器,即消息被生成出来,但还没有被发送出去之前的操作,我们可以把消息设置为有超时时间

    我们需要实现消息后置处理器的接口:MessagePostProcessor

            String data = "Hello RabbitMQ";
    
            MessagePostProcessor processor = new MessagePostProcessor() {
                /**
                 * 在消息创建完成但没有发送出去之前,手动设置消息的超时时间
                 */
    
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setExpiration("5000"); // 5000 毫秒
                    return message;
                }
            };
    
            // 发送消息到 MQ 服务器
            rabbitTemplate.convertAndSend(ExChange_Direct, Routing_Key, data, processor);
    
    

     

     

     

    死信和死信队列

    消息超时后删除的说法并不准确,应该是会被调到死信队列中。

    死信队列也是一个普通队列,只是可以在普通队列中设置另一个队列为死信队列,当普通消息出现问题就会被传到死信队列中。

     

    能进入死信队列的有如下三种情况:

    • 拒绝:消费者拒接消息,NACK/Reject 应答,且不再把消息重新放回原队列中去的,即 requeue=false
    • 溢出:队列中消息数量到达限制。比如队列最大只能存储10条消息,且现在已经到达10条消息,此时如果再发送一条消息进来,会根据先进先出原则,队列中最早的消息会变成死信
    • 超时:消息到达超时时间未被消费

    死信的处理方式大致有下面三种:

    • 丢弃:对不重要的消息直接丢弃,不做处理
    • 入库:把死信写入数据库,日后处理
    • 监听:消息变成死信后进入死信队列,我们专门设置消费端监听死信队列,做后续处理(通常采用)

    代码示例:创建一个普通队列的消费者,和一个死信队列的消费者

    • 普通
      • 普通交换机:exchange.normal
      • 普通队列:queue.normal.order
      • 路由键:routing.normal
      • 即当生产者发送消息到 exchange.normal 交换机的 routing.normal  键时,队列 queue.normal.order 会收到消息
    • 死信
      • 死信交换机:exchange.dead
      • 死信队列:queue.dead.letter.order
      • 路由键:routing.dead
    • 绑定
      • 普通队列 queue.normal.order 绑定【exchange.normal】交换机的 【routing.normal
      • 普通队列 queue.normal.order 绑定【x-dead-letter-exchange】属性为 exchange.dead
      • 普通队列 queue.normal.order 绑定【x-dead-letter-routing-key】属性为 routing.dead
      • 当普通队列 queue.normal.order消息出现错误时,错误的消息会被转入死信队列 queue.dead.letter.order中
        public static final String Queue_Dead = "queue.dead.letter.order";
        /**
         * 处理死信队列的
         * @param data
         * @param message
         * @param channel
         * @throws IOException
         */
        @RabbitListener(queues = Queue_Dead)
        public void doDeadLetter(String data, Message message, Channel channel) throws IOException {
            System.out.println("处理死信:" + data);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    

    至于如何使用代码创建死信队列,可以使用 RabbitAdmin 进行创建,并添加参数。

     

     

    延迟队列

    延迟队列的意思是,当我们的生产者发送消息到MQ服务器之后,不会马上把消息传给消费者处理,而是等到一定时间之后,消费者才能收到消息。

    延迟队列有两种方素:

    • 使用死信队列的原理,即普通队列设置消息过期时间,然后过期后派发到死信队列中,这个过期时间就是延迟时间。
      • 缺点是需要两个队列合作,而且消费者需要处理的不是普通队列,而是死信队列。
    • 使用插件达到延迟队列功能。
      • 使用插件“rabbitmq_delayed_message_exchange”,下载地址

    本次使用插件来完成延迟队列功能。

    步骤

    1.下载插件:

    放到RabbitMQ 服务器的 Plugins 文件夹中,如果在Docker中需要放到对应的映射盘文件夹上。

    2.RabbitMQ服务安装插件

    在命令行中输入命令以安装插件:

    rabbitmq-plusins enable rabbitmq_delayed_message_exchange

    *可以通过rabbitmq-plusins list 来获取已安装的插件。

    安装完成后,重启RabbitMQ 服务

    3.创建延迟队列:

    在后台中,创建交换机时,会多出一个选项“x-delay-message”,如下图

    我们选择“x-delay-message”。

    当我们选择“x-delay-message”后,我们的交换机就不能定义派发模式了(比如direct模式),所以我们需要在参数中定义我们的交换机使用哪种模式,其参数名为

    x-delayed-type 为 direct(或其它模式)

    4.正常创建队列即可。

    5.生产者中对消息定义延期时间:

    使用 前置处理器,来设定消息的延期时间。

            String data = "Hello RabbitMQ";
    
            MessagePostProcessor processor = message -> {
                /**
                 * 消息设置属性,使用 setHeader 来设置
                 * 使用延迟插件专用的属性 x-delay 来声明该消息会在多久到达队列
                 */
                message.getMessageProperties().setHeader("x-delay","10000");
                return message;
            };
    
            // 发送消息到 MQ 服务器
            rabbitTemplate.convertAndSend(ExChange_Direct, Routing_Key, data, processor);
    

     

    注意:使用延期插件,不管队列是否成功发送到消费者,returnedMessage 回调会一直执行

     

    事务消息

    如果在一个方法中,有两个以上的消息发送,但在方法的执行中途发生异常了,正常情况下,MQ服务器是会收到异常之前的消息

    但如果一个方法中所发送的消息不能只单独存在:即B消息如果发送不成功,那么发送成功的A消息也应该被回滚。

    可以使用 @Transactional 注解来做事务控制,在方法上加 @Transactional 后,如果方法存在异常,所有操作都将回滚。

    并且需要

    • 1.提供 RabbitTransactionManager 对象
    • 2.在 RabbitTemplate 中设置 setChannelTransacted 为 true 才行。
    // 需要设置 ChannelTransacted 为 true 
    // 这段代码可以放在 @PostConstruct 标记的方法中
    rabbitTemplate.setChannelTransacted(true);
    
    
        /**
         * 创建一个事务管理器对象
         * @param cachingConnectionFactory
         * @return
         */
        @Bean
        public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory cachingConnectionFactory){
            return new RabbitTransactionManager(cachingConnectionFactory);
        }

     

     

     

     

    惰性队列

    惰性队列,指的是,当我们的队列要求持久化时,我们的队列数据在什么时候写入到硬盘中的行为。如下图,我们创建的队列通常都为 Durable 即持久化开启。

    所以惰性队列必须是持久化的前提下的问题。

    两者的区别:

    • 非惰性:无必要不会存入硬盘,除非不得不存入硬盘时,例如日常运行不存入硬盘,但当服务即将关闭时,MQ才会把队列数据存进硬盘
    • 惰性:在服务器空闲时,尽可能的存入硬盘,MQ服务器会在合适的时候,比如相对空闲时,或队列数据量过多时,就会自动把队列数据存入硬盘,以防些队列数据在内存中堆积过多。

     

    基于@Bean 声明惰性队列

    @Bean
    piblic Queue lazyQueue(){
        return QueueBuilder
                .durable("lazy.queue")
                .lazy() //  开启x-queue-mode为lazy
                .build();
    }

    基于 @RabbitListener 声明惰性队列

    @RabbitListener(queuesToDeclare = @Queue(
        name = "lazy.queue",
        durable = "true",
        arguments = @Argument(name = "x-queue-mode", value = "lazy")
    ))
    public void listenLazyQueue(String name){
        log.info("接收到 lazy.queue 的消息:{}", msg);
    } 

     

     

    优先级队列

    基于队列的先进先出的特性,通常来说,先入队的先投递

    优先级是指,设置优先级后,优先级高的消息更大几率会先投递。

    关键参数:x-max-priority

    • x-max-priority 是一个优先级指标,它是定义消息的优先级规定,如果 x-max-priority 设置为 0,则消息中设置的优先级也会无效。默认值为0

    这个值只是作为一个最高优先级参考,不是设置了10就是10,是消息能设置的最高优先级为10

    官网建议在1~5之间设置消息的优先级,优先级越高,分配的CPU资源就越高

    在发送代码的后置处理器中定义消息的优先级:数值不能超过x-max-priority的值,否则会无效。

    数值越大,优先级越高。

            MessagePostProcessor processor = message -> {
                /**
                 * 设置该消息的优先级
                 * 
                 */
                message.getMessageProperties().setPriority(1);
                return message;
            };

     

    如果您喜欢本站,点击这儿不花一分钱捐赠本站

    这些信息可能会帮助到你: 下载帮助 | 报毒说明 | 进站必看

    修改版本安卓软件,加群提示为修改者自留,非本站信息,注意鉴别

    THE END
    分享
    二维码
    打赏
    海报
    RabbitMQ 使用说明
    安装 在Docker 中安装 MabbitMQ. docker run -d --name RabbieMQ -p 5672:5672 -p 15672:15672 -v rabbitmq-plugin:/plugins -e RABBITMQ_DEFAULT_USER=tzming -e RABBITMQ_DEFAULT_PASS=tzmi……
    <<上一篇
    下一篇>>