其他RabbitMQ文章

消息的可靠投递

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提 供了两种方式用来控制消息的投递可靠性模式。

  • confirm 确认模式
  • return 退回模式

    rabbitmq 整个消息投递的路径为: producer—->rabbitmq broker—->exchange—->queue—->consumer

  • 消息从 producer 到 exchange 则会返回一个 confirmCallback 。

  • 消息从 exchange—>queue 投递失败则会返回一个 returnCallback 。

我们将利用这两个 callback 控制消息的可靠性投递

准备工作

新建maven项目(生产者的环境)

我这里就不写步骤了…

启动RabbitMQ服务

在虚拟机里运行以下命令

1
systemctl start rabbitmq-server

导入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.3.15</version>
</dependency>

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.3.10</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.3.15</version>
</dependency>
</dependencies>

配置文件

在项目的src/main/resources目录下新建rabbitmq.properties文件

1
2
3
4
5
rabbitmq.host=xxx.xxx.xxx.xxx	//你的虚拟机ip
rabbitmq.port=5672 //端口号
rabbitmq.username=root //rabbitmq的用户名
rabbitmq.password=root //rabbitmq的密码
rabbitmq.virtual-host=/centos7 //虚拟机信息

在项目的src/main/resources目录下新建spring-rabbitmq-producer.xml文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加载配置文件-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>

<!-- 定义rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"
confirm-type="CORRELATED"
publisher-returns="true"
/>
<!--定义管理交换机、队列-->
<rabbit:admin connection-factory="connectionFactory"/>

<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

<!-- 消息可靠性投递 -->
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>

<rabbit:direct-exchange name="test_exchange_confirm">
<rabbit:bindings>
<rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>

消息的投递可靠性模式

在项目的测试环境中实现

src/test/与你项目统一的包名下新建测试类

通过手动错误的方式来实现效果(例如错误的交换机或者路由key)

测试类代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package com.demo;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {

@Autowired
private RabbitTemplate rabbitTemplate;

/*
* 确认模式:
* 步骤:
* 1、确认模式开启:ConnectionFactory中开启publisher-confirms="true"(旧版)confirm-type="CORRELATED"(新版)
* 2、在rabbitTemplate定义ConfirmCallBack回调函数
* */
@Test
public void testConfirm() {

//2、定义回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 相关配置信息
* @param b exchange交换机是否收到了消息。true成功,false失败
* @param s 失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("confirm方法被执行了————————");
if (b) {
System.out.println("接收消息成功:" + s);
}else {
System.out.println("接收消息失败:" + s);
}
}
});

rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm...");

}

/*
* 回退模式:当消息发送给Exchange后,Exchange路由到Queue失败时才会执行ReturnCallBack
* 步骤:
* 1、开启回退模式:publisher-returns="true"
* 2、设置ReturnCallBack
* 3、设置Exchange处理消息的模式:
* 1.如果消息没有路由到Queue,则丢弃消息(默认)
* 2.如果消息没有路由到Queue,返回消息发送方ReturnCallBack
* */
@Test
public void testReturn() {
//1、设置交换机处理失败消息的模式
rabbitTemplate.setMandatory(true);
//2、设置ReturnCallBack
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("return 执行了=====");
System.out.println(returnedMessage.getMessage());
System.out.println(returnedMessage.getExchange());
System.out.println(returnedMessage.getReplyCode());
System.out.println(returnedMessage.getReplyText());
System.out.println(returnedMessage.getRoutingKey());
}
});
//3、发送消息
rabbitTemplate.convertAndSend("test_exchange_confirm111", "confirm", "message confirm...");
}

}

运行结果

confirm 确认模式

消息投递给不存在的虚拟机

rabbitTemplate.convertAndSend(“test_exchange_confirm111”, “confirm”, “message confirm…”);

1
2
3
4
5
6
7
8
运行结果为:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
confirm方法被执行了————————
接收消息失败:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'test_exchange_confirm111' in vhost '/centos7', class-id=60, method-id=40)

进程已结束,退出代码0

return 退回模式

消息从交换机通过错误的路由key投递

rabbitTemplate.convertAndSend(“test_exchange_confirm”, “confirm111”, “message confirm…”);

1
2
3
4
5
6
7
8
9
10
11
12
运行结果为:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
return 执行了=====
(Body:'message confirm...' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
test_exchange_confirm
312
NO_ROUTE
confirm111

进程已结束,退出代码0

消息的可靠投递小结

➢ 设置ConnectionFactory的publisher-confirms=”true” /confirm-type=”CORRELATED”开启 确认模式。

➢ 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回 调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发 送失败,需要处理。

➢ 设置ConnectionFactory的publisher-returns=”true” 开启 退回模式。

➢ 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到 queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退 回给producer。并执行回调函数returnedMessage。

➢ 在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。 使用channel下列方法,完成事务控制: txSelect(), 用于将当前channel设置成transaction模式 txCommit(),用于提交事务 txRollback(),用于回滚事务

Consumer Ack

说明

ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。

有三种确认方式:

  • 自动确认:acknowledge=”none”

  • 手动确认:acknowledge=”manual”

  • 根据异常情况确认:acknowledge=”auto”

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的 消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如 果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则 调用channel.basicNack()方法,让其自动重新发送消息。

步骤

监听器

监听器里做以下更改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;

/*
* Consumer ACK机制:
* 1、设置手动签署: acknowledge="manual"
* 2、让监听器类实现ChannelAwareMessageListener接口
* 3、如果消息成功处理,则调用channel的basicAck()签收
* 4、如果消息处理失败,则调用channel的basicNack()拒绝签署,broker重新发送给consumer
*
* */
@Component
public class AckListener implements ChannelAwareBatchMessageListener {

@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(1000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {
//1、接收转换消息
System.out.println(new String(message.getBody()));
//2、处理业务逻辑
System.out.println("处理业务逻辑");
int a = 3/0;//模拟异常环境
//3、手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//e.printStackTrace();

//4、拒绝签收
/*
* 第三个参数:b1: 重回队列。如果设置为true,则消息重新回到queue,broker会重新发送消息给消费端
*/
channel.basicNack(deliveryTag, true, true);
//channel.basicReject(deliveryTag, true);
}
}

@Override
public void onMessageBatch(List<Message> list, Channel channel) {

}
}

配置文件

在rabbitmq消费者的配置文件中加入以下代码

acknowledge=”manual” 为设置消费者确认消息的方式为手动确认

1
2
3
4
<!--  定义监听器bean  -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual">
<rabbit:listener ref="ackListener" queue-names="test_queue_confirm"></rabbit:listener>
</rabbit:listener-container>

测试

测试代码(运行即可看到效果):

异常环境下因为没有确认消息,会一直重复发送消费消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ConsumerTest {

@Test
public void test() {
while (true) {

}
}

}

小结

➢ 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手 动确认 。

➢ 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息 。

➢ 如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。

消费端限流

说明

在请求瞬间增多,比如每秒10000个请求,这样我们的服务器压力很大,这种的情况下我们可以采取限流策略

步骤

配置文件

在rabbitmq消费者的配置文件中加入以下代码

prefetch=”1”

1
2
3
4
<!--  定义监听器bean  -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>
</rabbit:listener-container>

测试

新建监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.demo.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
import org.springframework.stereotype.Component;

import java.util.List;

/*
* Consumer 限流机制
* 1、确保ack机制为手动确认
* 2、在listener-container配置属性
* perfetch = 1,表示消费端每次从mq拉取一条消息消费,知道手动确认消费完毕后,才会继续拉取下一条消息
* */
@Component
public class QosListener implements ChannelAwareBatchMessageListener {

@Override
public void onMessage(Message message, Channel channel) throws Exception {
Thread.sleep(2000);//为了更直观的看到效果
System.out.println(new String(message.getBody()));

channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}

@Override
public void onMessageBatch(List<Message> list, Channel channel) {

}
}

在消费端测试环境下运行即可看到效果

TTL

说明

➢ TTL 全称 Time To Live(存活时间/过期时间)。

➢ 当消息到达存活时间后,还没有被消费,会被自动清除。

➢ RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

步骤

队列消息统一过期

在rabbitmq生产者的配置文件下添加以下代码

value=”10000” 单位为毫秒,10000为10秒

1
2
3
4
5
6
7
8
<!-- ttl -->
<rabbit:queue id="test_queue_ttl" name="test_queue_ttl" >
<!-- 设置queue的参数 -->
<rabbit:queue-arguments>
<!-- x-message-ttl指队列的过期时间 -->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>

在测试环境下添加以下代码并运行

发送了十条消息,浏览器里面的rabbit图形化界面(http://{你的rabbitmq服务器的ip地址}:15672/#/queues)上可以查看,10秒后消息被移除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* ttl: 过期时间
* 1、队列统一过期
* 2、消息单独过期
*
* 如果设置了消息的过期时间,也设置了队列的过期时间,他以时间短的为准
* 队列过期后,会将队列所有消息全部移除
* 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除)
*/
@Test
public void testTtl() {

for (int i = 1; i <= 10; i++) {
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.a", "message ttl...");
}

}

消息单独过期

直接在代码里加

使用MessagePostProcessor对象给消息单独设置过期时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* ttl: 过期时间
* 1、队列统一过期
* 2、消息单独过期
*
* 如果设置了消息的过期时间,也设置了队列的过期时间,他以时间短的为准
* 队列过期后,会将队列所有消息全部移除
* 消息过期后,只有消息在队列顶端,才会判断其是否过期(移除)
*/
@Test
public void testTtl() {
//2、设置消息单独过期时间
//消息后处理对象,设置一些消息的参数
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//1、设置message的参数
message.getMessageProperties().setExpiration("5000");
//2、返回该消息
return message;
}
};
for (int i = 1; i <= 10; i++) {
if (i == 2) {
//此消息在队列第二位置,不会被移除
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.a", "message ttl...", messagePostProcessor);
} else {
rabbitTemplate.convertAndSend("test_exchange_ttl", "ttl.a", "message ttl...");
}
}
}

小结

➢ 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

➢ 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断 这一消息是否过期。

➢ 如果两者都进行了设置,以时间短的为准。

死信队列

说明

  1. 死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以 被重新发送到另一个交换机,这个交换机就是DLX。

  2. 消息成为死信的三种情况:

    (1)队列消息长度到达限制;

    (2) 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

    (3) 原队列存在消息过期设置,消息到达超时时间未被消费;

  3. 队列绑定死信交换机:

步骤

配置文件

在rabbitmq生产端的配置文件中加入以下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<!--  正常队列  -->
<rabbit:queue id="test_queue_dlx" name="test_queue_dlx">
<!-- 正常队列绑定死信交换机 -->
<rabbit:queue-arguments>
<!-- 绑定x-dead-letter-exchange:死信交换机名称 -->
<entry key="x-dead-letter-exchange" value="exchange_dlx" />
<!-- 绑定x-dead-letter-routing-key:发送给死信交换机的routingkey -->
<entry key="x-dead-letter-routing-key" value="dlx.hehe" />
<!-- 设置队列的过期时间 -->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"></entry>
<!-- 设置队列的消息数量限制 -->
<entry key="x-max-length" value="10" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<!-- 正常交换机 -->
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test_dlx.#" queue="test_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

<!-- 死信队列 -->
<rabbit:queue id="queue_dlx" name="queue_dlx">
</rabbit:queue>
<!-- 死信交换机 -->
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

测试

在生产者和消费者测试环境中实现消息成为死信的三种情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
    /**
* 死信队列测试
*/
@Test
public void testDlx() {
//1、发送过期消息给死信队列
// rabbitTemplate.convertAndSend("test_exchange_dlx", "test_dlx.haha", "我是死信消息");

//2、发送超出队列消息数量
// for (int i = 0; i < 20; i++) {
// rabbitTemplate.convertAndSend("test_exchange_dlx", "test_dlx.haha", "我是死信消息");
// }

//3、消费端拒绝接收的消息,见消费端代码
rabbitTemplate.convertAndSend("test_exchange_dlx", "test_dlx.haha", "我是死信消息");
}

第三中情况需要在消费端更改监听器代码,让拒收的消息不再重回队列

basicNack方法的第三参数传入一个false,channel.basicNack(deliveryTag, true, false);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class DlxListener implements ChannelAwareBatchMessageListener {

@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {
//1、接收转换消息
System.out.println(new String(message.getBody()));
//2、处理业务逻辑
System.out.println("处理业务逻辑");
int a = 3/0;
//3、手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//e.printStackTrace();

//4、拒绝签收
/*
* 第三个参数:b1: 重回队列。如果设置为true,则消息重新回到queue,broker会重新发送消息给消费端
*/
channel.basicNack(deliveryTag, true, false);
// channel.basicReject(deliveryTag, true);
}
}

@Override
public void onMessageBatch(List<Message> list, Channel channel) {

}
}
这样正常队列的消息就会在变成死信的时候发送到死信交换机里,再由死信交换叫发送到死信队列

小结

  1. 死信交换机和死信队列和普通的没有区别。

  2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队 列 。

  3. 消息成为死信的三种情况:

    (1)队列消息长度到达限制;

    (2) 消费者拒接消费消息,并且不重回队列;

    (3) 原队列存在消息过期设置,消息到达超时时间未被消费;

延迟队列

说明

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

应用场景:

  1. 下单后,30分钟未支付,取消订单,回滚库存。
  2. 新用户注册成功7天后,发送短信问候。

实现方式:

  1. 定时器
  2. 延迟队列

很可惜,在RabbitMQ中并未提供延迟队列功能。

但是可以使用:TTL + 死信队列 组合实现延迟队列的效果。

步骤

配置文件

在rabbitmq生产端加入以下代码

延迟队列和死信队列相似,是以死信队列加上设置队列的过期时间完成的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<!--  延迟队列  -->
<rabbit:queue id="order_queue" name="order_queue">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="order_exchange_dlx" ></entry>
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel" ></entry>
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" ></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="order_exchange">
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="order_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>

测试

在生产端的测试环境中加入以下代码

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 延迟队列测试
*/
@Test
public void testOrder() throws InterruptedException {
rabbitTemplate.convertAndSend("order_exchange", "order.a", "订单消息:id=1;time=18点48分");
//为了更直观的观察10秒后消息发送到延迟队列
for (int i = 10; i > 0; i--) {
System.out.println(i + "...");
Thread.sleep(1000);
}
}

消费端可以在监听器里添加相对于的业务逻辑

比如应用场景里的 下单后,30分钟未支付,取消订单,回滚库存。这里把30分钟改为了10秒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.demo.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
import org.springframework.stereotype.Component;

import java.util.List;

/*
* Consumer ACK机制:
* 1、设置手动签署: acknowledge="manual"
* 2、让监听器类实现ChannelAwareMessageListener接口
* 3、如果消息成功处理,则调用channel的basicAck()签收
* 4、如果消息处理失败,则调用channel的basicNack()拒绝签署,broker重新发送给consumer
*
* */
@Component
public class OrderListener implements ChannelAwareBatchMessageListener {

@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {
//1、接收转换消息
System.out.println(new String(message.getBody()));
//2、处理业务逻辑
System.out.println("处理业务逻辑---");
System.out.println("根据订单id查询状态---");
System.out.println("判断是否支付---");
System.out.println("未支付,取消订单、回滚库存---");
//3、手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//e.printStackTrace();

//4、拒绝签收
/*
* 第三个参数:b1: 重回队列。如果设置为true,则消息重新回到queue,broker会重新发送消息给消费端
*/
channel.basicNack(deliveryTag, true, false);
// channel.basicReject(deliveryTag, true);
}
}

@Override
public void onMessageBatch(List<Message> list, Channel channel) {

}
}

记得在消费端的配置文件里把监听器添加

1
2
3
4
<!--  定义监听器bean  -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>
</rabbit:listener-container>

运行消费和生产端即可看见效果

小结

  1. 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
  2. RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。