大家好,我是三此君,一个在自我救赎之路上的非典型程序员。
“一张图”系列旨在通过“一张图”系统性的解析一个板块的知识点:
本文是“一张图”系列的第一个板块:一张图解析 RocketMQ。
本文是《一张图解析 RocketMQ》系列的第 1 篇,今天的内容主要分为三个部分:
什么是消息队列?顾名思义,首先得有一个队列,这个队列用来存储消息。那有了消息队列就得有人往里面放,有人往里面取。有没有似曾相识燕归来的感 jio,这莫非就是连小学生都知道的,经典的“生产者-消费者模式”?接下来我们就来看看它里面穿了什么?
别急,先来回顾一下 “生产者-消费者模式” 这个老朋友。简单来说,这个模型是由两类线程和一个队列构成:
有了这个队列,生产者就只需要关注生产,而不用管消费者的消费行为,更不用等待消费者线程执行完;消费者也只管消费,不用管生产者是怎么生产的,更不用等着生产者生产。
这意味着什么呢,生产者和消费者之间实现解藕和异步。这就厉害了,因为我们生活中很多都是异步的。比如最近新冠疫情卷土重来,我点的外卖只能送到小区门口的外卖队列里面,而我只能去外卖队列里面取外卖,然后一顿狼吞虎咽。
具体 “生产者-消费者模式” 怎么实现,想必各位小学都学过了,我们来看看这个模式还有什么问题吧。最大的问题就是我们小学学的 “生产者-消费者模式” 是个单机版的,只能自嗨。这就相当于,我就是外卖骑手,我点了个外卖放到外卖队列,然后我再从外卖队列里面去取,一顿操作猛如虎呀!于是就有了进化版,我们把消费者,队列,生产者放到不同的服务器上,这就是传说中的分布式消息队列了。
生产者生产的消息通过网络传递给队列存储,消费者通过网络从队列获取消息。但是还有问题,消息可能有很多种,全都放在一起岂不是乱套了?我点的外卖和快递全都放在一起,太难找了吧。于是我们就需要区分不同类型消息,相同类型的消息称为一个 Topic。同时,骑手不可能只有一个,点外卖的也不会只有我一个人,于是就有了生产者组和消费者组。
但还是有问题呀,小区那么大,一个队列放不下。我住在小区南门,点个外卖还要跑去北门拿,那真的是 eggs hurt
。于是物业在东南西北门各设了一个外卖快递放置点。也就是我们有多个队列,组成 队列集群。
可是,问题又双叒叕来了(还有完没完),一个小区那么多个外卖快递队列,骑手怎么知道送到哪里去,我又怎么知道去哪里取?很简单,导航呀。我们把导航的信息称为路由信息,这些信息需要有一个管理的地方,它告诉生产者,某这个 Topic 的消息可以发给哪些队列,同时告诉消费者你需要的消息可以从哪些队列里面取。RocketMQ 为这些路由信息的设置了管理员 NameServer,当然 NameServer 也可以有很多个,组成 NameServer 集群。
到这里,你就应该知道 RocketMQ 里面都穿了什么啦。包括了生产者(Producer),消费者(Consumer),NameServer 以及队列本身(Broker)。Broker 是代理的意思,负责队列的存取等操作,我们可以把 Broker 理解为队列本身。
NameServer:我们可以同时部署很多台 NameServer 服务器,并且这些服务器是无状态的,节点之间无任何信息同步。
NameServer 起来后监听 端口,等待 Broker、Producer、Consumer 连上来,NameServer 是集群元数据管理中心。
Broker:Broker 启动,跟所有的 NameServer 保持长连接,每 30s 发送一次发送心跳包(像心跳一样持续稳定的发送请求)。心跳包中包含当前 Broker 信息 ( IP+ 端口等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
我们可以同时部署多个 Master 和多个 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master。Master 与 Slave 的需要有相同的 BrokerName,不同的 BrokerId 。BrokerId 为 0 表示 Master,非 0 表示 Slave,但只有 BrokerId=1 的从服务器才会参与消息的读负载。(可以暂时忽略 Broker 的主从角色)
Topic:收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
Producer:Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,采用轮询策略从选择一个队列,然后与队列所在的 Broker 建立长连接,并向 Broker 发消息。
Consumer:Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。
我们刚刚提到骑手不止一个,取外卖快递的也不止我一个,所以会有生产者组合消费者组的概念。这里需要补充说明一下,消息分为集群消息和广播消息:
集群消息:一个 Topic 的一条消息,一个消费者组只能有一个消费者实例消费。例如,同样是外卖 Topic,一份外卖,我们整个小区也只有一个人消费,就是集群消费。
广播消息:一个 Topic 的一条消息,一个消费者组所有消费者实例都会消费。例如,如果是因为疫情,政府发放食品,那我们小区每个人都会消费,就是广播消费。
因为 Producer、Consumer 和 Broker 都需要和 NameServer 交互,负责的三此君不得不先和大家唠唠 NameServer 是何方神圣。上面有说道 NameServer 是集群的元数据管理中心,那它到底管理了哪些元数据?我们来看看 NameServer 里面又穿了什么,看完了记得关注、转发、点赞、收藏哦。
简单来说,NameServer 是我们的整个 RocketMQ 集群的元数据管理中心,负责集群元数据的增删改查。先不管这个增删改查是怎么实现的,我们甚至可以理解就是数据库的增删改查,但是我们一定要知道这些元数据都长什么样子。才能知道 Producer、Consumer 及 Broker 是如何根据这些数据进行消息收发的。
如图所示,二主二从的 Broker 集群相关的元数据信息,包括 topicQueueTable、BrokerAddrTable、ClusterAddrTable、brokerLiveInfo、FilterServer (暂时不用关注,图中未画出)。
HashMap<String topic, List<QueueData>> topicQueueTable
:Key 是 Topic 的名称,它存储了所有 Topic 的属性信息。Value 是个 QueueData 列表,长度等于这个 Topic 数据存储的 Master Broker 的个数,QueueData 里存储着 Broker 的名称、读写 queue 的数量、同步标识等。HashMap<String BrokerName, BrokerData> brokerAddrTable
:这个结构存储着一个 BrokerName 对应的属性信息,包括所属的 Cluster 名称,一个 Master Broker 和多个 Slave Broker 的地址信息HashMap<String ClusterName, Set<String BrokerName>> clusterAddrTable
:存储的是集群中 Cluster 的信息,就是一个 Cluster 名称对应一个由 BrokerName 组成的集合。HashMap<String BrokerAddr, BrokerLiveInfo> brokerLiveTable
:Key 是 BrokerAddr 对应着一台机器,BrokerLiveTable 存储的内容是这台 Broker 机器的实时状态,包括上次更新状态的时间戳,NameServer 会定期检查这个时间戳,超时没有更新就认为这个 Broker 无效了,将其从 Broker 列表里清除。HashMap<String BrokerAddr, List<String> FilterServer> filterServerTable
:Key 是 Broker 的地址,Value 是和这个 Broker 关联的多个 FilterServer 的地址。Filter Server 是过滤服务器,是 RocketMQ 的一种服务端过滤方式,一个 Broker 可以有一个或多个 Filter Server。其他角色会主动向 NameServer 上报状态,根据上报消息里的请求码做相应的处理,更新存储的对应信息。
那么多数据,相信大家看的有点晕,三此君简单总结下:NameServer 通过 brokerLiveInfo 来维护存活的 Broker。Producer 会获取上面的路由信息,发送消息的时候指定发送到哪个 Topic,根据 Topic 可以从 topicQueueTable 选择一个 Broker,根据 BrokerName 可以从 BrokerAddrTable 获取到Broker IP 地址。有了地址 Producer 就可以将消息通过网络传递给 Broker。
刚刚我们了解 RocketMQ 整体架构,那怎么样通过 RocketMQ 收发消息呢?需要先通过 Docker 部署一套 RocketMQ:
如果你没有安装 Docker,可以根据菜鸟教程 MacOS Docker 安装/Windows Docker 安装 进行安装。然后,通过 docker-compose 部署 RocketMQ:
broker.conf
中的brokerIP1
参数为本机 IP;docker-compose.yml
文件所在路径,执行docker-compose up
命令即可;注意:如果你现在不了解 Docker 不重要,只需要按照步骤部署好 RocketMQ 即可,并不会阻碍我们理解 RocketMQ 相关内容。
部署完成后我们就可以在 Docker Dashboard 中看到 RocketMQ 相关容器,包括 Broker、NameServer 及 Console(RocketMQ 控制台),到这里我们就可以使用部署的 RocketMQ 收发消息了。
RocketMQ 已经部署好了,接下来先来看一个简单的消息收发示例,可以说是 RocketMQ 的 "Hello World"。
public class SyncProducer { public static void main(String[] args) throws Exception { // 实例化消息生产者Producer DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // 设置NameServer的地址 producer.setNamesrvAddr("localhost:9876"); // 启动Producer实例 producer.start(); // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("Topic1","Tag", "Key", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送消息到一个Broker SendResult sendResult = producer.send(msg); // 通过sendResult返回消息是否成功送达 System.out.printf("%s%n", sendResult); // 如果不再发送消息,关闭Producer实例。 producer.shutdown(); } }
producer
,并告诉它 NameServer 的地址,这样生产者才能从 NameServer 获取路由信息。producer
得做一些初始化(这是很关键的步骤),它要和 NameServer 通信,要先建立通信连接等。producer
已经准备好了,那得准备好要发的内容,把 "Hello World" 发送到 Topic1。producer
就可以把消息发送出去了。producer
怎么知道 Broker 地址呢?他就会去 NameServer 获取路由信息,得到 Broker 的地址是 localhost:10909,然后通过网络通信将消息发送给 Broker。public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // 设置NameServer的地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 erbconsumerijun.subscribe("sancijun", "*"); // 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs,ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者实例 consumer.start(); } }
consumer
,告诉它 NameServer 的地址,这样消费者才能从 NameServer 获取路由信息。以上就是本文的全部内容,本文没有堆砌太多无意义的概念,没有讲什么削峰解耦,异步通信。这些内容网上也很多,看了和没看没什么两样。最后的最后,看懂的点赞,没看懂的收藏,顺便在分享给你的小伙伴。还没有关注的朋友记得关注,入股不亏。
丁威, 周继锋. RocketMQ技术内幕:RocketMQ架构设计与实现原理. 机械工业出版社, 2019-01.
李伟. RocketMQ分布式消息中间件:核心原理与最佳实践. 电子工业出版社, 2020-08.
杨开元. RocketMQ实战与原理解析. 机械工业出版社, 2018-06.
转载请注明出处