RocketMQ 架構簡析
Apache RocketMQ 是阿里開源的一款高性能、高吞吐量的分布式消息中間件。
RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息。每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲于集群中的不同的Broker Group。 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Namesrv 說道Namesrv首先會想到服務注冊與發現。分布式服務SOA架構體系中會有服務注冊與發現中心。主要作用是指導服務調用方找到服務提供者提供的服務實例。RocketMQ體系中Namesrv主要作用是:為producer和consumer提供關于topic的路由信息。管理broker節點:監控更新broker的實時狀態。路由注冊、路由刪除(故障剔除)。 Namesrv充當路由消息的提供者。Namesrv是一個幾乎無狀態節點,多個Namesrv實例組成集群,但相互獨立,沒有信息交換。 消息中轉角色,負責存儲消息、轉發消息。代理服務器在RocketMQ系統中負責接收從生產者發送來的消息并存儲、同時為消費者的拉取請求作準備。代理服務器也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。 Broker是以group為單位提供服務。一個group里面分Master和Slave。Master和Slave存儲的數據一樣,slave從master同步數據(同步雙寫或異步復制看配置)。一個Master可以對應多個Slave,一個Slave只能對應一個Master。Master與Slave的對應關系通過指定相同的BrokerName、不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。broker不必須是物理機或虛擬機: 每個Broker與Namesrv集群中的所有節點建立長連接,定時發送心跳包到所有Namesrv,更新broker信息、topic路由信息等。一個Topic的不同queue(分區)可分布到集群中不同的broker group上。 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Producer (消息)生產者。Producer與Namesrv集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,并向提供Topic服務的broker master建立長連接,且定時向broker master發送心跳。Producer完全無狀態,可集群部署。 Producer負責生產消息,一般由業務系統負責生產消息。一個消息生產者會把業務應用系統里產生的消息發送到broker服務器。RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要Broker返回確認信息,單向發送不需要。 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Consumer (消息)消費者 Consumer與Namesrv集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,并向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。 Consumer負責消費消息,一般是后臺系統負責異步消費。一個消息消費者會從Broker服務器拉取消息、并將其提供給應用程序。從用戶應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。 集群模式下:相同Consumer Group的每個Consumer實例平均分攤消息。一個條消息僅能被一個Consumer Group消費一次。、 消息中間件需要解決的問題:異步化、削峰填谷。 消息中間件應具備的基礎能力是:消息發布、訂閱、消費。概念相對簡單這里不過多描述。 消息中間件的一些重要的機制: 優先級是指在一個消息隊列中,每條消息都有不同的優先級,一般用整數來描述,優先級高的消息先投遞,如果消息完全在一個內存隊列中,那么在投遞前可以按照優先級排序,令優先級高的先投遞。由于RocketMQ所有消息都是持久化的,所以如果按照優先級來排序,開銷會非常大,因此RocketMQ沒有特意支持消息優先級,但是可以通過變通的方式實現類似功能,即單獨配置一個優先級高的隊列,和一個普通優先級的隊列,將不同優先級發送到不同隊列即可。 消息有序指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了3條消息,分別是訂單創建,訂單付款,訂單完成。消費時,要按照這個順序消費才能有意義。但是同時訂單之間是可以并行消費的。RocketMQ可以嚴格的保證消息有序。 RocketMQ、Kafka 以文件記錄形式持久化。 RocketMQ采用了單一的日志文件,即把同1個broker上面所有topic的所有queue的消息,存放在一個文件里面,從而避免了隨機的磁盤寫入。 如上圖所示,所有消息都存在一個單一的CommitLog文件里面,然后有后臺線程異步的同步到ConsumeQueue,再由Consumer進行消費。 TODO 同步、異步刷盤。 TODO RocketMQ充分利用Linux文件系統內存cache來提高性能。TODO CommitLog index Commitlog segment的大小與頁緩存一致。 RocketMQ消息存儲機制會在后面的文章詳細說明。 TODO broker group master/salve TODO Async/Sync Master; 提高并發效率 => 提高生產、消費并行度=>提高分區數量。 RocketMQ、kafka都支持topic數據分區存放、動態擴展。 以RocketMQ為例: topic創建的時候可以用集群模式去創建(這樣集群里面每個broker的queue的數量相同),也可以用單個broker模式去創建(這樣每個broker的queue數量可以不一致)。 RocketMQ的生產并行度是由其自身機制及broker的數量決定的。這塊后面的文章會詳細分析。 廣播模式下所有消費者會接受并消費當前topic下所有Queue的消息。 集群模式下,一個queue只分配給一個consumer實例:這是由于拉取消息是consumer主動控制的,如果多個實例同時消費一個queue的消息,會導致同一個消息在不同的實例下被消費多次,所以算法上都是一個queue只分給一個consumer實例,一個consumer實例可以允許同時分到不同的queue。 Kafka的消費并行度依賴Topic配置的分區數,如分區數為10,那么最多10臺機器來并行消費(每臺機器只能開啟一個線程),或者一臺機器消費(10個線程并行消費)。即消費并行度和分區數一致。RocketMQ消費并行度分兩種情況:順序消費方式并行度同卡夫卡完全一致;亂序方式并行度取決于Consumer的線程數,如Topic配置10個隊列,10臺機器消費,每臺機器100個線程,那么并行度為1000。 Producer使用MessageQueueSelector選擇將消息投放到哪個分區 使用AllocateMessageQueueStrategy將不同分區分配給Consumer Group中的不同Consumer。一個分區(queue)僅允許分配給同一個Consumer Group下的一個Consumer(防止重復消費)。 內置實現類:SelectMessageQueueByMachineRoom SelectMessageQueueByHash SelectMessageQueueByRandom 可以通過實現MessageQueueSelector接口,來自定義Producer投遞消息時選擇分區的算法。 內置實現類: 可以通過實現AllocateMessageQueueStrategy來自定義queue 分配給特定Consumer Group下不同Consumer的策略。 https://github.com/apache/rocketmq/blob/master/docs/cn/ https://juejin.im/post/6844903589819875336 https://jaskey.github.io/blog/2016/12/19/rocketmq-rebalance/ http://objcoding.com/2019/09/13/kafka-partition-and-rmq-queue/ http://www.itmuch.com/books/rocketmq
整體架構
-
路由元信息
-
topicQueueTable:topic 消息隊列路由信息。 -
brokerAddrTable:broker基礎信息。包含broker name,所屬集群名稱,主broker地址等。 -
clusterAddrTable:broker集群信息,存儲集群中所有broker的名稱。 -
brokerLiveTable:broker狀態信息。 -
filterServerTable:broker上的filterServer列表。filterServer用于消息過濾。
-
路由注冊? RocketMQ路由注冊是通過broker與Namesrv的心跳功能實現的。broker啟動時向集群中所有Namesrv發送心跳包,之后每隔30秒向集群中所有Namesrv發送心跳包。心跳包中包含:broker集群信息、broker信息、topic配置信息、broker關聯的FilterServer列表等。如果brokerA為Master。并且brokerA上的topic1的配置信息發生變化或初次注冊,Namesrv會根據報文創建或更新Topic路由元數據,填充topicQueueTable。 -
路由刪除? Namesrv收到brokerA的心跳包會更新brokerLiveTable中的brokerA對應的BrokerLiveInfo中的lastUpdateTimestamp。Namesrv每隔10秒掃描brokerLiveTable一次。如果brokerA對應的BrokerLiveInfo 中 lastUpdateTimestamp距當前時間超過 120秒,Namesrv認為brokerA失效,會將brokerA的路由信息移除并關閉與broker的socket連接。更新:topicQueueInfo、brokerAddrTable、brokerLiveTable、filterServerTable等。 -
路由發現? RocketMQ路由發現是非實時的。當Topic路由信息發生變化是,Namesrv不會主動推送給客戶端(Producer、Consumer)。而是由客戶端定時到Namesrv拉去最新的路由信息并緩存(包含Topic路由信息)。
與kafka對比
kafka 由zookeeper集群提供命名服務(Naming Service)。
Kafka通過 ZooKeeper 管理集群配置、選舉 Leader 以及在 consumer g
Broker
與kafka對比:
kafka和RocketMQ的broker都可以容納多個一個或多個分區數據(kafka分區:partition;RocketMQ分區:queue)。
kafka基于partition(分區) 做備份/高可用(partition follower)。
RocketMQ增加了broker group的概念,基于broker(可能包含多個分區)。
Producer、Consumer都只需要和集群中一個Namesrv建立長連接。Broker需要向集群中所有的Namesrv發送心跳包。
其實很好理解:
Namesrv集群提供高可用的命名服務。
Producer、Consumer只需要從其中一臺定期同步路由信息。
如果Broker只隨機調一臺發送心跳包。那么不同的Namesrv保存的路由信息會出現
消費者類型:
-
拉取式消費(Pull Consumer) Consumer消費的一種類型,應用通常主動調用Consumer的拉消息方法從Broker服務器拉消息、主動權由應用控制。一旦獲取了批量消息,應用就會啟動消費過程。Pull方式里,取消息的過程需要用戶自己寫(包括提交offset等操作)。 -
推動式消費(Push Consumer) Consumer消費的一種類型,該模式下Broker收到數據后會主動推送給消費端,該消費模式一般實時性較高。Push Consumer原理上也是采取pull模式。實際上就是長輪詢的pull模式。
一些概念
-
主題(Topic) 表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬于一個主題,是RocketMQ進行消息訂閱的基本單位。每個topic可分為若干個分區(queue)。 -
生產者組(Producer Group) 同一類Producer的集合,這類Producer發送同一類消息且發送邏輯一致。如果發送的是事務消息且原始生產者在發送之后崩潰,則Broker服務器會聯系同一生產者組的其他生產者實例以提交或回溯消費。 -
消費者組(Consumer Group) 同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者實例必須訂閱完全相同的Topic。RocketMQ 支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。 -
普通順序消息(Normal Ordered Message) 普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。 -
嚴格順序消息(Strictly Ordered Message) 嚴格順序消息模式下,消費者收到的所有消息均是有順序的。 -
消息(Message) 消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬于一個主題。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業務標識的Key。系統提供了通過Message ID和Key查詢消息的功能。 -
標簽(Tag) 為消息設置的標志,用于同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標簽。標簽能夠有效地保持代碼的清晰度和連貫性,并優化RocketMQ提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴展性。
關于消息中間件
1. 消息優先級(Message Priority;RocketMQ不支持)
2. 順序消息(Message Order)
-
投遞消息的順序性:投遞消息的順序性可通過將一組消息投遞到同一分區實現。例如:借助MessageQueueSelector將對相同訂單的操作消息投放到同一分區。 -
消費消息的順序性:RoctetMQ特性保障:特定分區(queue)中的消息不能同時被同一個消費者組中的多個Consumer消費,以避免重復消費。通過自定義或使用預置的AllocateQueueStrategy可設定分區的分配策略(哪些分區分配給哪個消費者消費)。
3. 高可用、消息可靠性
3.1 消息持久化
3.2 broker master/salve
4. 高并發、可擴展 ==> 分布式
4.1 生產并行度
4.2 消費并行度
4.3 消息隊列分配策略
MessageQueueSelector
AllocateMessageQueueStrategy
AllocateMessageQueueAveragely:平均分配算法? AllocateMessageQueueAveragelyByCircle:基于環形平均分配算法 AllocateMachineRoomNearby:基于機房臨近原則算法 AllocateMessageQueueByMachineRoom:基于機房分配算法 AllocateMessageQueueConsistentHash:基于一致性hash算法 AllocateMessageQueueByConfig:基于配置分配算法
參考:
作者:RyanLee86799來源:https://juejin.im/post/6844904130822029320 文章轉載:JAVA高級架構
(版權歸原作者所有,侵刪)