Springboot整合消息队列

  • 企业级应用中广泛使用的三种异步消息传递技术

JMS

  1. JMS(Java Message Service):一个规范,等同于JDBC规范,提供了消息服务相关的API接口
  2. JMS消息模型
    • peer-2-peer:点对点模型,消息发送到一个队列中,队列保存消息。队列的消息只能被一个消费者消费或超时
    • publish-subscribe:发布订阅模型,消息可以被多个消费者消费,生产者和消费者完全独立,不需要感知对方的存在
  3. JMS消息种类
    • TextMessage
    • MapMessage
    • BytesMessage
    • StreamMessage
    • ObjectMessage
    • Message(只有消息头和属性)
  4. JMS实现:ActiveMQ、Redis、HornetMQ、RabbitMQ、RockeMQ(没有完全遵守JMS规范)

AMQP

  1. AMQP(advanced message queuing protocol):一种协议(高级消息代理规范),规范了网络交换的数据格式,兼容JMS

  2. 点:具有跨平台性,服务器供应商,生产者,消费者可以使用不同的语言来实现

  3. 消息模型

    • direct exchange

    • fanout exchange

    • topic exchange

    • headers exchange

    • system exchange

  1. 消息种类:byte[]

  2. QP实现:RabbitMQ、StormMQ、RocketMQ

MQTT

  • MQTT(Message Queueing Telemetry Transport)消息队列遥测传输,专为小设备设计,是物联网(IOT)生态系统中主要成分之一

Kafka

  • Kafka,一种高吞吐量的分布式订阅消息系统,提供实时消息功能。

ActiveMQ

下载地址

  • 安装方式:解压缩
1
https://activemq.apache.org/components/classic/download/

启动服务:

  • 在压缩好的文件夹找到bin文件夹下的你对应的操作系统位数的文件夹下的activemq.bat文件,双击运行

  • 服务端口61616,管理后台端口8161,用户名&密码:admin

1
http://127.0.0.1:8161/

springboot整合ActiveMQ

  • 在pom.xml导入activemq依赖
1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
  • 在springboot核心配置文件配置activemq属性
1
2
3
4
5
6
spring:
activemq:
broker-url: tcp://localhost:61616
jms:
template:
default-destination: msg//指定消息默认队列名字
  • 使用JmsMessagingTemplate对像
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import com.example.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessageServiceActivemqImpl implements MessageService {

@Autowired
private JmsMessagingTemplate messagingTemplate;

@Override
public void sendMessage(String id) {
System.out.println("待发送短信的订单已纳入处理队列,id="+id);
messagingTemplate.convertAndSend(id);
}

@Override
public String doMessage() {
String id = messagingTemplate.receiveAndConvert(String.class);
System.out.println("已完成短信发送业务,id:"+id);
return id;
}
}
  • 生产和消费时指定列队名字
1
2
messagingTemplate.convertAndSend("order.queue.id",id);
String id = messagingTemplate.receiveAndConvert("order.queue.id",String.class);
  • 使用监听器消费消息队列的消息并发送给另一个队列消息
1
2
3
4
5
6
7
8
//指定消息队列的名字
@JmsListener(destination = "order.queue.id")
//发送给指定名字的消息队列,发送的值由方法返回值提供
@SendTo("order.other.queue.id")
public String receive(String id){
System.out.println("已完成短信发送业务,id:"+id);
return "new:"+id;
}
  • 更换发布订阅模式(默认点对点模式)
1
2
3
4
5
6
7
spring:
activemq:
broker-url: tcp://localhost:61616
jms:
pub-sub-domain: true #更改为发布订阅模式
template:
default-destination: msg

RabbitMQ

RabbitMQ基于Erlang语言编写,需要安装Erlang

  • Erlang下载地址:
1
https://www.erlang.org/downloads
  • 安装方式:一键傻瓜式安装,安装完毕需要重启,需要依赖Windows组件

  • 配置环境变量:ERLANG_HOME、PATHRabbitMQ下载地址:

  • 安装方式:一键傻瓜式安装

  • RabbitMQ下载地址:
1
https://rabbitmq.com/install-windows.html

启动服务

  • 在rabbitmq的安装目录下的sbin文件夹里,用管理员身份运行cmd窗口输入rabbitmq-service.bat start启动服务

  • 访问服务器客户端的默认端口是15672(localhost:15672,用户名密码:guest/guest)

  • 关闭服务:rabbitmq-service.bat stop

  • 查看服务器状态:rabbitmqctl status

  • 服务管理可视化(插件形式)

1
2
3
4
5
6
#查看已安装的插件列表
rabbitmq-plugins.bat list
#开启服务管理插件
rabbitmq-plugins.bat enable rabbitmq_management
#访问服务器 服务端口5672 管理后台段克15672(用户名&密码:guest)
localhost:15672

springboot整合rabbitmq直连交换机模式

  • 添加依赖
1
2
3
4
5
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 在yml里添加rabbitmq配置
1
2
3
4
5
6
7
8
9
10
spring:
activemq:
broker-url: tcp://localhost:61616
jms:
pub-sub-domain: true #更改为发布订阅模式
template:
default-destination: msg
rabbitmq: #rabbitmq最基本配置
host: localhost
port: 5672
  • 创建消息队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Configuration
public class RabbitDirectConfig {

//队列
@Bean
public Queue directQueue(){
/*
* durable:是否持久化,默认false
* exclusive:是否当前连接专用,默认false,连接关闭后队列即被删除
* autoDelete:是否自动删除,当前生产者或者消费者不再使用此队列,自动删除
* */
return new Queue("direct_queue",true,false,false);
}

@Bean
public Queue directQueue2(){
return new Queue("direct_queue2",true,false,false);
}

//交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange("directExchange");
}

//绑定
@Bean
public Binding bindingDirect(){
return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct");
}

@Bean
public Binding bindingDirect2(){
return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct2");
}

}
  • 生产与消费消息
1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class MessageServiceRabbitmqDirectImpl implements MessageService {

@Autowired
private AmqpTemplate amqpTemplate;

@Override
public void sendMessage(String id) {
System.out.println("待发送短信的订单已纳入处理队列(rabbitmq direct),id="+id);
amqpTemplate.convertAndSend("directExchange","direct",id);
}
}

  • 使用消息监听器对消息队列监听(direct)(多个监听器监听的一个队列时轮循处理)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
public class MessageListener {

@RabbitListener(queues = "direct_queue")
public void receive(String id){
System.out.println("已完成短信发送业务(rabbitmq direct),id:"+id);
}

}
-------------------------------------------------
@Component
public class MessageListener2 {

@RabbitListener(queues = "direct_queue")
public void receive(String id){
System.out.println("已完成短信发送业务(rabbitmq direct 2),id:"+id);
}

}

springboot整合topic主题交换机模式

  • 定义消息队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTopicConfig {

//队列
@Bean
public Queue topicQueue(){
/*
* durable:是否持久化,默认false
* exclusive:是否当前连接专用,默认false,连接关闭后队列即被删除
* autoDelete:是否自动删除,当前生产者或者消费者不再使用此队列,自动删除
* */
return new Queue("topic_queue",true,false,false);
}

@Bean
public Queue topicQueue2(){
return new Queue("topic_queue2",true,false,false);
}

//交换机
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}

/**
*绑定键匹配规则
* *(星号):用来表示一个单词,且该单词是必须出现的
* #(井号):用来表示任意数量
*/

//绑定
@Bean
public Binding bindingTopic(){
return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.*.*");
}

@Bean
public Binding bindingTopic2(){
return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.#");
}

}
  • 生产与消费消息(topic)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import com.example.service.MessageService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageServiceRabbitmqTopicImpl implements MessageService {

@Autowired
private AmqpTemplate amqpTemplate;

@Override
public void sendMessage(String id) {
System.out.println("待发送短信的订单已纳入处理队列(rabbitmq topic),id="+id);
amqpTemplate.convertAndSend("topicExchange","topic.a.id",id);
}
}
  • 使用消息监听器对消息队列监听(topic)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageListener {

@RabbitListener(queues = "topic_queue")
public void receive(String id){
System.out.println("已完成短信发送业务(rabbitmq topic 1),id:"+id);
}

@RabbitListener(queues = "topic_queue2")
public void receive2(String id){
System.out.println("已完成短信发送业务(rabbitmq topic 222),id:"+id);
}

}

RocketMQ

下载地址并启动服务

  • 安装方式:解压缩
  • 默认服务器端口:9876
  • 环境变量配置:ROCKETMQ_HOME、PATH、NAMESRV_ADDR(建议):127.0.0.1:9876
1
https://rocketmq.apache.org/
  • 启动命名服务器
1
2
#安装路径下的bin文件夹
mqnamesrv.exe
  • 启动broker
1
2
#安装路径下的bin文件夹
mqbroker.exe
  • 服务器功能测试:生产者
1
tools org.apache.rocketmq.example.quickstart.Producer
  • 服务器功能测试:消费者
1
tools org.apache.rocketmq.example.quickstart.Consumer

springboot整合rocketmq

  • 导入RocketMQ依赖
1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
  • 配置RocketMQ(基础配置)
1
2
3
4
rocketmq:
name-server: localhost:9876
producer:
group: group_rocketmq
  • 生产消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import com.example.service.MessageService;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageRocketmqImpl implements MessageService {

@Autowired
private RocketMQTemplate rocketMQTemplate;

@Override
public void sendMessage(String id) {
System.out.println("待发送短信的订单已纳入处理队列(rocketmq),id="+id);
//同步消息
rocketMQTemplate.convertAndSend("order_id",id);
}

@Override
public String doMessage() {
return null;
}
}

  • 生产异步消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import com.example.service.MessageService;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageRocketmqImpl implements MessageService {

@Autowired
private RocketMQTemplate rocketMQTemplate;


@Override
public void sendMessage(String id) {
System.out.println("待发送短信的订单已纳入处理队列(rocketmq),id="+id);

//同步消息
//rocketMQTemplate.convertAndSend("order_id",id);

//异步消息回调操作
SendCallback sendCallback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("消息发送成功");
}

@Override
public void onException(Throwable throwable) {
System.out.println("消息发送失败!!!!");
}
};
rocketMQTemplate.asyncSend("order_id",id,sendCallback);
}

@Override
public String doMessage() {
return null;
}
}

  • 使用消息监听器对消息队列监听
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(consumerGroup = "group_rocketmq",topic = "order_id")
public class MessageListener implements RocketMQListener<String> {

@Override
public void onMessage(String id) {
System.out.println("已完成短信发送业务(rocketmq),id:"+id);
}

}


Kafka

下载地址

安装方式:解压缩

windows系统下3.0.0版本存在BUG,建议使用2.X版本

1
https://kafka.apache.org/downloads

敬请期待————-

……………..

……………..