Integrating Message Queues with Spring Boot
JMS
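- JMS (Java Message Service): a specification, comparable to JDBC, that defines the API for messaging services
- JMS messaging models
  - point-to-point: a message is sent to a queue, which stores it. Each message in the queue is consumed by exactly one consumer, or it expires
  - publish-subscribe: a message can be consumed by multiple consumers. Producers and consumers are fully decoupled and do not need to know about each other
- JMS message types
  - TextMessage
  - MapMessage
  - BytesMessage
  - StreamMessage
  - ObjectMessage
  - Message (headers and properties only)
- JMS implementations: ActiveMQ, Redis, HornetQ, RabbitMQ, RocketMQ (does not fully comply with the JMS specification)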
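The two messaging models can be illustrated without a broker. The following is a minimal in-memory sketch, not JMS API code: the names `JmsModelsSketch`, `PointToPointQueue`, and `Topic` are hypothetical and only mimic the delivery semantics described in the list above (message expiry is left out).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** In-memory illustration of the two JMS messaging models (no broker involved). */
class JmsModelsSketch {

    /** Point-to-point: the queue stores messages; each one is handed to a single consumer. */
    static final class PointToPointQueue {
        private final Queue<String> messages = new ArrayDeque<>();

        void send(String message) {
            messages.add(message);
        }

        /** Returns the next message and removes it, or null if the queue is empty. */
        String receive() {
            return messages.poll();
        }
    }

    /** Publish-subscribe: every subscriber gets its own copy of each published message. */
    static final class Topic {
        private final List<List<String>> subscriberInboxes = new ArrayList<>();

        /** Registers a subscriber and returns the inbox that collects its messages. */
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            subscriberInboxes.add(inbox);
            return inbox;
        }

        void publish(String message) {
            for (List<String> inbox : subscriberInboxes) {
                inbox.add(message);
            }
        }
    }

    public static void main(String[] args) {
        PointToPointQueue queue = new PointToPointQueue();
        queue.send("order-1");
        System.out.println(queue.receive()); // the first consumer takes the message
        System.out.println(queue.receive()); // a second consumer finds the queue empty

        Topic topic = new Topic();
        List<String> inboxA = topic.subscribe();
        List<String> inboxB = topic.subscribe();
        topic.publish("order-1");
        System.out.println(inboxA + " " + inboxB); // both subscribers hold a copy
    }
}
```

The difference to look for is that `receive()` removes the message for everyone, while `publish()` copies it to every inbox.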
AMQP
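- AMQP (Advanced Message Queuing Protocol): a protocol (a specification for message brokers) that standardizes the format of data exchanged over the network; compatible with JMS
- Advantage: cross-platform. The broker vendor, producers, and consumers can each be implemented in a different language
- Message models
  - direct exchange
  - fanout exchange
  - topic exchange
  - headers exchange
  - system exchange
- Message type: byte[]
- AMQP implementations: RabbitMQ, StormMQ, RocketMQ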
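To make the exchange types and the `byte[]` message type more concrete, here is a small in-memory sketch of direct and fanout routing. It is an illustration under simplifying assumptions, not broker or client code: the class `ExchangeRoutingSketch` and its methods are hypothetical, and each queue is limited to a single binding key.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of AMQP direct and fanout routing (illustration only, no broker). */
class ExchangeRoutingSketch {

    /** Bindings: queue name -> binding key (one key per queue in this sketch). */
    private final Map<String, String> bindings = new LinkedHashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.put(queueName, bindingKey);
    }

    /** Direct exchange: deliver to queues whose binding key equals the routing key. */
    List<String> routeDirect(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> binding : bindings.entrySet()) {
            if (binding.getValue().equals(routingKey)) {
                targets.add(binding.getKey());
            }
        }
        return targets;
    }

    /** Fanout exchange: deliver to every bound queue; the routing key is ignored. */
    List<String> routeFanout() {
        return new ArrayList<>(bindings.keySet());
    }

    /** Message bodies are raw bytes, so text is encoded before it is sent. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a body that was produced by encode. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

Topic exchanges add wildcard matching on top of the direct case, and headers exchanges match on message headers instead of the routing key.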
MQTT
- MQTT (Message Queuing Telemetry Transport): a messaging protocol designed for small devices and one of the main components of the Internet of Things (IoT) ecosystem
Kafka
- Kafka: a high-throughput, distributed publish-subscribe messaging system that provides real-time messaging
ActiveMQ
Download: https://activemq.apache.org/components/classic/download/
Start the service:
Integrating ActiveMQ with Spring Boot
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
```
- Configure the ActiveMQ properties in the Spring Boot core configuration file
```yaml
spring:
  activemq:
    broker-url: tcp://localhost:61616
  jms:
    template:
      default-destination: msg  # name of the default queue for messages
```
```java
import com.example.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageServiceActivemqImpl implements MessageService {

    @Autowired
    private JmsMessagingTemplate messagingTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("Order queued for SMS delivery, id=" + id);
        // No destination is given, so the configured default destination is used.
        messagingTemplate.convertAndSend(id);
    }

    @Override
    public String doMessage() {
        // Reads one message from the default destination and converts it to a String.
        String id = messagingTemplate.receiveAndConvert(String.class);
        System.out.println("SMS delivery completed, id: " + id);
        return id;
    }
}
```
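```java
// In sendMessage: name the destination explicitly instead of using the default.
messagingTemplate.convertAndSend("order.queue.id", id);
// In doMessage: receive from the same named destination.
String id = messagingTemplate.receiveAndConvert("order.queue.id", String.class);
```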
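- Use a listener to consume messages from one queue and send the result to another queue
```java
@JmsListener(destination = "order.queue.id")
@SendTo("order.other.queue.id")
public String receive(String id) {
    System.out.println("SMS delivery completed, id: " + id);
    // The return value is sent to the destination named in @SendTo.
    return "new:" + id;
}
```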
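The consume-then-forward behaviour of such a listener can be sketched with two plain in-memory queues. This is only an illustration of the data flow, with no Spring or JMS involved; `ForwardingListenerSketch` and `drainAndForward` are hypothetical names, and the `handler` function plays the role of the listener method whose return value is forwarded.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

/** In-memory illustration of a listener that forwards its result to a second queue. */
class ForwardingListenerSketch {

    /**
     * Takes every message from the source queue, applies the handler,
     * and puts each result on the target queue. Returns the number forwarded.
     */
    static int drainAndForward(Queue<String> source,
                               Queue<String> target,
                               Function<String, String> handler) {
        int forwarded = 0;
        String message;
        while ((message = source.poll()) != null) {
            target.add(handler.apply(message));
            forwarded++;
        }
        return forwarded;
    }

    public static void main(String[] args) {
        Queue<String> orderQueue = new ArrayDeque<>();
        Queue<String> otherQueue = new ArrayDeque<>();
        orderQueue.add("1");
        orderQueue.add("2");

        // Same transformation as the listener: prefix the id with "new:".
        drainAndForward(orderQueue, otherQueue, id -> "new:" + id);
        System.out.println(otherQueue);
    }
}
```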
```yaml
spring:
  activemq:
    broker-url: tcp://localhost:61616
  jms:
    pub-sub-domain: true  # true = publish-subscribe (topics); false = point-to-point (queues)
    template:
      default-destination: msg
```
RabbitMQ
RabbitMQ is written in Erlang, so Erlang must be installed first:
https://www.erlang.org/downloads
RabbitMQ installation guide for Windows:
https://rabbitmq.com/install-windows.html
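Start the service
In the sbin folder of the RabbitMQ installation directory, open a cmd window as administrator and run rabbitmq-service.bat start
The management web console listens on port 15672 by default (localhost:15672, username/password: guest/guest)
Stop the service: rabbitmq-service.bat stop
Check the server status: rabbitmqctl status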
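Management UI (provided as a plugin)
```
# List the available plugins
rabbitmq-plugins.bat list
# Enable the management plugin
rabbitmq-plugins.bat enable rabbitmq_management
# Then open the management console in a browser
localhost:15672
```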
Integrating RabbitMQ with Spring Boot: direct exchange
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
```
```yaml
spring:
  activemq:
    broker-url: tcp://localhost:61616
  jms:
    pub-sub-domain: true
    template:
      default-destination: msg
  rabbitmq:
    host: localhost
    port: 5672
```
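```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitDirectConfig {

    @Bean
    public Queue directQueue() {
        // Arguments: name, durable, exclusive, autoDelete
        return new Queue("direct_queue", true, false, false);
    }

    @Bean
    public Queue directQueue2() {
        return new Queue("direct_queue2", true, false, false);
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }

    @Bean
    public Binding bindingDirect() {
        // Messages with routing key "direct" go to direct_queue.
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct");
    }

    @Bean
    public Binding bindingDirect2() {
        // Messages with routing key "direct2" go to direct_queue2.
        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("direct2");
    }
}
```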
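```java
import com.example.service.MessageService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageServiceRabbitmqDirectImpl implements MessageService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("Order queued for SMS delivery (rabbitmq direct), id=" + id);
        // Arguments: exchange name, routing key, message payload
        amqpTemplate.convertAndSend("directExchange", "direct", id);
    }

    @Override
    public String doMessage() {
        // Consumption is handled by a message listener, so nothing is returned here.
        return null;
    }
}
```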
- Use message listeners to listen on the queue (direct). When several listeners listen on the same queue, messages are handed to them in round-robin order
```java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageListener {

    @RabbitListener(queues = "direct_queue")
    public void receive(String id) {
        System.out.println("SMS delivery completed (rabbitmq direct), id: " + id);
    }
}

// ---- second listener, in its own source file ----

@Component
public class MessageListener2 {

    @RabbitListener(queues = "direct_queue")
    public void receive(String id) {
        System.out.println("SMS delivery completed (rabbitmq direct 2), id: " + id);
    }
}
```
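The round-robin behaviour of several listeners on one queue can be pictured with a few lines of plain Java. This is a simplified model, assuming every listener is always ready for the next message; a real broker's distribution also depends on settings such as prefetch and on how fast each consumer acknowledges. `RoundRobinSketch` and `dispatch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Round-robin dispatch of queued messages across competing listeners (illustration only). */
class RoundRobinSketch {

    /** Returns, for each message in order, which listener handles it. */
    static List<String> dispatch(List<String> messages, List<String> listeners) {
        List<String> assignments = new ArrayList<>();
        for (int i = 0; i < messages.size(); i++) {
            // Listeners take turns: index i wraps around the listener list.
            String listener = listeners.get(i % listeners.size());
            assignments.add(listener + " <- " + messages.get(i));
        }
        return assignments;
    }

    public static void main(String[] args) {
        List<String> result = dispatch(
                List.of("id-1", "id-2", "id-3", "id-4"),
                List.of("MessageListener", "MessageListener2"));
        result.forEach(System.out::println);
    }
}
```

Each message is handled by exactly one listener, which matches the point-to-point model rather than publish-subscribe.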
Integrating RabbitMQ with Spring Boot: topic exchange
```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTopicConfig {

    @Bean
    public Queue topicQueue() {
        // Arguments: name, durable, exclusive, autoDelete
        return new Queue("topic_queue", true, false, false);
    }

    @Bean
    public Queue topicQueue2() {
        return new Queue("topic_queue2", true, false, false);
    }

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    public Binding bindingTopic() {
        // "*" matches exactly one word of the routing key.
        return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.*.*");
    }

    @Bean
    public Binding bindingTopic2() {
        // "#" matches zero or more words of the routing key.
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");
    }
}
```
```java
import com.example.service.MessageService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageServiceRabbitmqTopicImpl implements MessageService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("Order queued for SMS delivery (rabbitmq topic), id=" + id);
        // Arguments: exchange name, routing key, message payload
        amqpTemplate.convertAndSend("topicExchange", "topic.a.id", id);
    }

    @Override
    public String doMessage() {
        // Consumption is handled by a message listener, so nothing is returned here.
        return null;
    }
}
```
```java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageListener {

    @RabbitListener(queues = "topic_queue")
    public void receive(String id) {
        System.out.println("SMS delivery completed (rabbitmq topic 1), id: " + id);
    }

    @RabbitListener(queues = "topic_queue2")
    public void receive2(String id) {
        System.out.println("SMS delivery completed (rabbitmq topic 222), id: " + id);
    }
}
```
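Whether a routing key such as `topic.a.id` reaches a queue depends on how the binding patterns `topic.*.*` and `topic.#` are matched: `*` stands for exactly one dot-separated word, `#` for zero or more words. The sketch below reimplements that matching rule in plain Java for illustration; `TopicPatternSketch` is a hypothetical helper and not part of Spring AMQP or RabbitMQ.

```java
/** Illustrative matcher for topic-exchange binding patterns ("*" and "#" wildcards). */
class TopicPatternSketch {

    /** Returns true if the routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // "#" may absorb zero or more words: try every possible continuation.
            for (int next = k; next <= key.length; next++) {
                if (matches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            // Pattern still expects a word, but the key has none left.
            return false;
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(key[k]);
        return wordMatches && matches(pattern, p + 1, key, k + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("topic.*.*", "topic.a.id"));
        System.out.println(matches("topic.#", "topic.a.id"));
        System.out.println(matches("topic.*.*", "topic.a"));
    }
}
```

Under this rule the key `topic.a.id` satisfies both patterns, while a two-word key such as `topic.a` satisfies only `topic.#`.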
RocketMQ
Download and start the service
- Environment variables: ROCKETMQ_HOME, PATH, and (recommended) NAMESRV_ADDR set to 127.0.0.1:9876
Download: https://rocketmq.apache.org/
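Start the name server:
```
# run from the bin folder of the installation directory
mqnamesrv.cmd
```
Start the broker:
```
# run from the bin folder of the installation directory
mqbroker.cmd
```
Run the bundled quickstart producer:
```
tools org.apache.rocketmq.example.quickstart.Producer
```
Run the bundled quickstart consumer:
```
tools org.apache.rocketmq.example.quickstart.Consumer
```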
Integrating RocketMQ with Spring Boot
```xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>
```
```yaml
rocketmq:
  name-server: localhost:9876
  producer:
    group: group_rocketmq
```
```java
import com.example.service.MessageService;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageRocketmqImpl implements MessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("Order queued for SMS delivery (rocketmq), id=" + id);
        // Synchronous send: arguments are the destination (topic) and the payload.
        rocketMQTemplate.convertAndSend("order_id", id);
    }

    @Override
    public String doMessage() {
        // Consumption is handled by a message listener, so nothing is returned here.
        return null;
    }
}
```
```java
import com.example.service.MessageService;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageRocketmqImpl implements MessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("Order queued for SMS delivery (rocketmq), id=" + id);
        // The callback is invoked once the send succeeds or fails.
        SendCallback sendCallback = new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("Message sent successfully");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("Message sending failed!");
            }
        };
        // Asynchronous send: returns immediately and reports the outcome via the callback.
        rocketMQTemplate.asyncSend("order_id", id, sendCallback);
    }

    @Override
    public String doMessage() {
        // Consumption is handled by a message listener, so nothing is returned here.
        return null;
    }
}
```
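The callback style of an asynchronous send can be tried out without a broker. The sketch below simulates it with `CompletableFuture` from the JDK; it is not RocketMQ code. `AsyncSendSketch`, its `Callback` interface, and the rule that a null payload counts as a failed send are all assumptions made for the illustration.

```java
import java.util.concurrent.CompletableFuture;

/** Broker-free illustration of callback-style asynchronous sending. */
class AsyncSendSketch {

    /** Hypothetical callback with one method per outcome. */
    interface Callback {
        void onSuccess(String result);

        void onException(Throwable error);
    }

    /**
     * Runs a simulated send on another thread and reports the outcome to the callback.
     * The returned future completes after the callback has been invoked.
     */
    static CompletableFuture<Void> asyncSend(String topic, String payload, Callback callback) {
        CompletableFuture<String> send = CompletableFuture.supplyAsync(() -> {
            if (payload == null) {
                // Assumption of this sketch: a null payload makes the send fail.
                throw new IllegalArgumentException("payload must not be null");
            }
            return "sent to " + topic; // stands in for a broker acknowledgement
        });
        return send.<Void>handle((result, error) -> {
            if (error != null) {
                callback.onException(error);
            } else {
                callback.onSuccess(result);
            }
            return null;
        });
    }
}
```

The caller is free to continue while the send is in flight; calling `join()` on the returned future is only needed when it has to wait for the callback, for example in a test.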
```java
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(consumerGroup = "group_rocketmq", topic = "order_id")
public class MessageListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String id) {
        System.out.println("SMS delivery completed (rocketmq), id: " + id);
    }
}
```
Kafka
Download: https://kafka.apache.org/downloads
Installation: extract the archive
On Windows, version 3.0.0 has a bug; a 2.x release is recommended
To be continued.