其他RabbitMQ文章
消息的可靠投递
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提 供了两种方式用来控制消息的投递可靠性模式。
我们将利用这两个 callback 控制消息的可靠性投递
准备工作 新建maven项目(生产者的环境) 我这里就不写步骤了…
启动RabbitMQ服务 在虚拟机里运行以下命令
1 systemctl start rabbitmq-server
导入依赖 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 <dependencies > <dependency > <groupId > org.springframework</groupId > <artifactId > spring-context</artifactId > <version > 5.3.15</version > </dependency > <dependency > <groupId > org.springframework.amqp</groupId > <artifactId > spring-rabbit</artifactId > <version > 2.3.10</version > </dependency > <dependency > <groupId > junit</groupId > <artifactId > junit</artifactId > <version > 4.12</version > <scope > test</scope > </dependency > <dependency > <groupId > org.springframework</groupId > <artifactId > spring-test</artifactId > <version > 5.3.15</version > </dependency > </dependencies >
配置文件 在项目的src/main/resources目录下新建rabbitmq.properties文件
1 2 3 4 5 rabbitmq.host =xxx.xxx.xxx.xxx //你的虚拟机ip rabbitmq.port =5672 //端口号 rabbitmq.username =root //rabbitmq的用户名 rabbitmq.password =root //rabbitmq的密码 rabbitmq.virtual-host =/centos7 //虚拟机信息
在项目的src/main/resources目录下新建spring-rabbitmq-producer.xml文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 <?xml version="1.0" encoding="UTF-8" ?> <beans xmlns ="http://www.springframework.org/schema/beans" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xmlns:context ="http://www.springframework.org/schema/context" xmlns:rabbit ="http://www.springframework.org/schema/rabbit" xsi:schemaLocation ="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd" > <context:property-placeholder location ="classpath:rabbitmq.properties" /> <rabbit:connection-factory id ="connectionFactory" host ="${rabbitmq.host}" port ="${rabbitmq.port}" username ="${rabbitmq.username}" password ="${rabbitmq.password}" virtual-host ="${rabbitmq.virtual-host}" confirm-type ="CORRELATED" publisher-returns ="true" /> <rabbit:admin connection-factory ="connectionFactory" /> <rabbit:template id ="rabbitTemplate" connection-factory ="connectionFactory" /> <rabbit:queue id ="test_queue_confirm" name ="test_queue_confirm" > </rabbit:queue > <rabbit:direct-exchange name ="test_exchange_confirm" > <rabbit:bindings > <rabbit:binding queue ="test_queue_confirm" key ="confirm" > </rabbit:binding > </rabbit:bindings > </rabbit:direct-exchange > </beans >
消息的投递可靠性模式 在项目的测试环境中实现
src/test/与你项目统一的包名下新建测试类
通过手动错误的方式来实现效果(例如错误的交换机或者路由key)
测试类代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 package com.demo;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.amqp.core.ReturnedMessage;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ProducerTest { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testConfirm () { rabbitTemplate.setConfirmCallback(new RabbitTemplate .ConfirmCallback() { @Override public void confirm (CorrelationData correlationData, boolean b, String s) { System.out.println("confirm方法被执行了————————" ); if (b) { System.out.println("接收消息成功:" + s); }else { System.out.println("接收消息失败:" + s); } } }); rabbitTemplate.convertAndSend("test_exchange_confirm" , "confirm" , "message confirm..." ); } @Test public void testReturn () { rabbitTemplate.setMandatory(true ); rabbitTemplate.setReturnsCallback(new RabbitTemplate .ReturnsCallback() { @Override public void returnedMessage (ReturnedMessage returnedMessage) { System.out.println("return 执行了=====" ); System.out.println(returnedMessage.getMessage()); System.out.println(returnedMessage.getExchange()); System.out.println(returnedMessage.getReplyCode()); System.out.println(returnedMessage.getReplyText()); System.out.println(returnedMessage.getRoutingKey()); } }); rabbitTemplate.convertAndSend("test_exchange_confirm111" , "confirm" , "message confirm..." ); } }
运行结果 confirm 确认模式 消息投递给不存在的虚拟机
rabbitTemplate.convertAndSend(“test_exchange_confirm111”, “confirm”, “message confirm…”);
1 2 3 4 5 6 7 8 运行结果为: SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html# StaticLoggerBinder for further details. confirm方法被执行了———————— 接收消息失败:channel error; protocol method: # method<channel.close>(reply-code=404, reply-text=NOT_ FOUND - no exchange 'test_ exchange_ confirm111' in vhost '/centos7', class-id=60, method-id=40) 进程已结束,退出代码0
return 退回模式 消息从交换机通过错误的路由key投递
rabbitTemplate.convertAndSend(“test_exchange_confirm”, “confirm111”, “message confirm…”);
1 2 3 4 5 6 7 8 9 10 11 12 运行结果为: SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html# StaticLoggerBinder for further details. return 执行了===== (Body:'message confirm...' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) test_ exchange_ confirm 312 NO_ ROUTE confirm111 进程已结束,退出代码0
消息的可靠投递小结 ➢ 设置ConnectionFactory的publisher-confirms=”true” /confirm-type=”CORRELATED”开启 确认模式。
➢ 使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回 调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发 送失败,需要处理。
➢ 设置ConnectionFactory的publisher-returns=”true” 开启 退回模式。
➢ 使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到 queue失败后,如果设置了rabbitTemplate.setMandatory(true)参数,则会将消息退 回给producer。并执行回调函数returnedMessage。
➢ 在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。 使用channel下列方法,完成事务控制: txSelect(), 用于将当前channel设置成transaction模式 txCommit(),用于提交事务 txRollback(),用于回滚事务
Consumer Ack 说明 ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的 消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如 果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则 调用channel.basicNack()方法,让其自动重新发送消息。
步骤 监听器 监听器里做以下更改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageListener;import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;import org.springframework.stereotype.Component;import java.io.IOException;import java.util.List;@Component public class AckListener implements ChannelAwareBatchMessageListener { @Override public void onMessage (Message message, Channel channel) throws Exception { Thread.sleep(1000 ); long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println(new String (message.getBody())); System.out.println("处理业务逻辑" ); int a = 3 /0 ; channel.basicAck(deliveryTag, true ); } catch (Exception e) { channel.basicNack(deliveryTag, true , true ); } } @Override public void onMessageBatch (List<Message> list, Channel channel) { } }
配置文件 在rabbitmq消费者的配置文件中加入以下代码
acknowledge=”manual” 为设置消费者确认消息的方式为手动确认
1 2 3 4 <rabbit:listener-container connection-factory ="connectionFactory" acknowledge ="manual" > <rabbit:listener ref ="ackListener" queue-names ="test_queue_confirm" > </rabbit:listener > </rabbit:listener-container >
测试 测试代码(运行即可看到效果):
异常环境下因为没有确认消息,会一直重复发送消费消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.test.context.ContextConfiguration;import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml") public class ConsumerTest { @Test public void test () { while (true ) { } } }
小结 ➢ 在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手 动确认 。
➢ 如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息 。
➢ 如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
消费端限流 说明 在请求瞬间增多,比如每秒10000个请求,这样我们的服务器压力很大,这种的情况下我们可以采取限流策略
步骤 配置文件 在rabbitmq消费者的配置文件中加入以下代码
prefetch=”1”
1 2 3 4 <rabbit:listener-container connection-factory ="connectionFactory" acknowledge ="manual" prefetch ="1" > <rabbit:listener ref ="qosListener" queue-names ="test_queue_confirm" > </rabbit:listener > </rabbit:listener-container >
测试 新建监听器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package com.demo.listener;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;import org.springframework.stereotype.Component;import java.util.List;@Component public class QosListener implements ChannelAwareBatchMessageListener { @Override public void onMessage (Message message, Channel channel) throws Exception { Thread.sleep(2000 ); System.out.println(new String (message.getBody())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true ); } @Override public void onMessageBatch (List<Message> list, Channel channel) { } }
在消费端测试环境下运行即可看到效果
TTL 说明 ➢ TTL 全称 Time To Live(存活时间/过期时间)。
➢ 当消息到达存活时间后,还没有被消费,会被自动清除。
➢ RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
步骤 队列消息统一过期 在rabbitmq生产者的配置文件下添加以下代码
value=”10000” 单位为毫秒,10000为10秒
1 2 3 4 5 6 7 8 <rabbit:queue id ="test_queue_ttl" name ="test_queue_ttl" > <rabbit:queue-arguments > <entry key ="x-message-ttl" value ="10000" value-type ="java.lang.Integer" > </entry > </rabbit:queue-arguments > </rabbit:queue >
在测试环境下添加以下代码并运行
发送了十条消息,浏览器里面的rabbit图形化界面(http://{你的rabbitmq服务器的ip地址}:15672/#/queues)上可以查看,10秒后消息被移除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Test public void testTtl () { for (int i = 1 ; i <= 10 ; i++) { rabbitTemplate.convertAndSend("test_exchange_ttl" , "ttl.a" , "message ttl..." ); } }
消息单独过期 直接在代码里加
使用MessagePostProcessor对象给消息单独设置过期时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Test public void testTtl () { MessagePostProcessor messagePostProcessor = new MessagePostProcessor () { @Override public Message postProcessMessage (Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000" ); return message; } }; for (int i = 1 ; i <= 10 ; i++) { if (i == 2 ) { rabbitTemplate.convertAndSend("test_exchange_ttl" , "ttl.a" , "message ttl..." , messagePostProcessor); } else { rabbitTemplate.convertAndSend("test_exchange_ttl" , "ttl.a" , "message ttl..." ); } } }
小结 ➢ 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
➢ 设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部 时(消费时),会单独判断 这一消息是否过期。
➢ 如果两者都进行了设置,以时间短 的为准。
死信队列 说明
死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以 被重新发送到另一个交换机,这个交换机就是DLX。
消息成为死信的三种情况:
(1)队列消息长度到达限制;
(2) 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
(3) 原队列存在消息过期设置,消息到达超时时间未被消费;
队列绑定死信交换机:
步骤 配置文件 在rabbitmq生产端的配置文件中加入以下代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 <rabbit:queue id ="test_queue_dlx" name ="test_queue_dlx" > <rabbit:queue-arguments > <entry key ="x-dead-letter-exchange" value ="exchange_dlx" /> <entry key ="x-dead-letter-routing-key" value ="dlx.hehe" /> <entry key ="x-message-ttl" value ="10000" value-type ="java.lang.Integer" > </entry > <entry key ="x-max-length" value ="10" value-type ="java.lang.Integer" > </entry > </rabbit:queue-arguments > </rabbit:queue > <rabbit:topic-exchange name ="test_exchange_dlx" > <rabbit:bindings > <rabbit:binding pattern ="test_dlx.#" queue ="test_queue_dlx" > </rabbit:binding > </rabbit:bindings > </rabbit:topic-exchange > <rabbit:queue id ="queue_dlx" name ="queue_dlx" > </rabbit:queue > <rabbit:topic-exchange name ="exchange_dlx" > <rabbit:bindings > <rabbit:binding pattern ="dlx.#" queue ="queue_dlx" > </rabbit:binding > </rabbit:bindings > </rabbit:topic-exchange >
测试 在生产者和消费者测试环境中实现消息成为死信的三种情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Test public void testDlx () { rabbitTemplate.convertAndSend("test_exchange_dlx" , "test_dlx.haha" , "我是死信消息" ); }
第三中情况需要在消费端更改监听器代码,让拒收的消息不再重回队列
basicNack方法的第三参数传入一个false,channel.basicNack(deliveryTag, true, false);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;import org.springframework.stereotype.Component;import java.util.List;@Component public class DlxListener implements ChannelAwareBatchMessageListener { @Override public void onMessage (Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println(new String (message.getBody())); System.out.println("处理业务逻辑" ); int a = 3 /0 ; channel.basicAck(deliveryTag, true ); } catch (Exception e) { channel.basicNack(deliveryTag, true , false ); } } @Override public void onMessageBatch (List<Message> list, Channel channel) { } }
这样正常队列的消息就会在变成死信的时候发送到死信交换机里,再由死信交换叫发送到死信队列
小结
死信交换机和死信队列和普通的没有区别。
当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队 列 。
消息成为死信的三种情况:
(1)队列消息长度到达限制;
(2) 消费者拒接消费消息,并且不重回队列;
(3) 原队列存在消息过期设置,消息到达超时时间未被消费;
延迟队列 说明 延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
应用场景:
下单后,30分钟未支付,取消订单,回滚库存。
新用户注册成功7天后,发送短信问候。
实现方式:
定时器
延迟队列
很可惜,在RabbitMQ中并未提供延迟队列功能。
但是可以使用:TTL + 死信队列 组合实现延迟队列的效果。
步骤 配置文件 在rabbitmq生产端加入以下代码
延迟队列和死信队列相似,是以死信队列加上设置队列的过期时间完成的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 <rabbit:queue id ="order_queue" name ="order_queue" > <rabbit:queue-arguments > <entry key ="x-dead-letter-exchange" value ="order_exchange_dlx" > </entry > <entry key ="x-dead-letter-routing-key" value ="dlx.order.cancel" > </entry > <entry key ="x-message-ttl" value ="10000" value-type ="java.lang.Integer" > </entry > </rabbit:queue-arguments > </rabbit:queue > <rabbit:topic-exchange name ="order_exchange" > <rabbit:bindings > <rabbit:binding pattern ="order.#" queue ="order_queue" > </rabbit:binding > </rabbit:bindings > </rabbit:topic-exchange > <rabbit:queue id ="order_queue_dlx" name ="order_queue_dlx" > </rabbit:queue > <rabbit:topic-exchange name ="order_exchange_dlx" > <rabbit:bindings > <rabbit:binding pattern ="dlx.order.#" queue ="order_queue_dlx" > </rabbit:binding > </rabbit:bindings > </rabbit:topic-exchange >
测试 在生产端的测试环境中加入以下代码
1 2 3 4 5 6 7 8 9 10 11 12 @Test public void testOrder () throws InterruptedException { rabbitTemplate.convertAndSend("order_exchange" , "order.a" , "订单消息:id=1;time=18点48分" ); for (int i = 10 ; i > 0 ; i--) { System.out.println(i + "..." ); Thread.sleep(1000 ); } }
消费端可以在监听器里添加相对于的业务逻辑
比如应用场景里的 下单后,30分钟未支付,取消订单,回滚库存。这里把30分钟改为了10秒
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 package com.demo.listener;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;import org.springframework.stereotype.Component;import java.util.List;@Component public class OrderListener implements ChannelAwareBatchMessageListener { @Override public void onMessage (Message message, Channel channel) throws Exception { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println(new String (message.getBody())); System.out.println("处理业务逻辑---" ); System.out.println("根据订单id查询状态---" ); System.out.println("判断是否支付---" ); System.out.println("未支付,取消订单、回滚库存---" ); channel.basicAck(deliveryTag, true ); } catch (Exception e) { channel.basicNack(deliveryTag, true , false ); } } @Override public void onMessageBatch (List<Message> list, Channel channel) { } }
记得在消费端的配置文件里把监听器添加
1 2 3 4 <rabbit:listener-container connection-factory ="connectionFactory" acknowledge ="manual" prefetch ="1" > <rabbit:listener ref ="orderListener" queue-names ="order_queue_dlx" > </rabbit:listener > </rabbit:listener-container >
运行消费和生产端即可看见效果
小结
延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
RabbitMQ没有提供延迟队列功能,但是可以使用 : TTL + DLX 来实现延迟队列效果。