RabbitMQ的工作模式
其他RabbitMQ文章
RabbitMQ工作模式
快速定位(点击即可跳转)
简单模式
Work Queues
模式说明
- Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
- 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
代码编写
Work Queues 与入门程序的简单模式的代码几乎是一样的。可以完全复制,并多复制一个消费者进行多 个消费者同时对消费消息的测试。
生产者(producer)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58package com.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("192.168.84.128");//ip 默认localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/centos7");//虚拟机 默认为/
factory.setUsername("root");//用户名 默认guest
factory.setPassword("root");//密码 默认guest
//3.创建连接 Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建队列Queue
/*
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* 参数:
* 1.queue:队列名称
* 2.durable:是否持久化,当mq重启之后还存在
* 3.exclusive:
* -是否独占。只能有一个消费者监听这队列
* -当Connection关闭时,是否删除队列
* 4.autoDelete:是否自动删除,当没有Consumer时,自动删除掉
* 5.arguments:参数
* */
//如果没有一个名字叫hello_world的队列,则会自动创建该队列,如果有则不会创建
channel.queueDeclare("work_queues", true, false, false, null);
/*
* basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
* 参数:
* 1.exchange:交换机名称。简单模式下交换机会使用默认的 ""
* 2.routingKey:路由名称
* 3.props:配置信息
* 4.body:发送消息数据
* */
for (int i = 0; i < 10; i++) {
String body =i + "hello rabbitmq---";
//6.发送消息
channel.basicPublish("", "work_queues", null, body.getBytes());
}
//7.释放资源
channel.close();
connection.close();
}
}消费者(consumer)
消费者代码基本一样,复制即可
1 | package com.demo; |
1 | package com.demo; |
运行结果
RabbitMQ管理界面(生产者运行完毕)
xxxxxxxxxx rm -rf /usr/lib64/erlang rm -rf /var/lib/rabbitmqrm -rf /usr/local/erlangrm -rf /usr/local/rabbitmqshell
- 消费者1
- 消费者2
RabbitMQ管理界面最终结果
小结
- 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
- Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个, 只需要有一个节点成功发送即可。
Pub/Sub
模式说明
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化
生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
Queue消息队列,接收消息、缓存消息
交换机(X)。
一方面,接收生产者发送的消息。
另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。
到底如何操作,取决于Exchange的类型。
Exchange有常见以下3种类型:
➢ Fanout:广播,将消息交给所有绑定到交换机的队列
➢ Direct:定向,把消息交给符合指定routing key 的队列
➢ Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合 路由规则的队列,那么消息会丢失!
代码编写
生产者(producer)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71package com.demo;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PubSub {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("192.168.184.128");//ip 默认localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/centos7");//虚拟机 默认为/
factory.setUsername("root");//用户名 默认guest
factory.setPassword("root");//密码 默认guest
//3.创建连接 Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建交换机
/*
* exchangeDeclare(String exchange, BuiltinExchangeType type,
* boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
* 参数:
* 1. exchange:交换机名称
* 2. type:交换机类型
* DIRECT("direct") 定向
FANOUT("fanout") 扇形(广播),发送消息到每一个与之绑定的队列
TOPIC("topic") 通配符
HEADERS("headers") 参数匹配
* 3. durable:是否持久化
* 4. autoDelete:是否自动删除
* 5. internal:内部使用,一般false
* 6. arguments:参数
* */
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
//6.创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
//7.绑定队列和交换机
/*
* queueBind(String queue, String exchange, String routingKey)
* 参数:
* 1.queue:队列名称
* 2.exchange:交换机名称
* 3.routingKey:路由键,绑定规则
* 如果交换机的类型为fanout,routingKey设置为""
* */
channel.queueBind(queue1Name, exchangeName, "");
channel.queueBind(queue2Name, exchangeName, "");
//8.发送消息
String body = "logMessage: 张三调用了findAll()方法...level:info";
channel.basicPublish(exchangeName, "", null, body.getBytes());
//9.释放资源
channel.close();
connection.close();
}
}消费者(consumer)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59package com.demo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PubSub_Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("192.168.106.128");//ip 默认localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/centos7");//虚拟机 默认为/
factory.setUsername("root");//用户名 默认guest
factory.setPassword("root");//密码 默认guest
//3.创建连接 Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
//接收消息
/*
* public String basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数:
* 1.queue:队列名称
* 2.autoAck:是否自动确认
* 3.callback:回调对象
* */
Consumer consumer = new DefaultConsumer(channel){
/*
*回调方法,当收到消息后会自动执行该方法
* 参数:
* 1.consumerTag:标识
* 2.envelope:获取一些信息、交换机、路由key。。。
* 3.properties:配置信息
* 4.body:数据
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:" + consumerTag);
// System.out.println("envelope:" + envelope.getExchange());
// System.out.println("RoutingKey:" + envelope.getRoutingKey());
// System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
System.out.println("输出该消息");
}
};
channel.basicConsume(queue1Name, true, consumer);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59package com.demo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PubSub_Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("192.168.106.128");//ip 默认localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/centos7");//虚拟机 默认为/
factory.setUsername("root");//用户名 默认guest
factory.setPassword("root");//密码 默认guest
//3.创建连接 Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
//接收消息
/*
* public String basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数:
* 1.queue:队列名称
* 2.autoAck:是否自动确认
* 3.callback:回调对象
* */
Consumer consumer = new DefaultConsumer(channel){
/*
*回调方法,当收到消息后会自动执行该方法
* 参数:
* 1.consumerTag:标识
* 2.envelope:获取一些信息、交换机、路由key。。。
* 3.properties:配置信息
* 4.body:数据
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:" + consumerTag);
// System.out.println("envelope:" + envelope.getExchange());
// System.out.println("RoutingKey:" + envelope.getRoutingKey());
// System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
System.out.println("持久化该消息");
}
};
channel.basicConsume(queue2Name, true, consumer);
}
}
运行结果
RabbitMQ管理界面(生产者运行完毕)
消费者运行结果
RabbitMQ管理界面最终结果
小结
- 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
- 发布订阅模式与工作队列模式的区别:
- 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
- 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用 默认交换机)
- 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队 列绑 定到默认的交换机
Routing
模式说明
队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息
生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
消费者,其所在队列指定了需要 routing key 为 error 的消息
消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
代码编写
生产者(producer)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74package com.demo;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Routing {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("192.168.184.128");//ip 默认localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/centos7");//虚拟机 默认为/
factory.setUsername("root");//用户名 默认guest
factory.setPassword("root");//密码 默认guest
//3.创建连接 Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建交换机
/*
* exchangeDeclare(String exchange, BuiltinExchangeType type,
* boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
* 参数:
* 1. exchange:交换机名称
* 2. type:交换机类型
* DIRECT("direct") 定向
FANOUT("fanout") 扇形(广播),发送消息到每一个与之绑定的队列
TOPIC("topic") 通配符
HEADERS("headers") 参数匹配
* 3. durable:是否持久化
* 4. autoDelete:是否自动删除
* 5. internal:内部使用,一般false
* 6. arguments:参数
* */
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
//6.创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
//7.绑定队列和交换机
/*
* queueBind(String queue, String exchange, String routingKey)
* 参数:
* 1.queue:队列名称
* 2.exchange:交换机名称
* 3.routingKey:路由键,绑定规则
* 如果交换机的类型为fanout,routingKey设置为""
* */
channel.queueBind(queue1Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "info");
channel.queueBind(queue2Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "warning");
//8.发送消息
String body = "logMessage: 张三调用了findAll()方法...level:info";
channel.basicPublish(exchangeName, "error", null, body.getBytes());
//9.释放资源
channel.close();
connection.close();
}
}
- 消费者(consumer)
1 | package com.demo; |
1 | package com.demo; |
运行结果
channel.basicPublish(exchangeName, “info”, null, body.getBytes());
消息routing为 info 的情况下(消费者2可接收),具体代码请看生产者类里写的绑定规则
RabbitMQ管理界面(生产者运行完毕)
消费者运行结果
RabbitMQ管理界面最终结果
channel.basicPublish(exchangeName, “error”, null, body.getBytes());
消息routing为 error 的情况下(消费者1和2都可接收),具体代码请看生产者类里写的绑定规则
RabbitMQ管理界面(生产者运行完毕)
消费者运行结果
RabbitMQ管理界面最终结果
小结
- Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。
Topics
模式说明
- Q1:绑定 #.error和order.*,因此凡是以 order. 开头和以.error结尾的 routing key 都会被匹配到
- Q2:绑定#.#(*和#是一样的效果),因此什么格式的 routing key 都会被匹配
代码编写
想匹配其他规则只需要更改生产者代码中的
//队列1需求:所有error级别、order系统的消息进行持久化
channel.queueBind(queue1Name, exchangeName, “#.error”);
channel.queueBind(queue1Name, exchangeName, “order.“);
//队列2需求:所有消息输出到控制台
channel.queueBind(queue2Name, exchangeName, “.*”);
生产者(producer)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74package com.demo;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Topics {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("192.168.184.128");//ip 默认localhost
factory.setPort(5672);//端口 默认值5672
factory.setVirtualHost("/centos7");//虚拟机 默认为/
factory.setUsername("root");//用户名 默认guest
factory.setPassword("root");//密码 默认guest
//3.创建连接 Connection
Connection connection = factory.newConnection();
//4.创建Channel
Channel channel = connection.createChannel();
//5.创建交换机
/*
* exchangeDeclare(String exchange, BuiltinExchangeType type,
* boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
* 参数:
* 1. exchange:交换机名称
* 2. type:交换机类型
* DIRECT("direct") 定向
FANOUT("fanout") 扇形(广播),发送消息到每一个与之绑定的队列
TOPIC("topic") 通配符
HEADERS("headers") 参数匹配
* 3. durable:是否持久化
* 4. autoDelete:是否自动删除
* 5. internal:内部使用,一般false
* 6. arguments:参数
* */
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
//6.创建队列
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
//7.绑定队列和交换机
/*
* queueBind(String queue, String exchange, String routingKey)
* 参数:
* 1.queue:队列名称
* 2.exchange:交换机名称
* 3.routingKey:路由键,绑定规则
* 如果交换机的类型为fanout,routingKey设置为""
* */
//队列1需求:所有error级别、order系统的消息进行初九话
channel.queueBind(queue1Name, exchangeName, "#.error");
channel.queueBind(queue1Name, exchangeName, "order.*");
//队列2需求:所有消息打赢到控制台
channel.queueBind(queue2Name, exchangeName, "*.*");
//8.发送消息
String body = "logMessage: 张三调用了findAll()方法...level:a";
channel.basicPublish(exchangeName, "a.error", null, body.getBytes());
//9.释放资源
channel.close();
connection.close();
}
}消费者(consumer)
1 | package com.demo; |
1 | package com.demo; |
运行结果
RabbitMQ管理界面(生产者运行完毕)
消费者运行结果
RabbitMQ管理界面最终结果
小结
- Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。
工作模式总结
- 简单模式 HelloWorld
- 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。
- 工作队列模式 Work Queue
- 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。
- 发布订阅模式 Publish/subscribe
- 需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消 息发送到绑定的队列。
- 路由模式 Routing
- 需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机 后,交换机会根据 routing key 将消息发送到对应的队列。
- 通配符模式 Topic
- 需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送 消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。