其他RabbitMQ文章

MQ 的基本概念

MQ概述

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进 行通信。

分布式正常通信:

使用MQ:

MQ,消息队列,存储消息的中间件

分布式系统通信两种方式:直接远程调用 和 借助第三方 完成间接通信

发送方称为生产者,接收方称为消费者

MQ 的优势和劣势

应用解耦:提高系统容错性和可维护性

异步提速:提升用户体验和系统吞吐量

削峰填谷:提高系统稳定性

系统可用性降低:系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?

系统复杂度提高:MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何 保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?

一致性问题:A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理 失败。如何保证消息数据处理的一致性?

既然 MQ 有优势也有劣势,那么使用 MQ 需要满足什么条件呢?

  1. 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明 明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
  2. 容许短暂的不一致性。
  3. 确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本

RabbitMQ 简介

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议 的一个开放标准,为面向消息的中间件设计。

基于此协议的客户端与消息中间件可传递消息,并不受客户端/中 间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。

2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。

RabbitMQ 采用 Erlang 语言开发。

Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

RabbitMQ 基础架构如下图:

RabbitMQ 中的相关概念:

  • Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker

  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网 络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多 个vhost,每个用户在自己的 vhost 创建 exchange/queue 等

  • Connection:publisher/consumer 和 broker 之间的 TCP 连接

  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线 程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销

  • xxxxxxxxxx rm -rf /usr/lib64/erlang rm -rf /var/lib/rabbitmqrm -rf /usr/local/erlangrm -rf /usr/local/rabbitmqshell
  • Queue:消息最终被送到这里等待 consumer 取走
  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存 到 exchange 中的查询表中,用于 message 的分发依据

RabbitMQ 提供了 6 种工作模式:

查看工作模式

JMS

  1. JMS 即 Java 消息服务(JavaMessage Service)应用程序接口,是一个 Java 平台中关于面向消息中间件 的API
  2. JMS 是 JavaEE 规范中的一种,类比JDBC
  3. 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ 官方没有提供 JMS 的实现包,但是开 源社区有

总的来说

  1. RabbitMQ 是基于 AMQP 协议使用 Erlang 语言开发的一款消息队列产品。
  2. RabbitMQ提供了6种工作模式,我们学习5种。这是今天的重点。
  3. AMQP 是协议,类比HTTP。
  4. JMS 是 API 规范接口,类比 JDBC。

RabbitMQ的安装和配置

简单模式入门案例

使用如下简单模式

在上图的模型中,有以下概念:

生产者,也就是要发送消息的程序

消费者:消息的接收者,会一直等待消息到来

queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

记得启动RabbitMQ(本案例在虚拟机linux下启动的)~~

实现步骤

实现producer

  1. 在idea工具中新建maven项目,并导入以下依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
    </dependency>
  2. 编写代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class hello {

    public static void main(String[] args) throws IOException, TimeoutException {

    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    //2.设置参数
    factory.setHost("192.168.84.128");//ip 默认localhost
    factory.setPort(5672);//端口 默认值5672
    factory.setVirtualHost("/centos7");//虚拟机 默认为/
    factory.setUsername("root");//用户名 默认guest
    factory.setPassword("root");//密码 默认guest
    //3.创建连接 Connection
    Connection connection = factory.newConnection();
    //4.创建Channel
    Channel channel = connection.createChannel();
    //5.创建队列Queue
    /*
    * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
    * 参数:
    * 1.queue:队列名称
    * 2.durable:是否持久化,当mq重启之后还存在
    * 3.exclusive:
    * -是否独占。只能有一个消费者监听这队列
    * -当Connection关闭时,是否删除队列
    * 4.autoDelete:是否自动删除,当没有Consumer时,自动删除掉
    * 5.arguments:参数
    * */
    //如果没有一个名字叫hello_world的队列,则会自动创建该队列,如果有则不会创建
    channel.queueDeclare("hello_world", true, false, false, null);
    /*
    * basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
    * 参数:
    * 1.exchange:交换机名称。简单模式下交换机会使用默认的 ""
    * 2.routingKey:路由名称
    * 3.props:配置信息
    * 4.body:发送消息数据
    * */
    String body = "hello rabbitmq---";
    //6.发送消息
    channel.basicPublish("", "hello_world", null, body.getBytes());
    //7.释放资源
    channel.close();
    connection.close();
    }

    }
  3. 运行后可在浏览器的rabbitmq管理界面看到所发送给mq的消息

实现consumer

  1. 在idea工具中新建maven项目,并导入以下依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
    </dependency>
  2. 编写代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class hello {

    public static void main(String[] args) throws IOException, TimeoutException {

    //1.创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    //2.设置参数
    factory.setHost("192.168.84.128");//ip 默认localhost
    factory.setPort(5672);//端口 默认值5672
    factory.setVirtualHost("/centos7");//虚拟机 默认为/
    factory.setUsername("root");//用户名 默认guest
    factory.setPassword("root");//密码 默认guest
    //3.创建连接 Connection
    Connection connection = factory.newConnection();
    //4.创建Channel
    Channel channel = connection.createChannel();
    //5.创建队列Queue
    /*
    * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
    * 参数:
    * 1.queue:队列名称
    * 2.durable:是否持久化,当mq重启之后还存在
    * 3.exclusive:
    * -是否独占。只能有一个消费者监听这队列
    * -当Connection关闭时,是否删除队列
    * 4.autoDelete:是否自动删除,当没有Consumer时,自动删除掉
    * 5.arguments:参数
    * */
    //如果没有一个名字叫hello_world的队列,则会自动创建该队列,如果有则不会创建
    channel.queueDeclare("hello_world", true, false, false, null);

    //接收消息
    /*
    * public String basicConsume(String queue, boolean autoAck, Consumer callback)
    * 参数:
    * 1.queue:队列名称
    * 2.autoAck:是否自动确认
    * 3.callback:回调对象
    * */
    Consumer consumer = new DefaultConsumer(channel){
    /*
    *回调方法,当收到消息后会自动执行该方法
    * 参数:
    * 1.consumerTag:标识
    * 2.envelope:获取一些信息、交换机、路由key。。。
    * 3.properties:配置信息
    * 4.body:数据
    */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("consumerTag:" + consumerTag);
    System.out.println("envelope:" + envelope.getExchange());
    System.out.println("RoutingKey:" + envelope.getRoutingKey());
    System.out.println("properties:" + properties);
    System.out.println("body:" + new String(body));
    }
    };
    channel.basicConsume("hello_world", true, consumer);
    }

    }
  3. 运行即可消费rabbitmq队列里的消息