消息模型
- 点对点
消息生产者向消息队列中发送了一个消息之后,只能被一个消费者消费一次。 - 发布/订阅
消息生产者向频道发送一个消息之后,多个消费者可以从该频道订阅到这条消息并消费。
临时(ephemeral)订阅:这种订阅只有在消费者启动并且运行的时候才存在。一旦消费者退出,相应的订阅以及尚未处理的消息就会丢失。
持久(durable)订阅:这种订阅会一直存在,除非主动去删除。消费者退出后,消息系统会继续维护该订阅,并且后续消息可以被继续处理。 - 发布与订阅模式和观察者模式有以下不同:
观察者模式中,观察者和主题都知道对方的存在;而在发布与订阅模式中,生产者与消费者不知道对方的存在,它们之间通过频道进行通信。
观察者模式是同步的,当事件触发时,主题会调用观察者的方法,然后等待方法返回;而发布与订阅模式是异步的,生产者向频道发送一个消息之后,就不需要关心消费者何时去订阅这个消息,可以立即返回。
AMQP协议
AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
- Server:接收客户端的连接,实现AMQP实体服务。
- Connection:连接,应用程序与Server的网络连接,TCP连接。
(1)Connection会执行认证、IP解析、路由等底层网络任务。
(2)应用与消息队列RabbitMQ版完成Connection建立大约需要15个TCP报文交互,因而会消耗大量的网络资源和消息队列RabbitMQ版资源。
(3)一个进程对应一个Connection,一个进程中的多个线程则分别对应一个Connection中的多个Channel。
(4)Producer和Consumer分别使用不同的Connection进行消息发送和消费; Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
(1)Channel是物理TCP连接中的虚拟连接。
(2)当应用通过Connection与消息队列RabbitMQ版建立连接后,所有的AMQP协议操作(例如创建队列、发送消息、接收消息等)都会通过Connection中的Channel完成。
(3) Channel可以复用Connection,即一个Connection下可以建立多个Channel。
(4) Channel不能脱离Connection独立存在,而必须存活在Connection中。
(5) 当某个Connection断开时,该Connection下的所有Channel都会断开。Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。由Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
- Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
- Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。
- Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
- RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。
- Queue:消息队列,用来保存消息,供消费者消费。
工作原理
生产者发送消息流程
1、生产者和Broker建立TCP连接;
2、生产者和Broker建立通道;
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发;
4、Exchange将消息转发到指定的Queue(队列)。
【详细】
1、消息生产者连接到RabbitMQ Broker
,建立链接(Connection),在链接(Connection)上开启一个信道(Channel);
2、声明一个交换机(Exchange),并设置相关属性,比如交换机类型、是否持久化等;
3、声明一个队列(Queue),并设置相关属性,比如是否排他、是否持久化、是否自动删除等;
4、使用路由键(RoutingKey)将队列(Queue)和交换机(Exchange)绑定起来;
5、生产者发送消息至 RabbitMQ Broker,其中包含路由键、交换器等信息,根据路由键(RoutingKey)发送消息到交换机(Exchange);
6、相应的交换器(Exchange)根据接收到的路由键(RoutingKey)查找相匹配的队列如果找到 ,则将从生产者发送过来的消息存入相应的队列中;
7、如果没有找到 ,则根据生产者配置的属性选择丢弃还是回退给生产者;
8、关闭信道(Channel);
9、关闭链接(Connection);
消费者接收消息流程
1、消费者和Broker建立TCP连接;
2、消费者和Broker建立通道;
3、消费者监听指定的Queue(队列);
4、当有消息到达Queue时Broker默认将消息推送给消费者;
5、消费者接收到消息;
6、ack回复。
【详细】
1、建立链接(Connection);
2、在链接(Connection)上开启一个信道(Channel);
3、请求消费指定队列(Queue)的消息,并设置回调函数(onMessage);
4、[MQ]将消息推送给消费者,消费者接收消息;
5、消费者发送消息确定(Ack[acknowledge]);
6、[MQ]删除被确认的消息;
7、关闭信道(Channel);
8、关闭链接(Connection);
特性分析
不仅需要知道Rabbit的特性,还需要知道支持这些特性的原因:
- 消息路由(支持):RabbitMQ可以通过不同的交换器支持不同种类的消息路由;
- 消息有序(不支持):当消费消息时,如果消费失败,消息会被放回队列,然后重新消费,这样会导致消息无序;
- 消息时序(非常好):通过延时队列,可以指定消息的延时时间,过期时间TTL等;
- 容错处理(非常好):通过交付重试和死信交换器(DLX)来处理消息处理故障;
- 伸缩(一般):伸缩其实没有非常智能,因为即使伸缩了,master queue还是只有一个,负载还是只有这一个master queue去抗,所以我理解RabbitMQ的伸缩很弱(个人理解)。
- 持久化(不太好):没有消费的消息,可以支持持久化,这个是为了保证机器宕机时消息可以恢复,但是消费过的消息,就会被马上删除,因为RabbitMQ设计时,就不是为了去存储历史数据的。
- 消息回溯(不支持):因为消息不支持永久保存,所以自然就不支持回溯。
- 高吞吐(中等):因为所有的请求的执行,最后都是在master queue,它的这个设计,导致单机性能达不到十万级的标准。
适用场景
异步处理
发送者将消息发送给消息队列之后,不需要同步等待消息接收者处理完毕,而是立即返回进行其它操作。消息接收者从消息队列中订阅消息之后异步处理。
流量削锋
在高并发的场景下,如果短时间有大量的请求到达会压垮服务器。
可以将请求发送到消息队列中,服务器按照其处理能力从消息队列中订阅消息进行处理。
应用解耦
如果模块之间不直接进行调用,模块之间耦合度就会很低,那么修改一个模块或者新增一个模块对其它模块的影响会很小,从而实现可扩展性。
通过使用消息队列,一个模块只需要向消息队列中发送消息,其它模块可以选择性地从消息队列中订阅消息从而完成调用。
日志处理
日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。
日志采集客户端,负责日志数据采集,定时写入Kafka队列;Kafka消息队列,负责日志数据的接收,存储和转发;日志处理应用:订阅并消费kafka队列中的日志数据。
消息通讯
消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等