RabbitMQ快速入门
其他RabbitMQ文章
MQ 的基本概念
MQ概述
MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进 行通信。
分布式正常通信:
使用MQ:
MQ,消息队列,存储消息的中间件
分布式系统通信两种方式:直接远程调用 和 借助第三方 完成间接通信
发送方称为生产者,接收方称为消费者
MQ 的优势和劣势
应用解耦:提高系统容错性和可维护性
异步提速:提升用户体验和系统吞吐量
削峰填谷:提高系统稳定性
系统可用性降低:系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
系统复杂度提高:MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何 保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
一致性问题:A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理 失败。如何保证消息数据处理的一致性?
既然 MQ 有优势也有劣势,那么使用 MQ 需要满足什么条件呢?
- 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明 明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
- 容许短暂的不一致性。
- 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本
RabbitMQ 简介
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议 的一个开放标准,为面向消息的中间件设计。
基于此协议的客户端与消息中间件可传递消息,并不受客户端/中 间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。
2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。
RabbitMQ 采用 Erlang 语言开发。
Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。
RabbitMQ 基础架构如下图:
RabbitMQ 中的相关概念:
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网 络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多 个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线 程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
- xxxxxxxxxx rm -rf /usr/lib64/erlang rm -rf /var/lib/rabbitmqrm -rf /usr/local/erlangrm -rf /usr/local/rabbitmqshell
- Queue:消息最终被送到这里等待 consumer 取走
- Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存 到 exchange 中的查询表中,用于 message 的分发依据
RabbitMQ 提供了 6 种工作模式:
查看工作模式
JMS
- JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件 的API
- JMS 是 JavaEE 规范中的一种,类比JDBC
- 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开 源社区有
总的来说
- RabbitMQ 是基于 AMQP 协议使用 Erlang 语言开发的一款消息队列产品。
- RabbitMQ提供了6种工作模式,我们学习5种。这是今天的重点。
- AMQP 是协议,类比HTTP。
- JMS 是 API 规范接口,类比 JDBC。
RabbitMQ的安装和配置
简单模式入门案例
使用如下简单模式
在上图的模型中,有以下概念:
生产者,也就是要发送消息的程序
消费者:消息的接收者,会一直等待消息到来
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息
记得启动RabbitMQ(本案例在虚拟机linux下启动的)~~
实现步骤
实现producer
在idea工具中新建maven项目,并导入以下依赖
1
2
3
4
5<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>编写代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class hello {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("192.168.84.128");//ip 默认localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/centos7");//虚拟机 默认为/
factory.setUsername("root");//用户名 默认guest
factory.setPassword("root");//密码 默认guest
//3.创建连接 Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
/*
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* 参数:
* 1.queue:队列名称
* 2.durable:是否持久化,当mq重启之后还存在
* 3.exclusive:
* -是否独占。只能有一个消费者监听这队列
* -当Connection关闭时,是否删除队列
* 4.autoDelete:是否自动删除,当没有Consumer时,自动删除掉
* 5.arguments:参数
* */
//如果没有一个名字叫hello_world的队列,则会自动创建该队列,如果有则不会创建
channel.queueDeclare("hello_world", true, false, false, null);
/*
* basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
* 参数:
* 1.exchange:交换机名称。简单模式下交换机会使用默认的 ""
* 2.routingKey:路由名称
* 3.props:配置信息
* 4.body:发送消息数据
* */
String body = "hello rabbitmq---";
//6.发送消息
channel.basicPublish("", "hello_world", null, body.getBytes());
//7.释放资源
channel.close();
connection.close();
}
}运行后可在浏览器的rabbitmq管理界面看到所发送给mq的消息
实现consumer
在idea工具中新建maven项目,并导入以下依赖
1
2
3
4
5<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>编写代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class hello {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("192.168.84.128");//ip 默认localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/centos7");//虚拟机 默认为/
factory.setUsername("root");//用户名 默认guest
factory.setPassword("root");//密码 默认guest
//3.创建连接 Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
/*
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* 参数:
* 1.queue:队列名称
* 2.durable:是否持久化,当mq重启之后还存在
* 3.exclusive:
* -是否独占。只能有一个消费者监听这队列
* -当Connection关闭时,是否删除队列
* 4.autoDelete:是否自动删除,当没有Consumer时,自动删除掉
* 5.arguments:参数
* */
//如果没有一个名字叫hello_world的队列,则会自动创建该队列,如果有则不会创建
channel.queueDeclare("hello_world", true, false, false, null);
//接收消息
/*
* public String basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数:
* 1.queue:队列名称
* 2.autoAck:是否自动确认
* 3.callback:回调对象
* */
Consumer consumer = new DefaultConsumer(channel){
/*
*回调方法,当收到消息后会自动执行该方法
* 参数:
* 1.consumerTag:标识
* 2.envelope:获取一些信息、交换机、路由key。。。
* 3.properties:配置信息
* 4.body:数据
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:" + consumerTag);
System.out.println("envelope:" + envelope.getExchange());
System.out.println("RoutingKey:" + envelope.getRoutingKey());
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume("hello_world", true, consumer);
}
}运行即可消费rabbitmq队列里的消息