RabbitMQ——详细使用


RabbitMQ——详细使用

1. RabbitMQ基本知识回顾

1.1 前言

可能你在没学消息中间件之前都已经听过很多概念了,JMSAMQPActiveMQRabbitMQKafkaRocketMQ,一个消息中间件怎么能搞出怎么多概念?乱不乱啊,

别烦,本文从历史的角度帮你理清这些MQ和协议之间的关系。

1.2 什么是消息中间件?

消息中间件属于分布式系统中的一个子系统,关注于数据的发送和接收,利用高效可靠的消息传递机制对分布式系统中的其余各个子系统经进行集成

1.3 消息中间件的使用场景

1.3.1 异步处理

非核心流程异步化,提高系统响应性能

以前的流程:

以前的流程

有了消息中间件之后的流程:

有了消息中间件之后的流程

原来用户注册一下可能得依次写数据库,发送邮件和短信后,才能提示用户注册成功

现在只要写数据库,写消息队列后就直接提示用户注册成功,发送邮件和短信是异步处理,提高了响应速度

1.3.2 应用解耦

系统不是强耦合,消息接受者可以随意增加,而不需要修改消息发送者的代码。消息发送者的成功不依赖消息接受者

RPC实现:

rpc实现

消息队列实现:

如果库存系统出了问题,用户就不能正常下单,这是不合理的。可以通过消息队列来解耦。

当有新的系统如广告系统对用户的订单也感兴趣的时候,只需要从消息队列中拿消息即可,订单系统完全不用改变

1.3.3 流量削峰

当上下游系统处理能力存在差距的时候,可以用消息队列进行缓冲

当有秒杀业务时,一下有大量请求涌入时,很可能造成系统瘫痪,此时可以用消息队列缓冲一下

1.3.4 日志处理

将消息队列用在日志处理中,比如Kafka可以用来解决大量日志传输的问题

1.3.5 消息通讯

消息队列一般都内置了高效的通信机制,因此也可以用于单纯的消息通讯,比如实现点对点消息队列或者聊天室等

1.4 消息中间件编年史

初见曙光

消息中间件其实诞生的很早,在互联网应用还是一片荒芜的年代,有个在美国的印度哥们Vivek Ranadive就设想了一种通用软件总线,采用发布订阅的模式,像主板上的总线一样供其他相应程序接入。他创办了一家公司Teknekron,实现了世界上第一个消息中间件The Information Bus(TIB)

各自为战

TIB受到了企业的欢迎,Teknekron的业务发展引起了当时最牛气的IT公司IBM的注意,于是他们也开始研发了自己消息队列软件,于是才有了后来的wesphere mq,微软也陆续加入了战团。由于商业壁垒,商业MQ供应商想要解决应用应用互通的问题,而不是去创建标准来实现不同MQ产品间的互通,或者允许应用程序更改MQ平台

劫制天下

为了打破这个壁垒,同时为了能够让消息在各个消息队列平台间互融互通, JMS (Java Message Service) 应运而生 。 JMS 试图通过提供公共 Java API 的方式,隐藏单独 MQ 产品供应 商提供的实际接口,从而跨越了壁垒,以及解决了互通问题。从技术上讲, Java 应用程序只需 针对 JMS API 编程,选择合适的 MQ 驱动即可, JMS 会打理好其他部分 。 ActiveMQ 就是 JMS 的 一种实现 。 不过尝试使用单独标准化接口来胶合众多不同的接口,最终会暴露出问题,使得 应用程序变得更加脆弱 。 所以急需一种新的消息通信标准化方案 。

一统江湖

在 2006 年 6 月,由 Cisco 、 Redhat 、iMatix 等联合制定了 AMQP 的公开标准,由此 AMQP 登上了历史的舞台 。 它是应用层协议的一个开放标准,以解决众多消息中间件的需求和拓扑结构问题 。 它为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制 。

合久必分

LinkedIn在实现消息队列的时候觉得AMQP规范并不适合自己,所以Kafka不支持AMQP协议RocketMQ在实现上借鉴Kakfa的思想,所以也不支持AMQP协议,并且你会发现在KafkaRocketMQ中都有类似TopicConsumer Group的概念,而这些概念在AMQP协议中是不存在的。

1.5 如何选择消息中间件

  1. ActiveMQ 的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使用。
  2. RabbitMQ 在吞吐量方面虽然稍逊于 Kafka 和 RocketMQ ,但是由于它基于 erlang 开发,所以并发能力很强性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 erlang 开发,所以国内很少有公司有实力做erlang源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这四种消息队列中,RabbitMQ 一定是你的首选。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
  3. RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。RocketMQ 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些。
  4. Kafka 的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 Kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。Kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略。Kafka天然适合大数据实时计算以及日志收集。

1.6 消息模型

如果让你设计一个消息队列?你会怎么实现呢?

可能你立马就会想到用队列,一边放,一边取。这其实就是消息队列常见的一种消息模型,队列模型

所以一个简单的消息队列用Redis中的List就能实现。当然Redis5.0版本之后针对消息队列这种场景新增了专门设计了一个数据结构Streams

队列模型有哪些缺点呢?

如果将一类消息发送给不同的消费者,每个消费者都要接收全量的消息,此时就很不方便。因为你要将相同的消息发送到不同的队列,多一个消费者就得多发一份队列。这样生产者必须知道有多少个消费者,不利于解耦。

那么该如何解决这个问题呢?

想想RabbitMQ的结构图:

RabbitMQ结构图

RabbitMQ并不是直接把消息发送到队列中的,而是发送到Exchange中,ExchangeQueue进行关联,消息由Exchange根据规则发送到对应的队列。这样生产者和消费者完成了解耦。

还有哪种方式能解决这种多消费者的问题呢?

答对了,就是发布订阅模型

发布订阅模型

RocketMQKafka都是基于发布订阅模型实现的,RocketMQ的消息模型图如下

RocketMQ消息模型图

生产者是发布者,消费者是订阅者,消息是主题

为了提高消费的并行度,一类消息会被分发到多个队列中,在RocketMQ中叫Queue,在Kafka中叫做Partition(分区),都是类似的概念。

1.7 AMQP协议详解

前面说到消息中间件有2种协议,JMSAMQPJMS你可以类比为JDBC,搞了一套接口让不同厂商来实现这个接口,但是这个协议设计的确实不够优雅,因此就不介绍这个协议了,除非你用ActiveMQ,不然学了真没啥用。详细说一下AMQP协议,毕竟现在用RabbitMQ的公司还是很多的,要想学好RabbitMQAMQP协议是必须要清楚的。

上图是AMQP协议中一个消息的流转过程,画的的很清楚,不详细介绍了。

1.8 AMQP核心概念

介绍一些AMQP协议常见的概念。

概念 解释
Server 又称Broker,接受客户端的连接,实现AMQP实体服务
Connection 一个网络连接,比如TCP/IP套接字连接
Channel 多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质
Message 消息,服务器和应用程序之间传送的数据,由PropertiesBody组成。Properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体内容
Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个ExchangeQueue,同一个Virtual Host里面不能有相同名称的ExchangeQueue
Binding 消息队列和交换器之间的关联
Routing Key 一个消息头,交换器可以用这个消息头决定如何路由某条消息
Message Queue 消息队列,用来保存消息直到发送给消费者

如果有用过ActiveMQRabbitMQ,对上面的名词一定不会陌生。


2. RabbitMQ特性

2.1 前言

我们先来看一下一条消息在RabbitMQ中的流转过程

图示的主要流程如下:

  1. 生产者发送消息的时候指定RoutingKey,然后消息被发送到Exchange
  2. Exchange根据一些列规则将消息路由到指定的队列中,主要是根据BindingKey来路由
  3. 消费者从队列中消费消息

整个流程主要就4个参与者messageexchangequeueconsumer,我们就来认识一下这4个参与者

2.2 Message

消息可以设置一些列属性,每种属性的作用可以参考《深入RabbitMQ》一书。

属性名 用处
contentType 消息体的MIME类型,如application/json
contentEncoding 消息的编码类型,如是否压缩
messageId 消息的唯一性标识,由应用进行设置
correlationId 一般用作关联消息的message-id,常用于消息的响应
timestamp 消息的创建时刻,整型,精确到秒
expiration 消息的过期时刻,字符串,但是呈现格式为整型,精确到秒
deliveryMode 消息的持久化类型 ,1为非持久化,2为持久化,性能影响巨大
appId 应用程序的类型和版本号
userId 标识已登录用户,极少使用
type 消息类型名称,完全由应用决定如何使用该字段
replyTo 构建回复消息的私有响应队列
headers 键/值对表,用户自定义任意的键和值
priority 指定队列中消息的优先级

这些属性其实就是Java中的:AMQP.BasicProperties这个类的属性。

2.3 Exchange

接收消息,并根据路由键转发消息到所绑定的队列,常用的属性如下

交换机属性 类型
name 交换器名称
type 交换器类型,有如下四种,directtopicfanoutheaders
durability 是否需要持久化,true为持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息
autoDelete 与这个Exchange绑定的QueueExchange都与此解绑时,会删除本交换器
internal 设置是否内置,true为内置。如果是内置交换器,客户端无法发送消息到这个交换器中,只能通过交换器路由到交换器这种方式
argument 其他一些结构化参数

我们最常使用的就是type属性,下面就详细解释type属性

2.4 Fanout Exchange

发送到该交换机的消息都会路由到与该交换机绑定的所有队列上,可以用来做广播

不处理路由键,只需要简单的将队列绑定到交换机上

Fanout交换机转发消息是最快

2.5 Direct Exchage

把消息路由到BindingKey和RoutingKey完全匹配的队列中

2.6 Topic Exchange

前面说到,direct类型的交换器路由规则是完全匹配RoutingKeyBindingKeytopicdirect类似,也是将消息发送到RoutingKeyBindingKey相匹配的队列中,只不过可以模糊匹配:

  1. RoutinKey为一个被“.”号分割的字符串(如com.rabbitmq.client)
  2. BindingKey和RoutingKey也是“.”号分割的字符串
  3. BindKey中可以存在两种特殊字符串“*”和“#”,用于做模糊匹配,其中“*”用于匹配不多不少一个词,“#”用于匹配多个单词(包含0个,1个)
BindIngKey 能够匹配到的RoutingKey
java.# java.lang,java.util, java.util.concurrent
java.* java.lang,java.util
*.*.uti com.javashitang.util,org.spring.util

假如现在有2个RoutingKey为java.lang和java.util.concurrent的消息,java.lang会被路由到Consumer1和Consumer2,java.util.concurrent会被路由到Consumer2。

2.7 Headers Exchange

headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送消息内容中的headers属性进行匹配。headers类型的交换器性能差,不实用,基本上不会使用

2.8 Queue

上面讲解了交换器的四种类型:fanoutdirecttopicheaders,下面来看看队列相关的知识。

队列的常见属性如下:

参数名 用处
queue 队列的名称
durable 是否持久化,true为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息
exclusive 设置是否排他,true为排他。如果一个队列被声明为排他队列,该队列仅对首次声明他它的连接可见,并在连接断开时自动删除(即一个队列只能有一个消费者)
autoDelete 设置是否自动删除,true为自动删除,自动删除的前提是,至少一个消费者连接到这个队列,之后所有与这个连接的消费者都断开时,才会自动删除
arguments 设置队列的其他参数,如x-message-ttl,x-max-length

arguments中可以设置的队列的常见参数如下:

参数名 目的
x-dead-letter-exchange 死信交换器
x-dead-letter-routing-key 死信消息的可选路由键
x-expires 队列在指定毫秒数后被删除
x-ha-policy 创建HA队列
x-ha-nodes HA队列的分布节点
x-max-length 队列的最大消息数
x-message-ttl 毫秒为单位的消息过期时间,队列级别
x-max-priority 最大优先值为255的队列优先排序功能

3. RabbitMQ-API及其使用

注意:本小结使用的是原生的AMQP来完成的,没有与SpringBoot进行整合。

引入依赖,版本根据自己的需要选择即可:

<dependencies>
    <!--rabbitmq 依赖客户端-->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.8.0</version>
    </dependency>
    <!--操作文件流的一个依赖-->
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>
</dependencies>

3.1 快速开始,手写一个RabbitMQ的生产者和消费者

3.1.1 生产者

@Slf4j
public class QuickStartProducer {

    public static final String EXCHANGE_NAME  = "quickStart_exchange";

    public static void main(String[] args) throws Exception {

        // 1.创建一个ConnectionFactory,并进行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("你的RabbitMQ服务器IP地址");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("你的RabbitMQ服务器账号");
        connectionFactory.setPassword("你的RabbitMQ服务器密码");

        // 2.通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        // 3.通过connection创建一个channel
        Channel channel = connection.createChannel();

        // 4.创建交换器
        // 因为不知道生产者和消费者程序哪个先启动,所以一般的做法是在生产者和消费者2边都创建交换器(有的话不会重复创建)
        channel.exchangeDeclare(QuickStartProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String routingKey = "error";
        // 5.通过channel发送数据
        for (int i = 0; i < 5; i++) {
            String message = "hello rabbitmq " + i;
            //真正发送消息的API-basicPublish
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            log.info("send message, routingKey: {}, message: {}", routingKey, message);
        }

        // 6.记得要关闭相关的连接
        channel.close();
        connection.close();
    }
}

步骤以及几个注意点:

  1. 创建一个ConnectionFactory,并进行配置,配置RabbitMQ服务器地址、端口、虚拟主机、账号以及密码,其中端口如果是默认的话就是5672,不能够使用15672,15672是RabbitMQ的web管理端的端口,真正提供服务的是5672这个端口。
  2. 通过连接工厂创建连接
  3. 通过connection创建一个channel(信道)
  4. 通过channel创建交换器,因为不知道生产者和消费者程序哪个先启动,所以一般的做法是在生产者和消费者两边都创建交换器(有的话不会重复创建)
  5. 通过channel发送数据

相关API解释:

  • channel.exchangeDeclare()

    这是创建交换器的API,直接给出参数最多的一个进行解释:

    /**
     *
     * @param exchange 交换器名称
     * @param type 交换器类型
     * @param durable 是否持久化
     * @param autoDelete 是否自动删除
     * @param internal 是否内置
     * @param arguments 其他结构化参数
     * @return
     * @throws IOException
     */
    Exchange.DeclareOk exchangeDeclare(String exchange,
            BuiltinExchangeType type,
            boolean durable,
            boolean autoDelete,
            boolean internal,
            Map<String, Object> arguments) throws IOException;
    

    具体的详细解释可以查看这一小结的内容:交换机参数详细解释

  • channel.basicPublish()

    这是生产者发送消息的API,同样的,给出参数最多的一个进行解释:

    /**
     *
     * @param exchange 交换机名称
     * @param routingKey 路由键
     * @param mandatory 消息无法被路由到队列时是否返回给生产者,
     *                  为true则无法路由时返回给生产者,
     *                  为false则会被丢弃
     * @param immediate 消息无法被路由到队列时是否立即返回给生产者,
     *                  为true则无法路由时返回给生产者,
     *                  为false则会被放回队列中
     * @param props 一个 BasicProperties 对象,用于指定消息的基本属性,如消息的持久性、优先级、过期时间等。
     * @param body 一个字节数组,表示消息的内容
     * @throws IOException
     */
    void basicPublish(String exchange, 
                      String routingKey, 
                      boolean mandatory, 
                      boolean immediate, 
                      BasicProperties props, 
                      byte[] body) throws IOException;
    

注意:

上述生产者用的交换机类型是Direct,所以,消费者那边定义交换器的类型也要一样,因为不知道到底是先启动生产者还是消费者,所以一般的做法是两者都要创建交换器,如果存在了,则不会重复创建。

因为使用的是Direct类型的交换器,所以生产者的RoutingKey必须和消费者的BindingKey完全匹配才可接收到数据。

3.1.2 消费者

@Slf4j
public class QuickStartConsumer {

    public static void main(String[] args) throws Exception {

        // 1.创建一个ConnectionFactory,并进行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("你的RabbitMQ服务器IP地址");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("你的RabbitMQ服务器账号");
        connectionFactory.setPassword("你的RabbitMQ服务器密码");

        // 2.通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        // 3.通过connection创建一个channel
        Channel channel = connection.createChannel();

        String queueName = "quickStartErrorQueue";
        String bindingKey = "error";
        // 4.创建交换器
        // 因为不知道生产者和消费者程序哪个先启动,所以一般的做法是在生产者和消费者2边都创建交换器(有的话不会重复创建)
        channel.exchangeDeclare(QuickStartProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        /**
         * 5.声明(创建)一个队列
         * queue: 队列名字
         * durable: 是否持久化
         * exclusive: 是否独占
         * autoDelete: 队列脱离exchange,自动删除
         * arguments: 扩展参数
         */
        channel.queueDeclare(queueName, true, false, false ,null);
        // 6.绑定交换机和队列
        channel.queueBind(queueName, QuickStartProducer.EXCHANGE_NAME, bindingKey);

        // 7.创建消费者
        Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey() ,message);
            }
        };

        // 8.消费者开始消费数据
        channel.basicConsume(queueName , true, consumer);
    }
}

步骤以及几个注意点:

  1. 创建一个ConnectionFactory,并进行配置。注意点参考3.1.1小节的生产者。
  2. 通过连接工厂创建连接
  3. 通过connection创建一个channel(信道)
  4. 通过channel创建交换器
  5. 通过channel创建队列
  6. 通过channel绑定队列和交换机
  7. 通过channel消费数据

相关API解释:

  • channel.queueDeclare()

    这是创建队列的API,给出参数最多的一个进行解释:

    /**
     * 声明(创建)一个队列
     * queue: 队列名字
     * durable: 是否持久化
     * exclusive: 是否独占
     * autoDelete: 队列脱离exchange,自动删除
     * arguments: 扩展参数
     */
    Queue.DeclareOk queueDeclare(String queue, 
                                 boolean durable, 
                                 boolean exclusive, 
                                 boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;
    

    具体的详细解释可以查看这一小结的内容:队列参数详细解释

  • channel.queueBind()

    这是绑定队列和交换机的API,绑定了之后,交换机才知道将消息发送给哪些队列,给出参数最多的一个进行解释:

    /**
     * 绑定队列到交换机上
     * @param queue 队列名称
     * @param exchange 交换机名称
     * @param routingKey 路由键
     * @param arguments 其他额外的参数,这些参数可以用于控制绑定的行为,
     *                  如设置绑定的参数、定义绑定的行为等。
     *                  常见的参数包括 x-match(匹配模式)和 alternate-exchange(备用交换机)
     * @return
     * @throws IOException
     */
    Queue.BindOk queueBind(String queue, 
                           String exchange, 
                           String routingKey, 
                           Map<String, Object> arguments) throws IOException;
    
  • channel.basicConsume()

    这是消费消息的API,给出参数最多的一个进行解释:

    /**
     *
     * @param queue 队列的名称
     * @param autoAck 否启用自动确认模式
     *                如果设置为 true,则表示消息在被接收后会立即被确认;
     *                如果设置为 false,则需要手动调用 Channel.basicAck 方法来确认消息
     * @param consumerTag 标识消费者的唯一标识符。当消费者连接到 RabbitMQ 服务器时,会分配一个消费者标签
     * @param noLocal 是否禁止消费者接收自己发送的消息。如果设置为 true,则表示消费者不会接收自己发送的消息
     * @param exclusive 是否将队列声明为排他队列。如果设置为 true,则表示只有当前连接的消费者可以访问该队列
     * @param arguments 一个包含其他消费者参数的 Map 对象。这些参数可以用于控制消费者的行为,
     *                  如设置消费者的优先级、最大消费者数量等
     * @param deliverCallback 一个 DeliverCallback 对象,用于处理从队列中接收到的消息。
     *                        当消费者接收到消息时,将会调用此回调函数来处理消息
     * @param cancelCallback 一个 CancelCallback 对象,用于处理消费者被取消订阅时的情况。
     *                       当消费者取消订阅队列时,将会调用此回调函数。
     * @param shutdownSignalCallback 一个 ConsumerShutdownSignalCallback 对象,用于处理消费者关闭信号的情况。
     *                               当消费者关闭时,将会调用此回调函数
     * @return
     * @throws IOException
     */
    String basicConsume(String queue, 
                        boolean autoAck, 
                        String consumerTag, 
                        boolean noLocal, 
                        boolean exclusive, 
                        Map<String, Object> arguments, 
                        DeliverCallback deliverCallback, 
                        CancelCallback cancelCallback, 
                        ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;
    

注意:

无论是交换器还是队列,都可以在生产者或者消费者方进行创建(exchange已经有的话不会重复创建,queue也是),但是再我例子包括后续的例子中,一定要先启动consumer,后启动producer,因为我只在consumer中声明了exchangequeue的绑定关系,如果先启动producer,因为exchange找不到相应的queue,所以消息会丢失。


执行结果:

生产者发送消息:

2024-03-27 14:35:13.074 [main] INFO  c.j.r.c.QuickStartProducer - send message, routingKey: error, message: hello rabbitmq 0
2024-03-27 14:35:13.079 [main] INFO  c.j.r.c.QuickStartProducer - send message, routingKey: error, message: hello rabbitmq 1
2024-03-27 14:35:13.079 [main] INFO  c.j.r.c.QuickStartProducer - send message, routingKey: error, message: hello rabbitmq 2
2024-03-27 14:35:13.079 [main] INFO  c.j.r.c.QuickStartProducer - send message, routingKey: error, message: hello rabbitmq 3
2024-03-27 14:35:13.079 [main] INFO  c.j.r.c.QuickStartProducer - send message, routingKey: error, message: hello rabbitmq 4

消费者消费消息:

2024-03-27 14:35:13.119 [pool-1-thread-4] INFO  c.j.r.c.QuickStartConsumer - get message, routingKey: error, message: hello rabbitmq 0
2024-03-27 14:35:13.122 [pool-1-thread-5] INFO  c.j.r.c.QuickStartConsumer - get message, routingKey: error, message: hello rabbitmq 1
2024-03-27 14:35:13.122 [pool-1-thread-5] INFO  c.j.r.c.QuickStartConsumer - get message, routingKey: error, message: hello rabbitmq 2
2024-03-27 14:35:13.122 [pool-1-thread-5] INFO  c.j.r.c.QuickStartConsumer - get message, routingKey: error, message: hello rabbitmq 3
2024-03-27 14:35:13.122 [pool-1-thread-5] INFO  c.j.r.c.QuickStartConsumer - get message, routingKey: error, message: hello rabbitmq 4

3.2 演示各种exchange的使用

来回顾一下上面说的各种exchange机器路由规则

交换器类型 路由规则
fanout 发送到该交换机的消息都会路由到与该交换机绑定的所有队列上,可以用来做广播
direct 把消息路由到BindingKeyRoutingKey完全匹配的队列中
topic topic和direct类似,也是将消息发送到RoutingKeyBindingKey相匹配的队列中,只不过可以模糊匹配
headers 性能差,基本不会使用

3.2.1 Fanout

生产者:

@Slf4j
public class FanoutExchangeProducer {

    public static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置

        channel.exchangeDeclare(FanoutExchangeProducer.EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        String[] logLevel = {"info", "warning", "error"};
        for (int i = 0; i < 3; i++) {
            String routingKey = logLevel[i % 3];
            String message = "hello rabbitmq " + i;
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            log.info("send message, routingKey: {}, message: {}", routingKey, message);
        }
        //省略关闭连接和信道的代码
    }
}

消费者A:

@Slf4j
public class FanoutExchangeConsumerA {

    public static void main(String[] args) throws Exception {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置

        // 声明一个交换机
        channel.exchangeDeclare(FanoutExchangeProducer.EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        String queueName = "allQueueA";
        String bindingKey = "info";

        channel.queueDeclare(queueName, false, false, false ,null);
        channel.queueBind(queueName, FanoutExchangeProducer.EXCHANGE_NAME, bindingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey(), message);
            }
        };

        channel.basicConsume(queueName , true, consumer);

    }
}

消费者B:

@Slf4j
public class FanoutExchangeConsumerB {

    public static void main(String[] args) throws Exception {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置

        // 声明一个交换机
        channel.exchangeDeclare(FanoutExchangeProducer.EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        String queueName = "allQueueB";
        String bindingKey = "info";

        channel.queueDeclare(queueName, false, false, false ,null);
        channel.queueBind(queueName, FanoutExchangeProducer.EXCHANGE_NAME, bindingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey(), message);
            }
        };

        channel.basicConsume(queueName , true, consumer);

    }
}

生产者消息发送:

生产者消息发送

消费者A消息接收:

消费者A消息接收

消费者B消息接收:

消费者B消息接收

可见所有的消费者都收到了消息,无论你的BingdingKey和RountingKey为什么,证明了其为广播。

3.2.2 Direct

生产者:

@Slf4j
public class DirectExchangeProducer {

    public static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws Exception {

        //省略RabbitMQ的配置、连接的配置、信道创建的配置

        channel.exchangeDeclare(DirectExchangeProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String[] logLevel = {"info", "warning", "error"};
        for (int i = 0; i < 3; i++) {
            String routingKey = logLevel[i % 3];
            String message = "hello rabbitmq " + i;
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            log.info("send message, routingKey: {}, message: {}", routingKey, message);
        }
        //省略关闭连接和信道的代码
    }
}

消费者:

@Slf4j
public class DirectExchangeConsumer {

    public static void main(String[] args) throws Exception {

        //省略RabbitMQ的配置、连接的配置、信道创建的配置

        channel.exchangeDeclare(DirectExchangeProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String queueName = "directExchangeErrorQueue";
        String bindingKey = "error";
        
        channel.queueDeclare(queueName, false, false, false ,null);
        channel.queueBind(queueName, DirectExchangeProducer.EXCHANGE_NAME, bindingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey(), message);
            }
        };

        channel.basicConsume(queueName , true, consumer);

    }
}

生产者发送消息:

生产者发送消息

消费者接收消息:

消费者接收消息

可见在Direct类型的交换器下,只有RoutingKey和BindingKey完全匹配的时候,消费者才会收到消息。

3.2.3 Topic

生产者:

@Slf4j
public class TopicExchangeProducer {

    public static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws Exception {

        //省略RabbitMQ的配置、连接的配置、信道创建的配置

        String[] logLevel = {"info", "warning", "error"};
        String[] module = {"driver", "login", "crm"};
        String[] score = {"A" , "B", "C"};
        for (int i = 0; i < 3; i++) {
            for (int j = 0; j < 3; j++) {
                for (int k = 0; k < 3; k++) {
                    String routingKey = String.join(".", Arrays.asList(logLevel[i % 3], module[j % 3], score[k % 3]));
                    String message = "hello rabbitmq routingKey is " + routingKey;
                    channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
                    log.info("send message, routingKey: {}, message: {}", routingKey, message);
                }
            }
        }
        //省略关闭连接和信道的代码
    }
}

消费者1:

@Slf4j
public class FocusCrmConsumer {

    public static void main(String[] args) throws Exception {

        //省略RabbitMQ的配置、连接的配置、信道创建的配置

        // 声明一个交换机
        channel.exchangeDeclare(TopicExchangeProducer.EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String queueName = "crmQueue";
        String bindingKey = "#.crm.#";

        channel.queueDeclare(queueName, false, false, false ,null);
        channel.queueBind(queueName, TopicExchangeProducer.EXCHANGE_NAME, bindingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey(), message);
            }
        };

        channel.basicConsume(queueName , true, consumer);
    }
}

消费者2:

@Slf4j
public class FocusAllConsumer {

    public static void main(String[] args) throws Exception {

        //省略RabbitMQ的配置、连接的配置、信道创建的配置

        // 声明一个交换机
        channel.exchangeDeclare(TopicExchangeProducer.EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String queueName = "allQueue";
        String bindingKey = "#";

        channel.queueDeclare(queueName, false, false, false ,null);
        channel.queueBind(queueName, TopicExchangeProducer.EXCHANGE_NAME, bindingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey(), message);
            }
        };

        channel.basicConsume(queueName , true, consumer);
    }
}

执行结果:

生产者发送消息:

生产者发送消息

消费者1接收消息:

消费者1接收消息

消费者2接收消息:

消费者2接收消息

3.3 消息获取的两种方式

消息的获得方式有2种

  1. 拉取消息(get message)

  2. 推送消息(consume message)

那我们应该拉取消息还是推送消息?get是一个轮询模型,而consumer是一个推送模型。get模型会导致每条消息都会产生与RabbitMQ同步通信的开销,这一个请求由发送请求帧的客户端应用程序和发送应答的RabbitMQ组成。

详细的来说:

  1. 基于推送(Push-based):
    • 在基于推送的消息传递模型中,消费者注册一个回调函数,并告诉 RabbitMQ 在消息到达时调用该函数。
    • RabbitMQ 将消息推送到消费者,消费者无需轮询来检查是否有新消息。
    • 使用 AMQP 协议中的 basicConsume 方法来启动基于推送的消费者。在该方法中,你需要指定一个回调函数来处理每条消息的到达。
    • 这种方式的好处在于实时性高,能够立即处理消息,但需要消费者一直保持连接以便接收消息
  2. 基于拉取(Pull-based):
    • 基于拉取的消息传递模型中,消费者主动发起请求以获取消息,而不是等待消息被推送。
    • RabbitMQ 不会推送消息到消费者,而是等待消费者请求消息。
    • 消费者使用 basicGet 方法来从队列中获取消息。这个方法将立即返回队列中的下一条消息,或者返回空(如果队列为空)。
    • 这种方式的好处在于可以更好地控制消费者的行为,消费者可以按需获取消息,但需要消费者不断地轮询以检查是否有新消息

在选择消息获取方式时,需要考虑系统的实时性需求、消费者的可用性、网络延迟等因素。通常情况下,基于推送的方式更适合实时性要求高的场景,而基于拉取的方式更适合需要控制消费者行为的场景。

注意:

这里无论是推送还是拉取,都是针对消费者来说的,如果是推送,那么消费者侧就使用basicConsume,如果是拉取,则使用basicGet

在之前的所有代码,基本上都是推送的方式来调用的, 这里我们主要演示一下拉取的方式。

生产者:

@Slf4j
public class GetMsgProducer {

    public static final String EXCHANGE_NAME = "getMessage_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String routingKey = "error";
        for (int i = 0; i < 3; i++) {
            String message = "hello rabbitmq" + (i + 1);
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            log.info("send message, routingKey: {}, message: {}", routingKey, message);
        }
        //省略关闭连接和信道的代码
    }
}

消费者:

@Slf4j
public class GetMsgConsumer {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        channel.exchangeDeclare(GetMsgProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String queueName = "errorQueue";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, GetMsgProducer.EXCHANGE_NAME, "error");

        /**
         * basicGet和basicConsume的区别:
         * 1. 阻塞 vs. 非阻塞:
         *      basicGet是一个阻塞调用。它会立即返回一个消息(如果队列中有消息),或者返回null(如果队列为空)。
         *      因此,如果队列中没有消息,basicGet会立即返回。
         *      basicConsume是一个非阻塞调用。它会一直监听队列,直到有消息到达。
         *      因此,如果队列为空,basicConsume会一直等待直到有新消息到达为止。
         * 2. 主动 vs. 被动:
         *      basicGet是主动的,即你需要反复调用它来检查是否有新消息。
         *      basicConsume是被动的,一旦调用它,它会持续监听队列,当有新消息到达时触发你注册的回调函数。
         * 3. 用途:
         *      basicGet适用于一次性获取队列中的一条消息,然后进行处理。它通常用于轮询方式的消息获取。
         *      basicConsume适用于持续监听队列,实时地处理消息。它通常用于消息驱动的应用,能够在消息到达时立即做出响应。
         */
        while(true) {
            GetResponse getResponse = channel.basicGet(queueName, true);
            if (null != getResponse) {
                log.info("get message, routingKey: {}, message: {}", getResponse.getEnvelope().getRoutingKey(), new String(getResponse.getBody()));
            }
            //睡眠1s,防止cpu占用100%
            Thread.sleep(1000);
        }
    }
}

生产者发送消息:

生产者发送消息

消费者接收消息:

消费者接收消息

3.4 消息确认-ACK

消息的确认方式有2种

  1. 自动确认(autoAck=true)
  2. 手动确认(autoAck=false)

消费者在消费消息的时候,可以指定autoAck参数

String basicConsume(String queue, boolean autoAck, Consumer callback)

autoAck=false:RabbitMQ会等待消费者显示回复确认消息后才从内存(或者磁盘)中移出消息

autoAck=true: RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正的消费了这些消息

手动确认的方法如下,有2个参数

basicAck(long deliveryTag, boolean multiple)

deliveryTag:用来标识信道中投递的消息。RabbitMQ 推送消息给Consumer时,会附带一个deliveryTag,以便Consumer可以在消息确认时告诉RabbitMQ到底是哪条消息被确认了。
RabbitMQ保证在每个信道中,每条消息的deliveryTag1开始递增

multiple=true:消息id<=deliveryTag的消息,都会被确认

myltiple=false:消息id=deliveryTag的消息,都会被确认

一般要批量确认的时候,将multiple设置为true

消息一直不确认会发生啥?

如果队列中的消息发送到消费者后,消费者不对消息进行确认,那么消息会一直留在队列中,直到确认才会删除。

如果发送到A消费者的消息一直不确认,只有等到A消费者与rabbitmq的连接中断,rabbitmq才会考虑将A消费者未确认的消息重新投递给另一个消费者。

我现在有如下代码:

生产者:

@Slf4j
public class AutoAckFalseProducer {
    public static final String EXCHANGE_NAME = "ackFalse_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        /**
         * 开启发布确认,保证生产者发送到Broker中的消息成功
         */
        //开启消息发布确认
        channel.confirmSelect();
        //设置确认监听器
        channel.addConfirmListener(new ConfirmListener() {
            /**
             * 当消息被成功投递到交换机并被所有队列成功接收时,会触发handleAck方法。
             *  multiple = true :消息id<=deliveryTag的消息,都会被确认
             *  multiple=false: 消息id=deliveryTag的消息,会被确认
             * @param deliveryTag 是消息的唯一标识
             * @param multiple 是否确认多个消息;
             * @throws IOException
             */
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                log.info("handleAck, deliveryTag: {}, multiple: {}", deliveryTag, multiple);
            }

            /**
             * 当消息投递到交换机,但由于某些原因未能被所有队列成功接收时,会触发handleNack方法。
             * @param deliveryTag 是消息的唯一标识
             * @param multiple 是否拒绝多个消息
             * @throws IOException
             */
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                //一般来说如果Broker接受失败,那么生产者应该重新发送消息
                log.info("handleNack, deliveryTag: {}, multiple: {}", deliveryTag, multiple);
            }
        });

        String routingKey = "error";
        for (int i = 0; i < 100; i++) {
            String message = "hello rabbitmq " + i;
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            log.info("send message, routingKey: {}, message: {}", routingKey ,message);
        }

    }
}

相关API解释:

  • channel.confirmSelect()

    这是用于在信道上启用确认模式。启用确认模式后,RabbitMQ 服务器将发送确认消息给生产者,以通知它们消息是否成功被投递到了交换机上。

    注意事项

    1. 确认模式的启用是针对单个信道的。如果应用程序使用多个信道,则需要在每个信道上分别调用 channel.confirmSelect() 方法。
    2. 确认模式通常会增加网络开销,因为 RabbitMQ 需要发送确认消息给生产者。因此,在高吞吐量的场景下,可能需要权衡考虑确认模式的开启。
  • channel.addConfirmListener()

    这是用于注册一个确认监听器,以便在生产者发布消息后,接收 RabbitMQ 服务器发送的确认消息。当消息被确认时,确认监听器将被调用,生产者可以在确认监听器中执行相应的操作,例如记录日志、重新发送消息、更新状态等。

    这里直接给出最多参数的一个做解释:

    ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback);
    

    这里有最多有两个参数:ackCallbacknackCallback;虽然参数有两个,但是确实同一个类:

    @FunctionalInterface
    public interface ConfirmCallback {
        void handle(long deliveryTag, boolean multiple) throws IOException;
    }
    

    可见这是一个函数式接口,有一个handle方法,只是我的生产者代码中使用的是这个方法:

    void addConfirmListener(ConfirmListener listener);
    

    ConfirmListener类如下:

    public interface ConfirmListener {
        void handleAck(long deliveryTag, boolean multiple)
            throws IOException;
    
        void handleNack(long deliveryTag, boolean multiple)
            throws IOException;
    }
    

    可见是一个接口,有handleAckhandleNack,分别对应成功投递到交换器并被所有队列接收时触发的回调以及未能被所有队列成功接收时触发的回调。

    因此,你可以使用一个参数的这个方法,然后实现ConfirmListener接口即可,或者使用两个参数的这个方法,写两个ConfirmCallback类型的lambda表达式回调即可。

消费者A:

@Slf4j
public class AutoAckFalseConsumerA {

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        channel.exchangeDeclare(AutoAckFalseProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String queueName = "errorQueue";
        channel.queueDeclare(queueName, false, false, false, null);

        String bindingKey = "error";
        channel.queueBind(queueName, AutoAckFalseProducer.EXCHANGE_NAME, bindingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey() ,message);
            }
        };
        /**
         * 注意:这里接受消息的时候也将消息确认机制改为了手动确认,但是handleDelivery中并没有进行手动确认,
         * 所以消息处理之后不会被Broker删除
         * 消息一直不确认会发生啥?
         * 如果队列中的消息发送到消费者后,消费者不对消息进行确认,那么消息会一直留在队列中,直到确认才会删除。
         * 如果发送到A消费者的消息一直不确认,只有等到A消费者与rabbitmq的连接中断,
         * rabbitmq才会考虑将A消费者未确认的消息重新投递给另一个消费者,
         * 所以,当我们同时启动消费者A、B,然后再启动生产者,此时A、B消费消息,其中B消费了消息就将Broker中的消息删除了,
         * 但是A没有,
         * 所以A断开链接的时候,B就会消费掉Broker中剩下的未被删除的消息
         */
        channel.basicConsume(queueName, false, consumer);
    }
}

注意:

消费者A已经取消了自动确认,对应API:channel.basicConsume(queueName, false, consumer);;第二个参数就是关闭自动确认。

在关闭自动确认之后,消费者需要手动的调用channel.basicAck方法来显示的确认消息,如果没有,那么就算消费者消费消息,消息也不会从队列中删除。

消费者B:

@Slf4j
public class AutoAckFalseConsumerB {

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        channel.exchangeDeclare(AutoAckFalseProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String queueName = "errorQueue";
        channel.queueDeclare(queueName, false, false, false, null);

        String bindingKey = "error";
        channel.queueBind(queueName, AutoAckFalseProducer.EXCHANGE_NAME, bindingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                try {
                    // 当我们正常消息,手动ack后,消息就会从mq中删除
                    // multiple为false表示一条一条确认
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } catch (Exception e) {
                    // 发生异常,发送nack,根据requeue参数来决定是将消息丢弃开始还是再重新放回队列
                    channel.basicNack(envelope.getDeliveryTag(), false, false);
                }
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey() ,message);
            }
        };

        /**
         * 注意:这里接受消息的时候也将消息确认机制改为了手动确认,但是handleDelivery中并没有进行手动确认,
         * 所以消息处理之后不会被Broker删除
         */
        channel.basicConsume(queueName, false, consumer);
    }
}

现在我们通过如下步骤来演示:

  1. 开启 AutoAckFalseConsumerA, AutoAckFalseConsumerB, AutoAckFalseProducer
  2. 此时AutoAckFalseConsumerA AutoAckFalseConsumerB 都会收到消息,但是AutoAckFalseConsumerA没有对消息进行确认。
  3. 关闭AutoAckFalseConsumerA,此时可以看到AutoAckFalseConsumerB又收到AutoAckFalseConsumerA收到的消息

执行结果:

生产者发送消息:

生产者发送消息

可见,当消息成功发送到交换器上并且所有队列成功接收的时候,就会触发handleAck的回调,这个回调是RabbitMQ服务器自己帮你调用的,并且如果消息是一条一条被服务器确认的,那么multiple就是false,否则就是true。

消费者A消费消息:

消费者A消费消息

可见消费者A消费的消息都是单数,但是消费者A的代码中消费了消息并没有进行消费确认,所以尽管这里消费了消息,但是消息任然是存放在队列中的。

消费者B消费消息:

消费者B消费消息

可见消费者B消费的消息为双数,并且消费者B的代码中消费了消息是进行确认了的,所以消息一被消费者确认,那么就会将该消息从队列中删除。

现在我们停掉消费者A,观察消费者B:

停掉消费者A,观察消费者B

可见消费者A消费了消息一直不确认,所以,只有当消费者A与服务器断开连接的时候,才会将消费者A未确认的消息投递给另外一个消费者,也就是消费者B。

3.5 拒绝消息的两种方式

确认消息只有一种方法

  1. basicAck(long deliveryTag, boolean multiple)

而拒绝消息有两种方式

  1. basicNack(long deliveryTag, boolean multiple, boolean requeue)

  2. basicReject(long deliveryTag, boolean requeue)

basicNackbasicReject的区别只有一个,basicNack支持批量拒绝

deliveryTagmultiple参数前面已经说过。

requeue=true:消息会重新投放给任意一个消费者(包括拒绝消息的那个消费者)

requeue=false:消息会被直接丢失

生产者:

/**
 * 一般情况下,如果队列中的消息发送到消费者后,消费者不对消息进行确认。
 * 那么消息会一直留在队列中,直到确认才会删除。
 * 消费者与rabbitmq的连接中断,rabbitmq才会考虑将消息重新投递给另一个消费者
 */
@Slf4j
public class RejectMsgProducer {

    public static final String EXCHANGE_NAME = "rejectMsg_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String routingKey = "error";
        for (int i = 0; i < 10; i++) {
            String message = "hello rabbitmq " + i;
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            log.info("send message, routingKey: {}, message: {}", routingKey ,message);
        }
        //省略关闭连接和信道的代码
    }
}

要拒绝消息的消费者:

/**
 * 消息拒绝有如下两种api
 * 1. channel.basicReject(envelope.getDeliveryTag(), false);
 * 2. channel.basicNack(envelope.getDeliveryTag(), false, false);
 * 这2种方式的区别在于basicNack有一个批量拒绝的功能
 * requeue为true时,消息会重新投放给任意一个消费者(包括拒绝消息的那个消费者)
 */
@Slf4j
public class RejectMsgConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置

        channel.exchangeDeclare(RejectMsgProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String queueName = "errorQueue";
        channel.queueDeclare(queueName, false, false, false, null);

        String bindingKey = "error";
        channel.queueBind(queueName, RejectMsgProducer.EXCHANGE_NAME, bindingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                try {
                    throw new RuntimeException("消息消费异常");
                } catch (Exception  e) {
                    channel.basicReject(envelope.getDeliveryTag(), true);
//                    channel.basicNack(envelope.getDeliveryTag(), false, true);
                }
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey() ,message);
            }
        };

        channel.basicConsume(queueName, false, consumer);
    }
}

正常消费的消费者:

@Slf4j
public class NormalConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置

        channel.exchangeDeclare(RejectMsgProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String queueName = "errorQueue";
        channel.queueDeclare(queueName, false, false, false, null);

        String bindingKey = "error";
        channel.queueBind(queueName, RejectMsgProducer.EXCHANGE_NAME, bindingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                channel.basicAck(envelope.getDeliveryTag(), false);
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey() ,message);
            }
        };

        channel.basicConsume(queueName, false, consumer);
    }
}

执行结果:

生产者发送消息:

生产者发送消息

拒绝消息的消费者:

拒绝消息的消费者

正常消费的消费者:

正常消费的消费者

可见,拒绝消费的消费者消费的是单数消息,并且,出现了3、7、3、3的情况,这是因为拒绝消息之后,消息重新入队之后投递,还有可能重新投递到拒绝的消费者哪里去,所以你会见到3、7、3、3的情况。

对于正常消费的消费者来说,除了消费本身自己队列中的双数消息,由于拒绝消费者拒绝消息,所以也会消费拒绝消费者重新投递到正常消费者队列中的消息。

3.6 失败通知

3.6到3.10小节主要简述了消息发布时的权衡

我们最常用的就是失败通知和发布者确认

当消息不能被路由到某个queue时,我们如何获取到不能正确路由的消息呢?

  1. 在发送消息时设置mandatorytrue
  2. 生产者可以通过调用channel.addReturnListener来添加ReturnListener监听器获取没有被路由到队列中的消息。

mandatorychannel.basicPublish()方法中的参数,之前讲解basicPublish这个API的时候说过。

mandatory=true:交换器无法根据路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者

mandatory=false:出现上述情形,则消息直接被丢弃

生产者:

/**
 * 失败通知
 * 当消息不可路由时,我们如何获取到不能正确路由的消息呢?
 * 1. channel.basicPublish()方法的mandatory参数设置为true
 * 2. channel.addReturnListener()添加监听器
 */
@Slf4j
public class FailureNoticeProducer {

    public static final String EXCHANGE_NAME  = "failureNotice_exchange";

    public static void main(String[] args) throws Exception {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置

        channel.exchangeDeclare(FailureNoticeProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, 
                                     String replyText, 
                                     String exchange, 
                                     String routingKey, 
                                     AMQP.BasicProperties properties, 
                                     byte[] body) throws IOException {
                log.info("send message error, replyCode: {}, replyText: {}, exchange: {}, routingKey: {}",
                        replyCode, replyText, exchange, routingKey);
            }
        });

        /**
         * 因为消费者那边的bindingKey为error,所以当生产者发送的消息的routingKey为info的时候,就会导致路由失败
         */
        String[] logLevel = {"error","info"};
        for (int i = 0; i < 5; i++) {
            String routingKey = logLevel[i % 2];
            String message = "hello rabbitmq " + i;
            channel.basicPublish(EXCHANGE_NAME, routingKey,  true, null, message.getBytes());
            log.info("send message, routingKey: {}, message: {}", routingKey, message);
        }
    }
}

相关API解释:

  • channel.addReturnListener()

    这个API允许你在消息发送失败时捕获这些返回的消息。这种情况通常发生在消息无法路由到指定的队列时,或者发送的消息无法被消费者正确处理时,你可以通过该监听器来处理这些返回的消息,例如记录日志、重新发送消息或者执行其他的错误处理逻辑。

    这里直接给出最多参数的一个做解释:

    void addReturnListener(ReturnListener listener);
    

    ReturnListener对应的源码:

    public interface ReturnListener {
        void handleReturn(int replyCode,
                String replyText,
                String exchange,
                String routingKey,
                AMQP.BasicProperties properties,
                byte[] body)
            throws IOException;
    }
    

    以及

    ReturnListener addReturnListener(ReturnCallback returnCallback);
    

    ReturnCallback对应的源码:

    @FunctionalInterface
    public interface ReturnCallback {
    
        void handle(Return returnMessage);
    
    }
    

    Return对应的部分源码:

    Return对应的部分源码

    所以可见,二者其实是一个道理,可以直接改为lambda表达式:

     channel.addReturnListener((replyCode,replyText, exchange, routingKey, properties,body)->
            log.info("send message error, replyCode: {}, replyText: {}, exchange: {}, routingKey: {},body: {}",
                    replyCode, replyText, exchange, routingKey,new String(body,"UTF-8"))
    );
    
    /*channel.addReturnListener((returnMessage)->
            log.info("send message error, returnMessage: {}", returnMessage)
    );*/
    

消费者:

@Slf4j
public class FailureNoticeConsumer {

    public static void main(String[] args) throws Exception {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置

        channel.exchangeDeclare(FailureNoticeProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String queueName = "failureNoticeQueue";
        String bindingKey = "error";
        channel.queueDeclare(queueName, false, false, false ,null);
        channel.queueBind(queueName, FailureNoticeProducer.EXCHANGE_NAME, bindingKey);

        Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey() ,message);
            }
        };

        channel.basicConsume(queueName , true, consumer);
    }
}

执行结果:

生产者发送消息:

生产者发送消息

消费者接收消息:

消费者接收消息

因为消费者的bindingKey为error,如果生产者发送消息时的routingKey不为error的时候,就会导致路由失败,如果生产者处监听了不能被正确路由的消息,那么生产者就能够对路由失败的消息进行再次处理。

3.7 发布者确认

注意:本小节是发布者确认,不是消费者确认,消费者确认在之前3.4小节(消息确认-ACK)已经讲过了。

当消息被发送后,消息到底有没有到达Exchange呢?默认情况下生产者是不知道消息有没有到达Exchange

RabbitMQ针对这个问题,提供了两种解决方式

  1. 事务(后面会讲到)
  2. 发布者确认(publisher confirm)

而发布者确认有三种编程方式

  1. 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
  2. 批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm
  3. 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。

异步confirm模式的性能最高,因此经常使用,我想把这个分享的细一下:

channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        log.info("handleAck, deliveryTag: {}, multiple: {}", deliveryTag, multiple);
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        log.info("handleNack, deliveryTag: {}, multiple: {}", deliveryTag, multiple);
    }
});

写过异步confirm代码的小伙伴应该对这段代码不陌生,可以看到这里也有deliveryTagmultiple。但是我要说的是这里的deliveryTagmultiple消息的ACK没有一点关系,具体区别:

  • ConfirmListener中的ACK:是由RabbitMQ控制的,用来确认消息是否到达Exchange

  • 消息的ACK:上面说到可以自动确认,也可以手动确认,用来确认Queue中的消息是否被Consumer消费

3.7.1 普通确认模式

生产者:

@Slf4j
public class ConfirmProducer {

    public static final String EXCHANGE_NAME = "confirm_exchange";

    public static void main(String[] args)
            throws IOException, TimeoutException, InterruptedException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 启用发布者确认模式
        channel.confirmSelect();

        String routingKey = "error";
        for (int i = 0; i < 10; i++) {
            String message = "hello rabbitmq " + i;
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            // 一条一条确认,返回为true,则表示发送成功
            /**
             * waitForConfirms和addConfirmListener的区别:
             *  channel.addConfirmListener 方法(异步)用于向通道添加一个确认监听器(ConfirmListener)。
             *  可以同时监听多个消息的确认状态,提供了更灵活的处理方式。
             *  
             *  channel.waitForConfirms() 方法是一个同步方法,它会阻塞当前线程,直到所有待确认的消息都被确认。
             *  通常使用 waitForConfirms() 方法在发布多个消息后,等待所有消息都被确认,然后再继续执行后续的代码。
             *  
             *  总体而言,使用 channel.addConfirmListener 允许你异步处理每个消息的确认状态,
             *  而 channel.waitForConfirms() 是一个同步方法,它提供了一种等待所有消息确认完成的机制。
             */
            if (channel.waitForConfirms()) {
                log.info("send success, routingKey: {}, message: {}", routingKey ,message);
            } else {
                log.info("send fail, routingKey: {}, message: {}", routingKey ,message);
            }
        }
        //省略关闭连接和信道的代码
    }
}

注意看我代码中提到的waitForConfirmsaddConfirmListener的区别:

  1. channel.addConfirmListener 方法(异步)用于向通道添加一个确认监听器(ConfirmListener),可以同时监听多个消息的确认状态,提供了更灵活的处理方式。
  2. channel.waitForConfirms() 方法是一个同步方法,它会阻塞当前线程,直到所有待确认的消息都被确认。通常使用 waitForConfirms() 方法在发布多个消息后,等待所有消息都被确认,然后再继续执行后续的代码。

总体而言,使用 channel.addConfirmListener 允许你异步处理每个消息的确认状态,而 channel.waitForConfirms() 是一个同步方法,它提供了一种等待所有消息确认完成的机制。

消费者:

@Slf4j
public class ConfirmConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        channel.exchangeDeclare(ConfirmProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String queueName = "errorQueue";
        channel.queueDeclare(queueName, false, false, false, null);

        String bindingKey = "error";
        channel.queueBind(queueName, ConfirmProducer.EXCHANGE_NAME, bindingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey() ,message);
            }
        };

        channel.basicConsume(queueName, true, consumer);
    }
}

执行结果:

生产者发送消息:

生产者发送消息

消费者接收消息:

消费者接收消息

注意观察消费者接收消息和生产者发送消息的时间,可见基本上是RabbiMQ服务器确认收到了消息之后(也就是消息成功到达了交换器-Exchange),消费者立马就消费了,在消费者消费消息的同时,由于服务器已经确认接收到了消息,那么生产者才会继续发送消息。

3.7.2 批量确认模式

生产者:

@Slf4j
public class BatchConfirmProducer {

    public static final String EXCHANGE_NAME = "batch_confirm_exchange";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 启用发布者确认模式
        channel.confirmSelect();

        String routingKey = "error";
        for (int i = 0; i < 10; i++) {
            String message = "hello rabbitmq " + i;
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            if ((i & 1) == 1) {
                // 发送的2条消息都确认了才会继续发送
                if (channel.waitForConfirms()) {
                    log.info("send success");
                } else {
                    log.info("send fail");
                }
            }
            log.info("send message, routingKey: {}, message: {}", routingKey, message);
        }
        //省略关闭连接和信道的代码
    }
}

消费者:

@Slf4j
public class BatchConfirmConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {
           //省略RabbitMQ的配置、连接的配置、信道创建的配置
        channel.exchangeDeclare(BatchConfirmProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String queueName = "errorQueue";
        channel.queueDeclare(queueName, false, false, false, null);

        String bindingKey = "error";
        channel.queueBind(queueName, BatchConfirmProducer.EXCHANGE_NAME, bindingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey() ,message);
            }
        };

        channel.basicConsume(queueName, true, consumer);
    }
}

执行结果:

生产者发送消息:

生产者发送消息

消费者接收消息:

消费者接收消息

这里的道理和普通模式如如出一辙。

注意:并不是消费者的时间要和生产者的时间一致,消费者消费消息的时间只会大于等于生产者的时间的。

3.7.3 异步确认模式

生产者:

/**
 * Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),
 * 我们需要自己为每一个Channel维护一个UnConfirm的消息序号集合,
 * 每publish一条数据,集合中元素加1,每回调一次handleAck方法,
 * UnConfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。
 * 从程序运行效率上看,这个UnConfirm集合最好采用有序集合SortedSet存储结构
 *
 * 参考自《RabbitMQ实战指南》
 */
@Slf4j
public class AsyncConfirmProducer {

    public static final String EXCHANGE_NAME = "async_confirm_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 启用发布者确认模式
        channel.confirmSelect();

        //开启消息失败通知监听器
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                log.info("send message error, replyCode: {}, replyText: {}, exchange: {}, routingKey: {}",
                        replyCode, replyText, exchange, routingKey);
            }
        });

        //开启确认监听器
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                log.info("handleAck, deliveryTag: {}, multiple: {}", deliveryTag, multiple);
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                log.info("handleNack, deliveryTag: {}, multiple: {}", deliveryTag, multiple);
            }
        });

        String routingKey = "error";
        for (int i = 0; i < 10; i++) {
            String message = "hello rabbitmq " + i;
            channel.basicPublish(EXCHANGE_NAME, routingKey, true, null, message.getBytes());
            log.info("send message, routingKey: {}, message: {}", routingKey, message);
        }

    }
}

消费者:

@Slf4j
public class AsyncConfirmConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        channel.exchangeDeclare(AsyncConfirmProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String queueName = "errorQueue";
        channel.queueDeclare(queueName, false, false, false, null);

        String bindingKey = "error";
        channel.queueBind(queueName, AsyncConfirmProducer.EXCHANGE_NAME, bindingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey() ,message);
            }
        };

        //这里开启了消息的自动确认模式
        channel.basicConsume(queueName, true, consumer); 
    }
}

执行结果:

生产者发送消息:

生产者发送消息

消费者接收消息:

消费者接收消息

注意生产者处:

生产者处

这里的确认同样也是RabbitMQ服务器确认消息已经成功路由并且到达了Queue,并不是消费者消费消息的确认。

3.8 备用交换器

生产者在发送消息的时候如果不设置 mandatory 参数那么消息在未被路由到queue的情况下将会丢失,如果设置了 mandatory 参数,那么需要添加 ReturnListener 的编程逻辑,生产者的代码将变得复杂。如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备用交换器,这样可以将未被路由到Queue的消息存储在RabbitMQ 中,在需要的时候去处理这些消息。

生产者:

@Slf4j
public class BackupExProducer {

    public static final String EXCHANGE_NAME = "main_exchange";
    public static final String BAK_EXCHANGE_NAME = "backup_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置

        // 备用交换器
        // Fanout Exchange
        // 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
        channel.exchangeDeclare(BAK_EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        Map<String, Object> argsMap = new HashMap<>();
        argsMap.put("alternate-exchange", BAK_EXCHANGE_NAME);

        /**
         * 注意,这个argsMap参数并不是给“main_exchange”设置为备用交换机,
         * 而是在当“main_exchange”交换机不可达到的时候,通过这个参数,查找到名称为:“alternate-exchange”的备用交换机“BAK_EXCHANGE_NAME”
         *
         * 这里是给direct类型的交换机发送消息,并将备用交换机绑定到了目标交换机main_exchange上面
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, argsMap);

        String[] logLevel ={"error","info","warning"};
        for (int i = 0; i < 3; i++) {
            String routingKey = logLevel[i % 3];
            String message = "hello rabbitmq " + i;
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            log.info("send message, routingKey: {}, message: {}", routingKey, message);
        }
        //省略关闭连接和信道的代码
    }
}

生产者发送消息到main_exchange交换器上。

详细解释一下这里的代码:

channel.exchangeDeclare(BAK_EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

Map<String, Object> argsMap = new HashMap<>();
argsMap.put("alternate-exchange", BAK_EXCHANGE_NAME);

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, argsMap);

首先,在生产者代码中,声明了一个备用交换器,这个备用交换器的类型是Fanout,之后声明一个正常的交换器,类型是direct,不过,要将备用交换器与正常的交换器进行绑定,这时候就要用到exchangeDeclare方法中,arguments参数了,它是一个Map,键必须为alternate-exchange,值为备用交换器的名称。这样,当正常交换器无法接收到的消息,就会被投递到备用交换器中了。

具体的web管理页面:

web管理页面操作步骤

正常消费者:

@Slf4j
public class MainConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        channel.exchangeDeclare(BackupExProducer.BAK_EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        Map<String, Object> argsMap = new HashMap<>();
        argsMap.put("alternate-exchange", BackupExProducer.BAK_EXCHANGE_NAME);

        //指定当前正常交换器的备用交换器是谁,使用在声明正常交换器的时候绑定argsMap,就可以为其绑定备用交换器了
        channel.exchangeDeclare(BackupExProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, argsMap);;

        String queueName = "errorQueue";
        channel.queueDeclare(queueName, false, false, false, null);

        String bindingKey = "error";
        channel.queueBind(queueName, BackupExProducer.EXCHANGE_NAME, bindingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey(), message);
            }
        };

        channel.basicConsume(queueName, true, consumer);
    }
}

消费者接收main_exchange交换器发送过来的数据,bingKeyerror

备用消费者

@Slf4j
public class BackupExConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        channel.exchangeDeclare(BackupExProducer.BAK_EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        String queueName = "notErrorQueue";
        channel.queueDeclare(queueName, false, false, false, null);

        // fanout类型的交换器bindingKey不起作用,这里随便写了一个#
        channel.queueBind(queueName, BackupExProducer.BAK_EXCHANGE_NAME, "#");
        Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey(), message);
            }
        };

        channel.basicConsume(queueName, true, consumer);
    }
}

无法被正常路由到正常交换器上的消息都会被发送到备用交换器上面被备用消费者处理。

执行结果:

生产者发送消息:

生产者发送消息

正常消费者消费消息:

正常消费者消费消息

备用消费者消费消息:

备用消费者消费消息

3.9 事务

RabbitMQ中与事务机制相关的方法有3个:

方法 解释
channel.txSelect() 将当前的信道设置成事务模式
channel.txCommit() 提交事务
channel.txRollback() 回滚事务

消息成功被发送到RabbitMQExchange上,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。

因为事务会榨干RabbitMQ的性能,所以一般使用发布者确认代替事务。

生产者:

/**
 * 事务效率比较低,生产环境中一般用发布方确认(publisher confirm)来代替事务
 */
@Slf4j
public class TransactionProducer {

    public static final String EXCHANGE_NAME = "transaction_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String routingKey = "error";
        // 将当前信道设置成事务模式
        channel.txSelect();
        
        for (int i = 0; i < 10; i++) {
            String message = "hello rabbitmq " + i;
            try {
                channel.basicPublish(EXCHANGE_NAME, routingKey, false, null, message.getBytes());
                log.info("send message, routingKey: {}, message: {}", routingKey, message);
                // 提交事务
                channel.txCommit();
            } catch (Exception e) {
                e.printStackTrace();
                // 回滚事务
                channel.txRollback();
            }
        }

        //省略关闭连接和信道的代码
    }
}

消费者:

@Slf4j
public class TransactionConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        channel.exchangeDeclare(TransactionProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String queueName = "transactionQueue";
        channel.queueDeclare(queueName, false, false, false, null);

        String bindingKey = "error";
        channel.queueBind(queueName, TransactionProducer.EXCHANGE_NAME, bindingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey(), message);
            }
        };

        channel.basicConsume(queueName, true, consumer);
    }
}

这里很简单,就是一个API的使用,就不演示结果了,可执行赋值代码测试。

3.10 消息持久化

消息做持久化,只需要将消息属性的delivery-mode设置为2即可

RabbitMQ给我们封装了这个属性,即MessageProperties.PERSISTENT_TEXT_PLAIN

当我们想做消息的持久化时,最好同时设置队列和消息的持久化,因为只设置消息的持久化,重启之后消息会丢失。只设置队列的持久化,重启后队列消失,继而消息也丢失。

生产者:

/**
 * 做消息持久化的时候,队列也得做持久化,不然RabbitMQ重启后,队列消失,消息也会消失
 */
@Slf4j
public class MsgDurableProducer {

    public static final String EXCHANGE_NAME = "msg_durable_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String routingKey = "error";
        for (int i = 0; i < 10; i++) {
            String message = "hello rabbit " + i;
            // 主要就是将deliveryMode设置为2
            channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            log.info("send message, routingKey: {}, message: {}", routingKey, message);
        }
        //省略关闭连接和信道的代码
    }
}

这里面可以去看看:MessageProperties这个类,里面提供了一些常量,最好去看一下。

消费者:

@Slf4j
public class MsgDurableConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        channel.exchangeDeclare(MsgDurableProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String queueName = "msgDurableQueue";
        /**
         * 这里的队列也被持久化了,因为最好将消息和队列同时持久化
         * 因为只设置队列的持久化,重启之后消息会丢失。只设置消息的持久化,重启后队列消失,继而消息也丢失
         */
        channel.queueDeclare(queueName, true, false, false, null);

        channel.queueBind(queueName, MsgDurableProducer.EXCHANGE_NAME, "error");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey(), message);
            }
        };

        channel.basicConsume(queueName, true, consumer);
    }
}

执行之后,查看web控制台:

web控制台队列查看

可见features那一栏中为D,意思就是持久化的意思。

💡Tips:

为了证明是不是真的被持久化了,你可以重启你的RabbitMQ服务器,看这个队列是否存在。你也可以先发送消息,但是不消费,然后重启RabbitMQ服务器,之后再开启消费者服务,看看消息是否被丢失了。

现在我在停止服务器之前,已经发送了消息:

生产者发送消息

然后我重启了我的RabbitMQ服务器:

查看交换器

查看队列

可见,未被持久化的交换器和队列在服务重启之后都消失了。

现在我们开启消费者服务,看看消息是否丢失:

消费者接收消息

可见消息仍然还存在,服务重启并没有导致消息丢失。

3.11 死信队列

DLX,全称为Dead-Letter-Exchange,称之为死信交换器。当一个消息在队列中变成死信(dead message)之后,它能被重新发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列

DLX也是一个正常的交换器,和一般的交换器没有区别,实际上就是设置某个队列的属性。

这一点和之前的备用交换器类似。

消息变成死信一般是由于以下几种情况:

  1. 消息被拒绝(Basic.Reject/Basic.Nack)且不重新投递(requeue=false)
  2. 消息过期
  3. 队列达到最大长度

死信交换器和备用交换器的区别:

  • 备用交换器:

    1. 消息无法路由时转到备用交换器
    2. 备用交换器是在声明主交换器的时候定义的
  • 死信交换器:

    1. 消息已经到达队列,但是被消费者拒绝等的消息会转到死信交换器。
    2. 死信交换器是在声明队列的时候定义的

生产者:

/**
 * 当消息被拒绝,并且requeue=false时,最好将不能处理的消息投入死信队列供以后处理
 * 消息被路由到死信交换器的时候,可以重新设置路由键(如果不设置默认是消息原来的路由键)
 * 所以死信队列这块分了2个包,notResetRoutingKey 不重新设置路由键,resetRoutingKey 重新设置路由键
 */
@Slf4j
public class DlxProducer {

    public static final String EXCHANGE_NAME = "dlx_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        
        //定义正常的交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        String[] logLevel = {"error","info","warning"};
        for (int i = 0; i < 3; i++) {
            String routingKey = logLevel[i % 3];
            String message = "hello rabbitmq " + i;
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            log.info("send message, routingKey: {}, message: {}", routingKey ,message);
        }
        //省略关闭连接和信道的代码
    }
}

正常消费者:

@Slf4j
public class NormalConsumer {

    public static final String DLX_EXCHANGE_NAME = "accept_dlx_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        
        //定义死信交换机
        channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        String queueName = "dlxNormalQueue";
        Map<String, Object> argsMap = new HashMap<>();
        argsMap.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
        
        //声明正常队列,不过队列的参数要设置上死信交换机,与备份交换机不同(在交换机声明处设置参数)
        channel.queueDeclare(queueName, false, true, false, argsMap);

        channel.queueBind(queueName, DlxProducer.EXCHANGE_NAME, "#");

        Consumer consumer = new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                if (envelope.getRoutingKey().equals("error")) {
                    log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey(), message);
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } else {
                    //这里,当消费者拒绝消息,并且拒绝重新入队时,这条消息就称为了死信消息,会被投递到死信交换器上,然后到死信队列中
                    channel.basicReject(envelope.getDeliveryTag(), false);
                }
            }
        };

        channel.basicConsume(queueName, false, consumer);
    }
}

注意:

这里绑定死信交换器的时候,与备用交换器有所不同:

  1. 备用交换器绑定是在主交换器声明时绑定:channel.exchangeDeclare(exchangeName, exchangeType, false, false, argsMap);
  2. 死信交换器绑定是在队列声明时:channel.queueDeclare(queueName, false, true, false, argsMap);

同样的,关于arguments参数也不能随便指定,死信必须为x-dead-letter-exchange属性,也就是:

argsMap.put("x-dead-letter-exchange", 死信交换器名称);

死信消费者:

@Slf4j
public class DlxConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {
           //省略RabbitMQ的配置、连接的配置、信道创建的配置
        
        //定义死信交换机
        channel.exchangeDeclare(NormalConsumer.DLX_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //定义死信队列
        String queueName = "dlxQueue";
        channel.queueDeclare(queueName, false, false, false, null);
        //重新设置路由键(这里虽然是重置,但是依然和正常消费者的bindingKey一致)
        channel.queueBind(queueName, NormalConsumer.DLX_EXCHANGE_NAME, "#");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey(), message);
            }
        };

        channel.basicConsume(queueName, true, consumer);
    }
}

注意:如果你想要路由到死信交换机时重置路由键,那么你要新增arguments参数:

argsMap.put("x-dead-letter-routing-key", 死信路由的routingKey);

执行结果:

生产者发送消息:

生产者发送消息

普通消费者接收消息:

普通消费者接收消息

死信消费者接收消息:

死信消费者接收消息

3.12 流量控制(服务质量保证)

QOS即服务端限流,QOS对于拉模式的消费方式无效

使用QOS只要进行如下2个步骤即可:

  1. autoAck设置为false(autoAck=true的时候不生效)

  2. 调用basicConsume方法前先调用basicQos方法,这个方法有3个参数

    void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
    
参数名 含义
prefetchSize 批量取的消息的总大小,0为不限制
prefetchCount 消费完prefetchCount条(prefetchCount条消息被ack)才再次推送
global global为true表示对channel进行限制,否则对每个消费者进行限制,因为一个channel允许有多个消费者

为什么要使用QOS?

  1. 提高服务稳定性。假设消费端有一段时间不可用,导致队列中有上万条未处理的消息,如果开启客户端,巨量的消息推送过来,可能会导致消费端变卡,也有可能直接不可用,所以服务端限流很重要。

  2. 提高吞吐量。当队列有多个消费者时,队列收到的消息以轮询的方式发送给消费者。但由于机器性能等的原因,每个消费者的消费能力不一样,这就会导致一些消费者处理完了消费的消息,而另一些则还堆积了一些消息,会造成整体应用吞吐量的下降。

生产者:

@Slf4j
public class QosProducer {

    public static final String EXCHANGE_NAME = "qos_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String routingKey = "error";

        for (int i = 0; i < 30; i++) {
            String message = "hello rabbitmq " + i;
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            log.info("send message, routingKey: {}, message: {}", routingKey ,message);
        }
        //省略关闭连接和信道的代码
    }
}

消费者:

/**
 * 使用qos的步骤
 * 1. autoAck设置为false
 * 2. 调用basicConsume方法前先调用basicQos方法
 */
@Slf4j
public class QosConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //省略RabbitMQ的配置、连接的配置、信道创建的配置
        channel.exchangeDeclare(QosProducer.EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String queueName = "qosQueue";
        channel.queueDeclare(queueName, false, false, false, null);

        channel.queueBind(queueName, QosProducer.EXCHANGE_NAME, "error");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                channel.basicAck(envelope.getDeliveryTag(), false);
                log.info("get message, routingKey: {}, message: {}", envelope.getRoutingKey() ,message);
            }
        };

        // prefetchSize为批量取的消息的总大小,0为不限制
        // prefetchCount为消费完3条(3条消息被ack)才再次推送
        // global为true表示对channel进行限制,否则对每个消费者进行限制
        // 一个信道允许有多个消费者
        channel.basicQos(0, 3, false);
        channel.basicConsume(queueName, false, consumer);
    }
}

这里的channel.basicQos(0, 3, false);第三个参数为false,表示对每一个消费者进行消费完3条消息才再次进行消息推送的限制,如果为true,表示整个channel消费完3条消息才再次推送新消息;由于一个channel上可能会有多个消费者,所以global一般都设置为false,否则达不到流量控制的目的,消费能力差的消费者仍然回堆积很多未消费的消息。

4. SpringBoot整合RabbitMQ

Spring有三种配置方式

  1. 基于XML
  2. 基于JavaConfig
  3. 基于注解

当然现在已经很少使用XML来做配置了,只介绍一下用JavaConfig和注解的配置方式

RabbitMQ整合Spring Boot,我们只需要增加对应的starter即可

 <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

4.1 基于注解

在application.yaml的配置如下

spring:
  rabbitmq:
    host: rabbitmq服务器IP
    port: 5672
    username: rabbitmq账号
    password: rabbitmq密码
    virtual-host: /

log:
  exchange: log.exchange
  info:
    queue: info.log.queue
    binding-key: info.log.key
  error:
    queue: error.log.queue
    binding-key: error.log.key
  all:
    queue: all.log.queue
    binding-key: '*.log.key'

消费者代码如下

@Slf4j
@Component
public class LogReceiverListener {

    /**
     * 接收info级别的日志
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${log.info.queue}", durable = "true"),
                    exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
                    key = "${log.info.binding-key}"
            )
    )
    public void infoLog(Message message) {
        String msg = new String(message.getBody());
        log.info("infoLogQueue 收到的消息为: {}", msg);
    }

    /**
     * 接收所有的日志
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${log.all.queue}", durable = "true"),
                    exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
                    key = "${log.all.binding-key}"
            )
    )
    public void allLog(Message message) {
        String msg = new String(message.getBody());
        log.info("allLogQueue 收到的消息为: {}", msg);
    }
}

生产者如下

@RunWith(SpringRunner.class)
@SpringBootTest
public class MsgProducerTest {

    @Autowired
    private AmqpTemplate amqpTemplate;
    @Value("${log.exchange}")
    private String exchange;
    @Value("${log.info.binding-key}")
    private String routingKey;

    @SneakyThrows
    @Test
    public void sendMsg() {
        for (int i = 0; i < 5; i++) {
            String message = "this is info message " + i;
            amqpTemplate.convertAndSend(exchange, routingKey, message);
        }

        System.in.read();
    }
}

在与SpringBoot整合之后,生产者发送消息也变了。

Spring Boot针对消息ACK的方式和原生API针对消息ACK的方式有点不同。

4.1.1 原生API消息ACK的方式

具体内容查看3.4小节-——消息确认ACK

4.1.2 SpringBoot中针对消息ACK的方式

有三种方式,定义在AcknowledgeMode枚举类中:

方式 解释
NONE 没有ACK,等价于原生API中的autoAck=true
MANUAL 用户需要手动发送ACK或者NACK
AUTO 方法正常结束,spring boot 框架返回ACK,发生异常spring boot框架返回NACK

SpringBoot针对消息默认的ACK的方式为AUTO。

在实际场景中,我们一般都是手动ACK。

application.yaml的配置改为如下

spring:
  rabbitmq:
    host: rabbitmq服务器ip
    port: 5672
    username: rabbitmq账号
    password: rabbitmq密码
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 手动ack,默认为auto

配置类:

  1. RabbitMqConfig:

    package com.javashitang.config;
    
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMqConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(
                @Value("${spring.rabbitmq.host}") String host,
                @Value("${spring.rabbitmq.port}") int port,
                @Value("${spring.rabbitmq.username}") String username,
                @Value("${spring.rabbitmq.password}") String password,
                @Value("${spring.rabbitmq.virtual-host}") String vhost) {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
            connectionFactory.setPort(port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(vhost);
            connectionFactory.setPublisherConfirms(true); //开启发布确认机制
            connectionFactory.setPublisherReturns(true); //开启发布返回机制(无法被路由到正确的队列时返回)
            return connectionFactory;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                             ReturnCallback returnCallback, ConfirmCallback confirmCallback) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            //开启生产者发送失败通知
            rabbitTemplate.setReturnCallback(returnCallback);
            //开启发布者确认
            rabbitTemplate.setConfirmCallback(confirmCallback);
            // 要想使 returnCallback 生效,必须设置为true
            rabbitTemplate.setMandatory(true);
            return rabbitTemplate;
        }
    }
    
  2. ReturnCallback:

    package com.javashitang.config;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Component;
    
    @Component
    public class ReturnCallback implements RabbitTemplate.ReturnCallback {
    
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            String msg = new String(message.getBody());
            System.out.println(String.format("消息 {%s} 不能被正确路由,routingKey为 {%s}", msg, routingKey));
        }
    }
    
  3. ConfirmCallback:

    package com.javashitang.config;
    
    import com.javashitang.consumer.MessageSender;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     *
     * 用来判断消息是否被ACK
     * ack true for ack, false for nack
     */
    @Component
    public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
    
        @Autowired
        private MessageSender messageSender;
    
        /**
         * 该方法在消息被 Broker 确认发送到交换机时被调用。
         * @param correlationData 关联原始消息的对象,其中包含了消息的唯一标识(这里使用 getId() 获取)。
         * @param ack 表示消息是否成功发送到交换机。如果为 true,表示成功;如果为 false,表示失败。
         * @param cause 发送失败的原因(如果有的话)
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String msgId = correlationData.getId();
            String msg = messageSender.dequeueUnAckMsg(msgId);
            if (ack) {
                System.out.println(String.format("消息 {%s} 成功发送给mq", msg));
            } else {
                System.out.println(String.format("消息 {%s} 发送mq失败,失败的原因 {%s}", msg,cause));
            }
        }
    }
    

    在 RabbitMQ Java 客户端库中,CorrelationData 类用于表示消息的关联数据。它通常用于在发送消息时,将一些额外的数据与消息关联起来,以便在消息发送确认时能够识别出对应的消息。

    具体来说,CorrelationData 类的作用如下:

    1. 消息关联标识: 在发送消息时,可以将一个 CorrelationData 实例与消息关联起来。这个 CorrelationData 实例通常包含一个唯一标识符或者其他与消息相关的数据。
    2. 消息发送确认: 当消息发送到 RabbitMQ 服务器后,可以通过 RabbitTemplate 或者 AmqpTemplate 的方法来指定一个 CorrelationData 实例。当消息被确认发送到交换机时,RabbitMQ 服务器会将相同的 CorrelationData 实例返回给客户端。这样客户端就可以通过比较返回的 CorrelationData 实例来确定哪条消息被成功发送到了交换机。
    3. 处理发送失败: 如果发送消息时发生错误(例如网络错误或者目标队列不存在),CorrelationData 实例也会在发送失败时返回给客户端。这样客户端可以通过 CorrelationData 实例来处理发送失败的情况,例如记录日志或者进行重试操作。

    在使用 RabbitMQ Java 客户端发送消息时,通常会携带一个 CorrelationData 实例,以便在消息发送确认或者发送失败时能够进行相应的处理。

  4. MessageSender:

    这个类并不是必须的其实,还得看你自己具体的实现。

    package com.javashitang.consumer;
    
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;
    
    @Component
    public class MessageSender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        //消息未确认的队列,key:消息唯一标识;value:消息内容
        public final Map<String, String> unAckMsgQueue = new ConcurrentHashMap<>();
    
        public void convertAndSend(String exchange, String routingKey, String message) {
            String msgId = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData();
            correlationData.setId(msgId);
            //这才是真正发送消息的API
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
            unAckMsgQueue.put(msgId, message);
        }
    
        public String dequeueUnAckMsg(String msgId) {
            return unAckMsgQueue.remove(msgId);
        }
    
    }
    

    相关API解释:

    public void convertAndSend(String exchange, String routingKey, final Object object,
                @Nullable CorrelationData correlationData) throws AmqpException
    

    参数一和二就不用说了,参数三是一个Object,一般用来传递消息体,参数四也就是一个CorrelationData,关联数据类。

相应的消费者代码改为

@Slf4j
@Component
public class LogListenerManual {

    /**
     * 接收info级别的日志
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${log.info.queue}", durable = "true"),
                    exchange = @Exchange(value = "${log.exchange}", type = ExchangeTypes.TOPIC),
                    key = "${log.info.binding-key}"
            )
    )
    public void infoLog(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        log.info("infoLogQueue 收到的消息为: {}", msg);
        try {
            // 这里写各种业务逻辑,这里是消息确认,不是发布者确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}

我们上面用到的注解,作用如下

注解 作用
RabbitListener 消费消息,可以定义在类上,方法上,当定义在类上时需要和RabbitHandler配合使用
QueueBinding 定义绑定关系
Queue 定义队列
Exchange 定义交换机
RabbitHandler RabbitListener定义在类上时,需要用RabbitHandler指定处理的方法

4.2 基于JavaConfig

既然用注解这么方便,为啥还需要JavaConfig的方式呢?

JavaConfig方便自定义各种属性,比如同时配置多个virtual host等

在实际项目中,这种方式用的比较多一点。

yml配置:

spring:
  rabbitmq:
    host: rabbitmq服务器ip
    port: 5672
    username: rabbitmq账号
    password: rabbitmq密码
    virtual-host: /

基于注解的代码,其他代码不变,主要是RabbitMqConfig配置类的改变:

package com.javashitang.rabbitmq.config;

import com.javashitang.rabbitmq.consumer.LogReceiverListener;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

    public static final String LOG_EXCHANGE = "log.exchange";
    public static final String LOG_ALL_QUEUE = "all.log.exchange";
    public static final String LOG_ALL_BINDING_KEY = "*.log.key";

    // ====> declare connectionFactorys <===
    @Bean("msgConnectionFactory")
    public ConnectionFactory msgConnectionFactory(
            @Value("${spring.rabbitmq.host}") String host,
            @Value("${spring.rabbitmq.port}") int port,
            @Value("${spring.rabbitmq.username}") String username,
            @Value("${spring.rabbitmq.password}") String password,
            @Value("${spring.rabbitmq.virtual-host}") String vhost) {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vhost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    // 声明队列
    @Bean(LOG_ALL_QUEUE)
    public Queue queueA1() {
        return new Queue(LOG_ALL_QUEUE, true);
    }

    // 声明交换器
    @Bean(LOG_EXCHANGE)
    public DirectExchange exchangeA1() {
        return new DirectExchange(LOG_EXCHANGE);
    }

    // 声明队列和交换器的绑定关系
    @Bean
    public Binding bindingA1(@Qualifier(LOG_ALL_QUEUE) Queue queue,
                             @Qualifier(LOG_EXCHANGE) DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(LOG_ALL_BINDING_KEY);
    }

    // 声明监听器容器
    @Bean
    public SimpleMessageListenerContainer container(
            @Qualifier("msgConnectionFactory") ConnectionFactory connectionFactory,
            @Qualifier(LOG_ALL_QUEUE) Queue q1,
            LogReceiverListener logReceiverListener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //设置了要监听的队列,即注册了队列 q1 到容器中
        container.setQueues(q1); 
        //设置了最大并发消费者数和并发消费者数,即同时可以有多少个消费者监听消息队列。
        container.setMaxConcurrentConsumers(15);
        container.setConcurrentConsumers(15);
        // 手动确认
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //设置了消息监听器,即指定了消息监听器处理接收到的消息。
        container.setMessageListener(logReceiverListener);
        return container;
    }

    // 配置RabbitTemplate
    @Bean
    public RabbitTemplate rabbitTemplate(@Qualifier("msgConnectionFactory") ConnectionFactory connectionFactory,
                                          ReturnCallback returnCallback, ConfirmCallback confirmCallback) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setReturnCallback(returnCallback);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // 要想使 returnCallback 生效,必须设置为true
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }
}

新增消息监听器类:

package com.javashitang.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

/**
 *
 * 通过 RabbitMqConfig 设置,消息将被这个类接收
 */
@Component
public class LogReceiverListener implements ChannelAwareMessageListener {

    /**
     * 发送消息的生产者在测试包中 MsgProducerTest
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody());
        System.out.println("收到的消息为 " + msg);
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}

5. 如何处理消费过程中的重复消息

这一块属于拓展内容了,主要是讲解一个思想。

5.1 介绍

看到本节的题目你有可能会想怎么会遇到消费重复消息的场景呢?

生产者只发送一次不就行了,或者消息队列自动把重复的消息丢掉不就行了

当生成者成功发送消息到broker,但是没有得到响应时,会重新发送消息到broker,此时broker中就会有重复的消息。如果不重试的话就有可能造成消息丢失。

MQTT协议中阐明了消息传递的三种服务质量,这三种服务质量从低到高是:

  1. At most once:至多一次,消息在传递时,最多被送达一次
  2. At least once:至少一次,消息在传递时,至少被送达一次
  3. Exactly once:恰好一次,消息在传递时,恰好被送达一次

消息队列是可以把重复的消息丢掉,但实现成本很高,收益却很低。

我来举个例子,当消费者消费消息成功,但是ack失败,此时broker还是会发一条重复的消息到消费者,消费者还得保证方法的幂等。

所以消息队列允许少量的重复消息,在业务层面对方法做幂等是最佳的实现方式,我们常用的消息队列如RabbitMQ,RocketMQ,Kafka的服务质量都是At least once

5.2 如何保证方法的幂等呢?

使用唯一索引

对业务唯一的字段加上唯一索引,这样当数据重复时,插入数据库会抛异常

状态机幂等

如果业务上需要修改订单状态,例如订单状态有待支付,支付中,支付成功,支付失败。设计时最好只支持状态的单向改变。这样在更新的时候就可以加上条件,多次调用也只会执行一次。例如想把订单状态更新为支持成功,则之前的状态必须为支付中

update table_name set status = 支付成功 where status = 支付中

乐观锁实现幂等

  1. 查询数据获得版本号
  2. 通过版本号去更新,版本号匹配则更新,版本号不匹配则不更新
-- 假如查询出的version为1
select version from table_name where userid = 10;
-- 给用户的账户加10
update table_name set money = money -10, version = version + 1 where userid = 10 and version = 1

也可以通过条件来实现乐观锁,如库存不能超卖,数量不能小于0

update table_name set num = num - 10 where num - 10 >= 0

防重表

增加一个防重表,业务唯一的id作为唯一索引,如订单号,当想针对订单做一系列操作时,可以向防重表中插入一条记录,插入成功,执行后续操作,插入失败,则不执行后续操作。本质上可以看成是基于MySQL实现的分布式锁。根据业务场景决定执行成功后,是否删除防重表中对应的数据

分布式锁实现幂等

执行方法时,先根据业务唯一的id获取分布式锁,获取成功,则执行,失败则不执行。分布式锁可以基于redis,zookeeper,mysql来实现,分布式锁的细节就不介绍了

select+insert

先查询一下有没有符合要求的数据,如果没有再执行插入。没有并发的系统中可以保证幂等性,高并发下不要用这种方法,也会造成数据的重复插入。我一般做消息幂等的时候就是先select,有数据直接返回,没有数据加分布式锁进行insert操作

全局唯一号实现幂等

通过source(来源)+ seq(序列号)来判断请求是否重复,重复则直接返回请求重复提交,否则执行。如当多个三方系统调用服务的时候,就可以采用这种方式


文章作者: 念心卓
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 念心卓 !
  目录