當前位置:首頁 » 存儲配置 » rocketmq怎麼配置埠

rocketmq怎麼配置埠

發布時間: 2022-11-21 19:11:04

㈠ rocketmq 10911埠的ip怎麼修改

在你啟動的broker時,使用的配置文件中加listenPort=10911,將10911修改為你想要的埠號就好了。

㈡ rocketmq的9876埠可以改嗎

㈢ RocketMQ ACL使用指南

ACL是access control list的簡稱,俗稱訪問控制列表。訪問控制,基本上會涉及到用戶、資源、許可權、角色等概念,那在RocketMQ中上述會對應哪些對象呢?

另外,RocketMQ還支持按照客戶端IP進行白名單設置。

在講解如何使用ACL之前,我們先簡單看一下RocketMQ ACL的請求流程:

對於上述具體的實現,將在後續文章中重點講解,本文的目的只是希望給讀者一個大概的了解。

acl默認的配置文件名:plain_acl.yml,需要放在${ROCKETMQ_HOME}/store/config目錄下。下面對其配置項一一介紹。

全局白名單,其類型為數組,即支持多個配置。其支持的配置格式如下:

配置用戶信息,該類型為數組類型。擁有accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms子元素。

登錄用戶名,長度必須大於6個字元。

登錄密碼。長度必須大於6個字元。

用戶級別的IP地址白名單。其類型為一個字元串,其配置規則與globalWhiteRemoteAddresses,但只能配置一條規則。

boolean類型,設置是否是admin。如下許可權只有admin=true時才有許可權執行。

默認topic許可權。該值默認為DENY(拒絕)。

默認消費組許可權,該值默認為DENY(拒絕),建議值為SUB。

設置topic的許可權。其類型為數組,其可選擇值在下節介紹。

設置消費組的許可權。其類型為數組,其可選擇值在下節介紹。可以為每一消費組配置不一樣的許可權。

上面定義了全局白名單、用戶級別的白名單,用戶級別的許可權,為了更好的配置ACL許可權規則,下面給出許可權匹配邏輯。

首先,需要在broker.conf文件中,增加參數aclEnable=true。並拷貝distribution/conf/plain_acl.yml文件到${ROCKETMQ_HOME}/conf目錄。

broker.conf的配置文件如下:

plain_acl.yml文件內容如下:

從上面的配置可知,用戶RocketMQ只能發送TopicTest的消息,其他topic無許可權發送;拒絕oms_consumer_group消費組的消息消費,其他消費組默認可消費。

運行效果如圖所示:

發現並不沒有消費消息,符合預期。

關於RocketMQ ACL的使用就介紹到這里了,下一篇將介紹RocketMQ ACL實現原理。

推薦閱讀:
1、 RocketMQ實戰:生產環境中,autoCreateTopicEnable為什麼不能設置為true

2、 RocketMQ 消息發送system busy、broker busy原因分析與解決方案

3、 RocketMQ HA機制(主從同步)

4、 RocketMQ事務消息實戰

㈣ 安裝部署RocketMQ集群(雙主雙從)

在 前面 ,我們介紹了如果快速安裝單個RocketMQ。快速安裝意味著這只是在測試環境下的小打小鬧,我們在單機安裝的基礎上,嘗試安裝RocketMQ集群。本次安裝為了方便,使用的是已經編譯好的二進制包進行安裝部署。

RocketMQ集群解決了單機版RocketMQ所存在的單點故障問題,並且還可以對RocketMQ性能進行橫向的拓展。
下圖是官網上的架構圖,可以看到RocketMQ分為四個部分:

其中,根據RocketMQ Broker的集群方式的不同,大概可以分為三種:

在配置文件所在目錄 conf 中,我們可以看到有三個文件夾:2m-noslave、2m-2s-async、2m-2s-sync。這三個目錄剛好對應上面提到的三種集群方式,裡麵包含了官方給的配置示例,我們待會會在這個基礎上修改。

下面我們將要部署雙master雙slave同步復制的RocketMQ集群,這里需要准備兩個虛擬機。

就這樣,rockermq就安裝好了,接下來我們要修改配置文件。

由於默認的數據和日誌存儲的位置是當前用戶的家目錄,我們還需要修改到 /data/rocketmq目錄下:

日誌目錄的配置文件在 conf的幾個xml文件裡面:

最後,我們進入到 2m-2s-sync 目錄下,修改裡面的broker配置文件:

先啟動兩台機器的Nameserver

然後分別啟動4個Broker進程:

就這樣,RocketMQ雙主雙從的集群就已經搭建好了,通過rocketmq-console的監控頁面,可以看到如下的集群情況:

這些配置參數,在Broker 啟動的時候生效,如果啟動後有更改,要重啟Broker 。現在使用雲服務或多網卡的機器比較普遍, Broker 自動探測獲得的ip地址可能不符合要求,通過brokerIP1 =47 .98.41.234 這樣的配置參數,可以設置Broker 機器對外暴露的ip 地址。

㈤ RocketMQ(三)——系統架構

RocketMQ架構上主要分為四部分構成:

消息生產者,負責生產消息。Procer通過MQ的負載均衡模塊選擇相應的Broker集群隊列進行消息投遞,投遞的過程支持快速失敗並且低延遲

RocketMQ中的消息生產者都是以生產者組(Procer Group)的形式出現的。生產者組是同一類生產者的集合,這類Procer發送相同Topic類型的消息。一個生產者組可以同時發送多個主題的消息。

消息消費者,負責消費消息。一個消息消費者會從Broker伺服器中獲取到消息,並對消息進行相關業務處理

RocketMQ中的消息消費者都是以消費者組(Consumer Group)的形式出現的。消費者組是統一類消費者的集合,這類Consumer消費的是同一個Topic類型的消息。消費者組使得在消息消費方法,實現負載均衡(講一個Topic中不同的Queue平均分配給同一個Consumer Group的不同Consumer,並不是負載均衡)和容錯(一個Consumer掛了,該Consumer Group中的其他Consumer可以接著消費元Consumer消費的Queue)的目標變得非常容易

消費者組中Consumer的數量應小於等於Topic的Queue數量。如果超出Queue數量,則多出的Consumer將不能消費消息。

不過一個Topic類型的消息可以被多個消費者組同時消費。

NameServer是一個Broker與Topic路由的注冊中心,支持Broker的動態注冊與發現。
RocketMQ的思想來自於Kafuka,而Kafka是以來了Zookeeper的。所以,在RocketMQ的早期版本也依賴Zookeeper。從3.0開始去掉了Zookeeper的依賴,使用了自己的NameServer。

NameServer通常也是以集群的方式部署,不過,NameServer是無狀態的,即NameServer集群中的各個節點之間是無差異的,各個節點相互不進行信息通訊。那各個節點中的數據是如何進行數據同步的呢?在Broker節點啟動時,輪詢NameServer列表,與每個NameServer節點建立長連接,發起注冊請求。在NameServer內部維護者一個Broker列表,用來動態存儲Broker信息

Broker節點為了證明自己是活著的,為了維護與NameServer間的長連接,會將最新的信息以心跳包的方式上報給NameServer,每30秒發送一次心跳。心跳包中包含BrokerId、Broker地址(IP+Port)、Broker名稱、Broker所屬集群名稱等等。NameServer在接收到心跳包後,會更新心跳時間戳,記錄這個Broker的最新存活時間。

由於Broker關機、宕機或網路抖動等原因,NameServer沒有收到Broker的心跳,NameServer可能會將其從Broker列表中剔除
NameServer中有一個定時任務,每隔10秒就會掃描一次Broker表,查看每一個Broker的最新心跳時間戳距離當前時間是否超過120秒,如果超過,則會判定Broekr失效,然後將其從Broker列表中剔除。

RocketMQ的路由發現採用的是Pull模型。當Topic路由信息出現變化時,NameServer不會主動推送給客戶端,而是客戶端定時拉取最新的路由。默認每30秒拉取一次最新的路由

客戶端再配置時必須要寫上NameServer集群的地址,那麼客戶端道理連接在哪個NameServer節點呢?客戶端首先會生產一個隨機數,然後再與NameServer節點數取模,此時得到的就是要連接的節點索引,然後就會進行連接。如果連接失敗,則會採用round-robin策略,逐個嘗試去連接其他節點。
首先採用的是 隨機策略 進行選擇,失敗後採用的是輪詢策略。

Broker充當著消息中轉角色,負責存儲消息、轉發消息。Broker在RocketMQ系統中負責接收並存儲從生產者發送來的消息,同時為消費者的拉取請求作準備。Broker同時也存儲著消息相關的元數據,包括消費者組、消費進度偏移offset、主題、隊列等

Remoting Mole :整個Broker的實體,負責處理來自clients端的請求。而這個Broker實體則由以下模塊構成。
Client Manager :客戶端管理器。負責接收、解析客戶端(Procer/Consumer)請求,管理客戶端。
Store Service :存儲服務。提供方便簡單的API介面,處理消息存儲到物理硬碟和消息查詢功能。
HA Service :高可用服務,提供Master Broker和Slave Broker之間的數據同步功能。
Index Service :索引服務。根據特定的Message Key,對投遞到Broker的消息進行索引服務,同時也提供根據Message Key對消息進行快速查詢的功能

為了增強Broker性能與吞吐量,Broker一般都是以集群形式出現的。各集群節點中可能存放著相同Topic的不同Queue。
如果某Broker節點宕機,如何保證數據不丟失呢?
其解決方案是,將每個Broekr集群節點進行橫向擴展,即將Broker節點再建為一個HA集群,解決單點問題。
Broker節點集群是一個主從集群,即有Master和Slave兩種角色。Master負責處理讀寫操作請求,Slave負責對Master中的數據進行備份。當Master掛掉了,Slaver會自動切換為Master去工作。所以這個Broker集群式主備集群。Master與Slave的對應關系是通過指定相同的BrokerName、不同的BrokerId來確定的。BrokerId為0表示Master,非0表示Slave。每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic信息到所有NameServer。

①啟動NameServer,NameServer啟動後開始監聽埠,等待Broker、Procer、Consumer連接
②啟動Broker時,Broker會與所有的NameServer保持長連接,每30秒向NameServer定時發送心跳包
③發送消息前,可以先創建Topic ,創建Topic時需要指定該Topic要存儲在哪些Broker上,當然,在創建Topic時也會將Topic與Broker的關系寫入到NameServer中。也可以在發送消息時自動創建Topic。
④Procer發送消息,啟動時先跟NameServer集群中的其中一台建立長連接,並從NameServer中獲取路由信息,即當前發送Topic的Queue與Broker地址的映射關系。然後根據演算法策略從隊選擇一個Queue,與隊列所在的Broker建立長連接從而向Broker發送消息。
⑤Consumer與Procer類似,跟其中一台NameServer建立長連接,獲取其所訂閱Topic的路由信息,然後根據演算法策略從路由信息中獲取到其要消費的Queue,然後與Broker建立長連接,消費其中的消息。Consumer會向Broker發送心跳,以確保Broker的存活狀態

手動創建Topic時,有兩種模式:

自動創建Topic時,默認採用的是Broker模式,會為每個Broker默認創建四個Queue

從物理上講,讀/寫隊列是同一個隊列。所以,不存在讀/寫隊列數據同步問題。讀/寫隊列是邏輯上進行區分的概念 。一般來說,讀/寫隊列數量是相同的。

讀/寫隊列數量不同是有問題的。
但這樣可以方便縮容
perm用於設置對當前創建Topic的操作許可權:2表示只寫,4表示只讀,6表示讀寫

㈥ rocketmq總結以及自動化部署策略

是一個隊列模型的消息中間件,具有高性能、高可靠、高實時、分布式特點。

實例消費這個 Topic 對應的所有隊列,如果做集群消費,則多個 Consumer 實例平均消費這個 topic 對應的隊列集合。

RocketMQ 網路部署特點

1)高並發讀寫服務

Broker的高並發讀寫主要是依靠以下兩點:

2) 負載均衡與動態伸縮

負載均衡 :Broker上存Topic信息,Topic由多個隊列組成,隊列會平均分散在多個Broker上,而Procer的發送機制保證消息盡量平均分布到所有隊列中,最終效果就是所有消息都平均落在每個Broker上。

動態伸縮能力(非順序消息) :Broker的伸縮性體現在兩個維度:Topic, Broker。

3) 高可用&高可靠

高可用:集群部署時一般都為主備,備機實時從主機同步消息,如果其中一個主機宕機,備機提供消費服務,但不提供寫服務。

高可靠:所有發往broker的消息,有同步刷盤和非同步刷盤機制;同步刷盤時,消息寫入物理文件才會返回成功,非同步刷盤時,只有機器宕機,才會產生消息丟失,broker掛掉可能會發生,但是機器宕機崩潰是很少發生的,除非突然斷電

4)Broker與Namesrv的心跳機制
單個Broker跟所有Namesrv保持心跳請求,心跳間隔為30秒,心跳請求中包括當前Broker所有的Topic信息。Namesrv會反查Broer的心跳信息,如果某個Broker在2分鍾之內都沒有心跳,則認為該Broker下線,調整Topic跟Broker的對應關系。但此時Namesrv不會主動通知Procer、Consumer有Broker宕機。

消費者啟動時需要指定Namesrv地址,與其中一個Namesrv建立長連接。消費者每隔30秒從nameserver獲取所有topic的最新隊列情況,這意味著某個broker如果宕機,客戶端最多要30秒才能感知。連接建立後,從namesrv中獲取當前消費Topic所涉及的Broker,直連Broker。

Consumer跟Broker是長連接,會每隔30秒發心跳信息到Broker。Broker端每10秒檢查一次當前存活的Consumer,若發現某個Consumer 2分鍾內沒有心跳,就斷開與該Consumer的連接,並且向該消費組的其他實例發送通知,觸發該消費者集群的負載均衡。

消費者端的負載均衡
先討論消費者的消費模式,消費者有兩種模式消費:集群消費,廣播消費。

消費者端的負載均衡,就是集群消費模式下,同一個ID的所有消費者實例平均消費該Topic的所有隊列。

Procer啟動時,也需要指定Namesrv的地址,從Namesrv集群中選一台建立長連接。如果該Namesrv宕機,會自動連其他Namesrv。直到有可用的Namesrv為止。

生產者每30秒從Namesrv獲取Topic跟Broker的映射關系,更新到本地內存中。再跟Topic涉及的所有Broker建立長連接,每隔30秒發一次心跳。在Broker端也會每10秒掃描一次當前注冊的Procer,如果發現某個Procer超過2分鍾都沒有發心跳,則斷開連接。

生產者端的負載均衡

生產者發送時,會自動輪詢當前所有可發送的broker,一條消息發送成功,下次換另外一個broker發送,以達到消息平均落到所有的broker上。

這里需要注意一點:假如某個Broker宕機,意味生產者最長需要30秒才能感知到。在這期間會向宕機的Broker發送消息。當一條消息發送到某個Broker失敗後,會往該broker自動再重發2次,假如還是發送失敗,則拋出發送失敗異常。業務捕獲異常,重新發送即可。客戶端里會自動輪詢另外一個Broker重新發送,這個對於用戶是透明的。

綁定hosts或dns:

主機命名說明:

在實際應用中都會涉及多環境的問題,比如有線下環境(dev)和生產環境(prod),不同環境的應用最好保持配置一致,減少各個每個環境的配置工作量。

Rocketmq各環境統一連接地址

NAMESRV_ADDR="nameserver1.rocketmq.test.com:9876;nameserver2.rocketmq.test.com:9876"

根據Rocketmq集群說明,其實最終只需暴露nameserver的地址給應用即可,因此,各個環境綁定各個環境對應的hosts/dns即可使用統一連接的地址。

rocketmq各個組件都支持橫向擴容:

通過web可以查看集群狀態,查看topic信息以及創建更改topic,管理procer和consumer等。

用戶手冊: https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md

㈦ Rocketmq的k8s配置(1nameservice + 1brocker)

RockerMQ在k8s的部署有兩種方式, 一種是使用operator 在k8s集群中部署,可參考 operation項目 ; 一種是編寫簡單的k8s配置文件,在rocketmq的docker項目中有提供模板。
這里我們希望使用單機版k8s部署一套低配置rockerMQ, 僅啟動一個nameservice和1個broker,我們將使用 rocketmq-docker項目 提供的模板來完成。

apiVersion: apps/v1
kind: Deployment
metadata:
name: rocketmq
spec:
replicas: 1
selector:
matchLabels:
app: rocketmq
template:
metadata:
labels:
app: rocketmq
spec:
containers:
- name: broker
image: apacherocketmq/rocketmq:4.6.0
command: ["sh","mqbroker", "-n","localhost:9876"]
imagePullPolicy: IfNotPresent
ports:
- containerPort: 10909
- containerPort: 10911
env:
- name: JAVA_OPT
value: -server -XX:ParallelGCThreads=1
volumeMounts:
- mountPath: /home/rocketmq/logs
name: brokeroptlogs
- mountPath: /home/rocketmq/store
name: brokeroptstore
- name: namesrv
image: apacherocketmq/rocketmq:4.6.0
command: ["sh","mqnamesrv"]
imagePullPolicy: IfNotPresent
ports:
- containerPort: 9876
volumeMounts:
- mountPath: /home/rocketmq/logs
name: namesrvoptlogs
volumes:
- name: brokeroptlogs
emptyDir: {}
- name: brokeroptstore
emptyDir: {}
- name: namesrvoptlogs
emptyDir: {}
- name: namesrvoptstore
emptyDir: {}

apiVersion: v1
kind: Service
metadata:
name: rocketmqservice
spec:
type: NodePort
ports:
- name: namesrv
port: 9876
targetPort: 9876
nodePort: 32000
selector:
app: rocketmq

notes: 簽名異常問題
Caused by: org.apache.rocketmq.acl.common.AclException: [10015:signature-failed] unable to calculate a request signature. error=Algorithm HmacSHA1 not available
手動方案I,在tool.sh 中${JAVA_HOME}/jre/lib/ext後加上ext文件夾的絕對路徑(jdk路徑)

最終方案: 手動的方式,很不方便,經過檢查,實際問題是由於路徑上的${JAVA_HOME}變數為空,導致無法找到etx路徑。所以,我們通過k8s的方式傳入JAVA_HOME環境便令就可以了。如下圖:

3.2 添加訂閱組
訂閱組 可以用來實現消費的loadbalance,同一訂閱組的消費者分享所有的讀隊列。
創建訂閱組使用updateSubGroup 命令,所需參數如下:

執行命令新建一個授權服務的消費組
./mqadmin updateSubGroup -b localhost:10911 -n localhost:9876 -g GID_authorize
執行結果:

㈧ RocketMQ系列:ACL機制

     ACL全稱access control list,俗稱訪問控制列表。主要包括如下角色

     首先Broker.conf文件配置 aclEnable=true ,然後需要將 plain_acl.yml 放在 ${ROCKETMQ_HOME}/conf目錄, plain_acl.yml

      服務端當配置好plain_acl.yml後並在 broker.conf中開啟 aclEnable=true ,服務端則會進行下面邏輯驗證

     在構造函數添加 RPCHook ,進行創建ACL對象實例。

     發送消息前置執行鉤子函數並驗證ACL許可權,若拋異常後則無法發送消息。

Broker端初始化ALC配置, 載入 AccessValidator配置
1. 核心是基於SPI機制,讀取META-INF/service/org.apache.rocketmq.acl.AccessValidator 訪問驗證器
2. 遍歷訪問驗證器,向Broker注冊鉤子函數。RPCHook在接受請求前進行處理請求
3. 調用AccessValidator#validate,驗證acl信息,如果擁有該執行許可權則通過,否則報AclException

AccessValidator 是訪問驗證器介面,PlainAccessValidator是該介面的具體實現。
AccessResource parse(RemotingCommand request, String remoteAddr);
從遠端請求中解析本次請求對應的訪問資源
void validate(AccessResource accessResource);
根據本次需要訪問的許可權,與請求用戶擁有的許可權進行對比驗證,判斷是否擁有,如果沒有則ACLException

當遠端請求過來後,觸發鉤子函數RPCHook,調用 PlainAccessValidator#parse ,並根據 client 端創建 PlainAccessResource實例對象

1、如果當前的請求命令屬於必須是Admin用戶才能訪問的許可權,並且當前用戶並不是管理員角色,則拋出異常
2、遍歷需要許可權與擁有的許可權進行對比,如果配置對應的許可權,則判斷是否匹配;如果未配置許可權,則判斷默認許可權時是否允許

㈨ MQ之RocketMQ常見錯誤

@ TOC

send heart beat to broker error {"fields": {"underlayError":{"Op":"dial","Net":"tcp","Source":null,"Addr":{"IP":"XXX","Port":10911,"Zone":""},"Err":{}}}}

埠號為10911或者9876,這兩個埠號都需要放開的,

所有的工具埠盡量重置公共埠號,避免網路頻繁攻擊

10911 是broker埠號

9876 是Name Server 注冊中心埠號

首先這種問題只有兩種問題, IP+埠問題

很多的博客並不清楚到底什麼原因,很多都是埠號的問題並不需要配置brokerIP1,雲服務內容IP是可以相互訪問的

IP問題: 本地和伺服器地址檢查排除

埠問題: 本地關閉防禦,雲伺服器設置安全組放開埠號,啟動順序要正確先重啟namesrv後重啟broker

修改完配置文件,啟動命令主動讀取配置文件的命令broker.conf,broker不會自動讀取更改過的配置文件

配置文件啟動命令

配置文件

[圖片上傳失敗...(image-fae76d-1639825328918)]

㈩ RocketMQ第五講

broker是RocketMQ的核心,核心工作就是接收生成這的消息,進行存儲。同時,收到消費者的請求後,從磁碟讀取內容,把結果返回給消費者。

消息主體以及元數據的存儲主體,存儲Procer端寫入的消息主體內容,消息內容不是定長的。單個文件大小默認1G ,文件名長度為20位,左邊補零,剩餘為起始偏移量,比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當第一個文件寫滿了,第二個文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序寫入日誌文件,當文件滿了,寫入下一個文件;

CommitLog文件中保存了消息的全量內容。不同的Topic的消息,在CommitLog都是順序存放的。就是來一個消息,不管Topic是什麼,直接追加的CommitLog中。

broker啟動了一個專門的線程來構建索引,把CommitLog中的消息,構建了兩種類型的索引。ConsumerQueue和Index。正常消費的時候,是根據Topic來消費,會用到ConsumerQueue索引。

也可根據返回的offsetMsgId,解析出ip,埠和CommitLog中的物理消息偏移量,直接去CommitLog中取數據。

引入的目的主要是提高消息消費的性能,由於RocketMQ是基於主題topic的訂閱模式,消息消費是針對主題進行的,如果要遍歷commitlog文件中根據topic檢索消息是非常低效的。Consumer即可根據ConsumeQueue來查找待消費的消息。

其中,ConsumeQueue(邏輯消費隊列)作為消費消息的索引,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基於topic的commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結構,具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue文件採取定長設計,每一個條目共20個位元組,分別為8位元組的commitlog物理偏移量、4位元組的消息長度、8位元組tag hashcode,單個文件由30W個條目組成,可以像數組一樣隨機訪問每一個條目,每個ConsumeQueue文件大小約5.72M。

IndexFile(索引文件)提供了一種可以通過key或時間區間來查詢消息的方法。Index文件的存儲位置是: {fileName},文件名fileName是以創建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設計為在文件系統中實現HashMap結構,故rocketmq的索引文件其底層實現為hash索引。

按照Message Key查詢消息的時候,會用到這個索引文件。

IndexFile索引文件為用戶提供通過「按照Message Key查詢消息」的消息索引查詢服務,IndexFile文件的存儲位置是: {fileName},文件名fileName是以創建時的時間戳命名的,文件大小是固定的,等於40+500W 4+2000W 20= 420000040個位元組大小。如果消息的properties中設置了UNIQ_KEY這個屬性,就用 topic + 「#」 + UNIQ_KEY的value作為 key 來做寫入操作。如果消息設置了KEYS屬性(多個KEY以空格分隔),也會用 topic + 「#」 + KEY 來做索引。

其中的索引數據包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 這四個欄位,一共20 Byte。NextIndex offset 即前面讀出來的 slotValue,如果有 hash沖突,就可以用這個欄位將所有沖突的索引用鏈表的方式串起來了。Timestamp記錄的是消息storeTimestamp之間的差,並不是一個絕對的時間。整個Index File的結構如圖,40 Byte 的Header用於保存一些總的統計信息,4 500W的 Slot Table並不保存真正的索引數據,而是保存每個槽位對應的單向鏈表的頭。20 2000W 是真正的索引數據,即一個 Index File 可以保存 2000W個索引。

「按照Message Key查詢消息」的方式,RocketMQ的具體做法是,主要通過Broker端的QueryMessageProcessor業務處理器來查詢,讀取消息的過程就是用topic和key找到IndexFile索引文件中的一條記錄,根據其中的commitLog offset從CommitLog文件中讀取消息的實體內容。

RocketMQ中有兩個核心模塊,remoting模塊和store模塊。remoting模塊在NameServer,Proce,Consumer和Broker都用到。store只在Broker中用到,包含了存儲文件操作的API,對消息實體的操作是通過DefaultMessageStore進行操作。

屬性和方法很多,就不往這里放了。

文件存儲實現類,包括多個內部類

· 對於文件夾下的一個文件

上面介紹了broker的核心業務流程和架構,關鍵介面和類,啟動流程。最後介紹一下broker的線程模型,只有知道了線程模型,才能大概知道前面介紹的那些事如何協同工作的,對broker才能有一個立體的認識。

RocketMQ的RPC通信採用Netty組件作為底層通信庫,同樣也遵循了Reactor多線程模型,同時又在這之上做了一些擴展和優化。關於Reactor線程模型,可以看看我之前寫的這篇文檔: Reactor線程模型

上面的框圖中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多線程模型。一個 Reactor 主線程(eventLoopGroupBoss,即為上面的1)負責監聽 TCP網路連接請求,建立好連接,創建SocketChannel,並注冊到selector上。RocketMQ的源碼中會自動根據OS的類型選擇NIO和Epoll,也可以通過參數配置),然後監聽真正的網路數據。拿到網路數據後,再丟給Worker線程池(eventLoopGroupSelector,即為上面的「N」,源碼中默認設置為3),在真正執行業務邏輯之前需要進行SSL驗證、編解碼、空閑檢查、網路連接管理,這些工作交給defaultEventExecutorGroup(即為上面的「M1」,源碼中默認設置為8)去做。而處理業務操作放在業務線程池中執行,根據 RomotingCommand 的業務請求碼code去processorTable這個本地緩存變數中找到對應的 processor,然後封裝成task任務後,提交給對應的業務processor處理線程池來執行(sendMessageExecutor,以發送消息為例,即為上面的 「M2」)。

上面的圖和這段畫是從官方文檔抄過來的,但是文字和圖對應的不是很好,畫的也不夠詳細,但是主要流程是這個樣子。以後有時間了,我重新安裝自己的理解,畫一張更詳細的圖。

AsyncAppender-Worker-Thread-0:非同步列印日誌,logback使用,應該是守護線程

FileWatchService:

NettyEventExecutor:

NettyNIOBoss_:一個

NettyServerNIOSelector_:默認為三個

NSScheledThread:定時任務線程

ServerHouseKeepingService:守護線程

ThreadDeathWatch-2-1:守護線程,Netty用,已經廢棄

RemotingExecutorThread(1-8):工作線程池,沒有共用NettyServerNIOSelector_,直接初始化8個線程

AsyncAppender-Worker-Thread-0:非同步列印日誌,logback使用,共九個:

RocketmqBrokerAppender_inner

RocketmqFilterAppender_inner

RocketmqProtectionAppender_inner

RocketmqRemotingAppender_inner

RocketmqRebalanceLockAppender_inner

RocketmqStoreAppender_inner

RocketmqStoreErrorAppender_inner

RocketmqWaterMarkAppender_inner

RocketmqTransactionAppender_inner

SendMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_MESSAGE

PullMessageThread_:remotingServer.registerProcessor(RequestCode.PULL_MESSAGE

ProcessReplyMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE

QueryMessageThread_:remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE

AdminBrokerThread_:remotingServer.registerDefaultProcessor

ClientManageThread_:remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT

HeartbeatThread_:remotingServer.registerProcessor(RequestCode.HEART_BEAT

EndTransactionThread_:remotingServer.registerProcessor(RequestCode.END_TRANSACTION

ConsumerManageThread_:remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,RequestCode.UPDATE_CONSUMER_OFFSET,RequestCode.QUERY_CONSUMER_OFFSET

brokerOutApi_thread_:BrokerController.registerBrokerAll(true, false, true);

==================================================================

BrokerControllerScheledThread:=>

BrokerController.this.getBrokerStats().record();

BrokerController.this.consumerOffsetManager.persist();

BrokerController.this.consumerFilterManager.persist();

BrokerController.this.protectBroker();

BrokerController.this.printWaterMark();

log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());

BrokerController.this.brokerOuterAPI.fetchNameServerAddr();

BrokerController.this.printMasterAndSlaveDiff();

BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());

:=>

:=>

FilterServerManager.this.createFilterServer();

:=>

ClientHousekeepingService.this.scanExceptionChannel();

PullRequestHoldService

FileWatchService

AllocateMappedFileService

AcceptSocketService

BrokerStatsThread1

熱點內容
6s和安卓8哪個值得入手 發布:2025-07-23 23:03:31 瀏覽:767
巧妙運演算法 發布:2025-07-23 23:02:02 瀏覽:141
sql解析json 發布:2025-07-23 22:48:16 瀏覽:906
戰神解壓密碼 發布:2025-07-23 22:29:07 瀏覽:225
如何刷機安卓系統手機 發布:2025-07-23 22:28:56 瀏覽:740
麥咭編程下載 發布:2025-07-23 22:20:04 瀏覽:37
javadraw 發布:2025-07-23 22:19:59 瀏覽:629
忘記密碼去哪裡找回 發布:2025-07-23 22:19:06 瀏覽:748
php培訓技術 發布:2025-07-23 22:18:21 瀏覽:608
兒童速演算法 發布:2025-07-23 22:09:37 瀏覽:637