当前位置:首页 » 存储配置 » rocketmq怎么配置端口

rocketmq怎么配置端口

发布时间: 2022-11-21 19:11:04

㈠ rocketmq 10911端口的ip怎么修改

在你启动的broker时,使用的配置文件中加listenPort=10911,将10911修改为你想要的端口号就好了。

㈡ rocketmq的9876端口可以改吗

㈢ RocketMQ ACL使用指南

ACL是access control list的简称,俗称访问控制列表。访问控制,基本上会涉及到用户、资源、权限、角色等概念,那在RocketMQ中上述会对应哪些对象呢?

另外,RocketMQ还支持按照客户端IP进行白名单设置。

在讲解如何使用ACL之前,我们先简单看一下RocketMQ ACL的请求流程:

对于上述具体的实现,将在后续文章中重点讲解,本文的目的只是希望给读者一个大概的了解。

acl默认的配置文件名:plain_acl.yml,需要放在${ROCKETMQ_HOME}/store/config目录下。下面对其配置项一一介绍。

全局白名单,其类型为数组,即支持多个配置。其支持的配置格式如下:

配置用户信息,该类型为数组类型。拥有accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms子元素。

登录用户名,长度必须大于6个字符。

登录密码。长度必须大于6个字符。

用户级别的IP地址白名单。其类型为一个字符串,其配置规则与globalWhiteRemoteAddresses,但只能配置一条规则。

boolean类型,设置是否是admin。如下权限只有admin=true时才有权限执行。

默认topic权限。该值默认为DENY(拒绝)。

默认消费组权限,该值默认为DENY(拒绝),建议值为SUB。

设置topic的权限。其类型为数组,其可选择值在下节介绍。

设置消费组的权限。其类型为数组,其可选择值在下节介绍。可以为每一消费组配置不一样的权限。

上面定义了全局白名单、用户级别的白名单,用户级别的权限,为了更好的配置ACL权限规则,下面给出权限匹配逻辑。

首先,需要在broker.conf文件中,增加参数aclEnable=true。并拷贝distribution/conf/plain_acl.yml文件到${ROCKETMQ_HOME}/conf目录。

broker.conf的配置文件如下:

plain_acl.yml文件内容如下:

从上面的配置可知,用户RocketMQ只能发送TopicTest的消息,其他topic无权限发送;拒绝oms_consumer_group消费组的消息消费,其他消费组默认可消费。

运行效果如图所示:

发现并不没有消费消息,符合预期。

关于RocketMQ ACL的使用就介绍到这里了,下一篇将介绍RocketMQ ACL实现原理。

推荐阅读:
1、 RocketMQ实战:生产环境中,autoCreateTopicEnable为什么不能设置为true

2、 RocketMQ 消息发送system busy、broker busy原因分析与解决方案

3、 RocketMQ HA机制(主从同步)

4、 RocketMQ事务消息实战

㈣ 安装部署RocketMQ集群(双主双从)

在 前面 ,我们介绍了如果快速安装单个RocketMQ。快速安装意味着这只是在测试环境下的小打小闹,我们在单机安装的基础上,尝试安装RocketMQ集群。本次安装为了方便,使用的是已经编译好的二进制包进行安装部署。

RocketMQ集群解决了单机版RocketMQ所存在的单点故障问题,并且还可以对RocketMQ性能进行横向的拓展。
下图是官网上的架构图,可以看到RocketMQ分为四个部分:

其中,根据RocketMQ Broker的集群方式的不同,大概可以分为三种:

在配置文件所在目录 conf 中,我们可以看到有三个文件夹:2m-noslave、2m-2s-async、2m-2s-sync。这三个目录刚好对应上面提到的三种集群方式,里面包含了官方给的配置示例,我们待会会在这个基础上修改。

下面我们将要部署双master双slave同步复制的RocketMQ集群,这里需要准备两个虚拟机。

就这样,rockermq就安装好了,接下来我们要修改配置文件。

由于默认的数据和日志存储的位置是当前用户的家目录,我们还需要修改到 /data/rocketmq目录下:

日志目录的配置文件在 conf的几个xml文件里面:

最后,我们进入到 2m-2s-sync 目录下,修改里面的broker配置文件:

先启动两台机器的Nameserver

然后分别启动4个Broker进程:

就这样,RocketMQ双主双从的集群就已经搭建好了,通过rocketmq-console的监控页面,可以看到如下的集群情况:

这些配置参数,在Broker 启动的时候生效,如果启动后有更改,要重启Broker 。现在使用云服务或多网卡的机器比较普遍, Broker 自动探测获得的ip地址可能不符合要求,通过brokerIP1 =47 .98.41.234 这样的配置参数,可以设置Broker 机器对外暴露的ip 地址。

㈤ RocketMQ(三)——系统架构

RocketMQ架构上主要分为四部分构成:

消息生产者,负责生产消息。Procer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟

RocketMQ中的消息生产者都是以生产者组(Procer Group)的形式出现的。生产者组是同一类生产者的集合,这类Procer发送相同Topic类型的消息。一个生产者组可以同时发送多个主题的消息。

消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务处理

RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的。消费者组是统一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息。消费者组使得在消息消费方法,实现负载均衡(讲一个Topic中不同的Queue平均分配给同一个Consumer Group的不同Consumer,并不是负载均衡)和容错(一个Consumer挂了,该Consumer Group中的其他Consumer可以接着消费元Consumer消费的Queue)的目标变得非常容易

消费者组中Consumer的数量应小于等于Topic的Queue数量。如果超出Queue数量,则多出的Consumer将不能消费消息。

不过一个Topic类型的消息可以被多个消费者组同时消费。

NameServer是一个Broker与Topic路由的注册中心,支持Broker的动态注册与发现。
RocketMQ的思想来自于Kafuka,而Kafka是以来了Zookeeper的。所以,在RocketMQ的早期版本也依赖Zookeeper。从3.0开始去掉了Zookeeper的依赖,使用了自己的NameServer。

NameServer通常也是以集群的方式部署,不过,NameServer是无状态的,即NameServer集群中的各个节点之间是无差异的,各个节点相互不进行信息通讯。那各个节点中的数据是如何进行数据同步的呢?在Broker节点启动时,轮询NameServer列表,与每个NameServer节点建立长连接,发起注册请求。在NameServer内部维护者一个Broker列表,用来动态存储Broker信息

Broker节点为了证明自己是活着的,为了维护与NameServer间的长连接,会将最新的信息以心跳包的方式上报给NameServer,每30秒发送一次心跳。心跳包中包含BrokerId、Broker地址(IP+Port)、Broker名称、Broker所属集群名称等等。NameServer在接收到心跳包后,会更新心跳时间戳,记录这个Broker的最新存活时间。

由于Broker关机、宕机或网络抖动等原因,NameServer没有收到Broker的心跳,NameServer可能会将其从Broker列表中剔除
NameServer中有一个定时任务,每隔10秒就会扫描一次Broker表,查看每一个Broker的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定Broekr失效,然后将其从Broker列表中剔除。

RocketMQ的路由发现采用的是Pull模型。当Topic路由信息出现变化时,NameServer不会主动推送给客户端,而是客户端定时拉取最新的路由。默认每30秒拉取一次最新的路由

客户端再配置时必须要写上NameServer集群的地址,那么客户端道理连接在哪个NameServer节点呢?客户端首先会生产一个随机数,然后再与NameServer节点数取模,此时得到的就是要连接的节点索引,然后就会进行连接。如果连接失败,则会采用round-robin策略,逐个尝试去连接其他节点。
首先采用的是 随机策略 进行选择,失败后采用的是轮询策略。

Broker充当着消息中转角色,负责存储消息、转发消息。Broker在RocketMQ系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。Broker同时也存储着消息相关的元数据,包括消费者组、消费进度偏移offset、主题、队列等

Remoting Mole :整个Broker的实体,负责处理来自clients端的请求。而这个Broker实体则由以下模块构成。
Client Manager :客户端管理器。负责接收、解析客户端(Procer/Consumer)请求,管理客户端。
Store Service :存储服务。提供方便简单的API接口,处理消息存储到物理硬盘和消息查询功能。
HA Service :高可用服务,提供Master Broker和Slave Broker之间的数据同步功能。
Index Service :索引服务。根据特定的Message Key,对投递到Broker的消息进行索引服务,同时也提供根据Message Key对消息进行快速查询的功能

为了增强Broker性能与吞吐量,Broker一般都是以集群形式出现的。各集群节点中可能存放着相同Topic的不同Queue。
如果某Broker节点宕机,如何保证数据不丢失呢?
其解决方案是,将每个Broekr集群节点进行横向扩展,即将Broker节点再建为一个HA集群,解决单点问题。
Broker节点集群是一个主从集群,即有Master和Slave两种角色。Master负责处理读写操作请求,Slave负责对Master中的数据进行备份。当Master挂掉了,Slaver会自动切换为Master去工作。所以这个Broker集群式主备集群。Master与Slave的对应关系是通过指定相同的BrokerName、不同的BrokerId来确定的。BrokerId为0表示Master,非0表示Slave。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。

①启动NameServer,NameServer启动后开始监听端口,等待Broker、Procer、Consumer连接
②启动Broker时,Broker会与所有的NameServer保持长连接,每30秒向NameServer定时发送心跳包
③发送消息前,可以先创建Topic ,创建Topic时需要指定该Topic要存储在哪些Broker上,当然,在创建Topic时也会将Topic与Broker的关系写入到NameServer中。也可以在发送消息时自动创建Topic。
④Procer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取路由信息,即当前发送Topic的Queue与Broker地址的映射关系。然后根据算法策略从队选择一个Queue,与队列所在的Broker建立长连接从而向Broker发送消息。
⑤Consumer与Procer类似,跟其中一台NameServer建立长连接,获取其所订阅Topic的路由信息,然后根据算法策略从路由信息中获取到其要消费的Queue,然后与Broker建立长连接,消费其中的消息。Consumer会向Broker发送心跳,以确保Broker的存活状态

手动创建Topic时,有两种模式:

自动创建Topic时,默认采用的是Broker模式,会为每个Broker默认创建四个Queue

从物理上讲,读/写队列是同一个队列。所以,不存在读/写队列数据同步问题。读/写队列是逻辑上进行区分的概念 。一般来说,读/写队列数量是相同的。

读/写队列数量不同是有问题的。
但这样可以方便缩容
perm用于设置对当前创建Topic的操作权限:2表示只写,4表示只读,6表示读写

㈥ rocketmq总结以及自动化部署策略

是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。

实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 topic 对应的队列集合。

RocketMQ 网络部署特点

1)高并发读写服务

Broker的高并发读写主要是依靠以下两点:

2) 负载均衡与动态伸缩

负载均衡 :Broker上存Topic信息,Topic由多个队列组成,队列会平均分散在多个Broker上,而Procer的发送机制保证消息尽量平均分布到所有队列中,最终效果就是所有消息都平均落在每个Broker上。

动态伸缩能力(非顺序消息) :Broker的伸缩性体现在两个维度:Topic, Broker。

3) 高可用&高可靠

高可用:集群部署时一般都为主备,备机实时从主机同步消息,如果其中一个主机宕机,备机提供消费服务,但不提供写服务。

高可靠:所有发往broker的消息,有同步刷盘和异步刷盘机制;同步刷盘时,消息写入物理文件才会返回成功,异步刷盘时,只有机器宕机,才会产生消息丢失,broker挂掉可能会发生,但是机器宕机崩溃是很少发生的,除非突然断电

4)Broker与Namesrv的心跳机制
单个Broker跟所有Namesrv保持心跳请求,心跳间隔为30秒,心跳请求中包括当前Broker所有的Topic信息。Namesrv会反查Broer的心跳信息,如果某个Broker在2分钟之内都没有心跳,则认为该Broker下线,调整Topic跟Broker的对应关系。但此时Namesrv不会主动通知Procer、Consumer有Broker宕机。

消费者启动时需要指定Namesrv地址,与其中一个Namesrv建立长连接。消费者每隔30秒从nameserver获取所有topic的最新队列情况,这意味着某个broker如果宕机,客户端最多要30秒才能感知。连接建立后,从namesrv中获取当前消费Topic所涉及的Broker,直连Broker。

Consumer跟Broker是长连接,会每隔30秒发心跳信息到Broker。Broker端每10秒检查一次当前存活的Consumer,若发现某个Consumer 2分钟内没有心跳,就断开与该Consumer的连接,并且向该消费组的其他实例发送通知,触发该消费者集群的负载均衡。

消费者端的负载均衡
先讨论消费者的消费模式,消费者有两种模式消费:集群消费,广播消费。

消费者端的负载均衡,就是集群消费模式下,同一个ID的所有消费者实例平均消费该Topic的所有队列。

Procer启动时,也需要指定Namesrv的地址,从Namesrv集群中选一台建立长连接。如果该Namesrv宕机,会自动连其他Namesrv。直到有可用的Namesrv为止。

生产者每30秒从Namesrv获取Topic跟Broker的映射关系,更新到本地内存中。再跟Topic涉及的所有Broker建立长连接,每隔30秒发一次心跳。在Broker端也会每10秒扫描一次当前注册的Procer,如果发现某个Procer超过2分钟都没有发心跳,则断开连接。

生产者端的负载均衡

生产者发送时,会自动轮询当前所有可发送的broker,一条消息发送成功,下次换另外一个broker发送,以达到消息平均落到所有的broker上。

这里需要注意一点:假如某个Broker宕机,意味生产者最长需要30秒才能感知到。在这期间会向宕机的Broker发送消息。当一条消息发送到某个Broker失败后,会往该broker自动再重发2次,假如还是发送失败,则抛出发送失败异常。业务捕获异常,重新发送即可。客户端里会自动轮询另外一个Broker重新发送,这个对于用户是透明的。

绑定hosts或dns:

主机命名说明:

在实际应用中都会涉及多环境的问题,比如有线下环境(dev)和生产环境(prod),不同环境的应用最好保持配置一致,减少各个每个环境的配置工作量。

Rocketmq各环境统一连接地址

NAMESRV_ADDR="nameserver1.rocketmq.test.com:9876;nameserver2.rocketmq.test.com:9876"

根据Rocketmq集群说明,其实最终只需暴露nameserver的地址给应用即可,因此,各个环境绑定各个环境对应的hosts/dns即可使用统一连接的地址。

rocketmq各个组件都支持横向扩容:

通过web可以查看集群状态,查看topic信息以及创建更改topic,管理procer和consumer等。

用户手册: https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md

㈦ Rocketmq的k8s配置(1nameservice + 1brocker)

RockerMQ在k8s的部署有两种方式, 一种是使用operator 在k8s集群中部署,可参考 operation项目 ; 一种是编写简单的k8s配置文件,在rocketmq的docker项目中有提供模板。
这里我们希望使用单机版k8s部署一套低配置rockerMQ, 仅启动一个nameservice和1个broker,我们将使用 rocketmq-docker项目 提供的模板来完成。

apiVersion: apps/v1
kind: Deployment
metadata:
name: rocketmq
spec:
replicas: 1
selector:
matchLabels:
app: rocketmq
template:
metadata:
labels:
app: rocketmq
spec:
containers:
- name: broker
image: apacherocketmq/rocketmq:4.6.0
command: ["sh","mqbroker", "-n","localhost:9876"]
imagePullPolicy: IfNotPresent
ports:
- containerPort: 10909
- containerPort: 10911
env:
- name: JAVA_OPT
value: -server -XX:ParallelGCThreads=1
volumeMounts:
- mountPath: /home/rocketmq/logs
name: brokeroptlogs
- mountPath: /home/rocketmq/store
name: brokeroptstore
- name: namesrv
image: apacherocketmq/rocketmq:4.6.0
command: ["sh","mqnamesrv"]
imagePullPolicy: IfNotPresent
ports:
- containerPort: 9876
volumeMounts:
- mountPath: /home/rocketmq/logs
name: namesrvoptlogs
volumes:
- name: brokeroptlogs
emptyDir: {}
- name: brokeroptstore
emptyDir: {}
- name: namesrvoptlogs
emptyDir: {}
- name: namesrvoptstore
emptyDir: {}

apiVersion: v1
kind: Service
metadata:
name: rocketmqservice
spec:
type: NodePort
ports:
- name: namesrv
port: 9876
targetPort: 9876
nodePort: 32000
selector:
app: rocketmq

notes: 签名异常问题
Caused by: org.apache.rocketmq.acl.common.AclException: [10015:signature-failed] unable to calculate a request signature. error=Algorithm HmacSHA1 not available
手动方案I,在tool.sh 中${JAVA_HOME}/jre/lib/ext后加上ext文件夹的绝对路径(jdk路径)

最终方案: 手动的方式,很不方便,经过检查,实际问题是由于路径上的${JAVA_HOME}变量为空,导致无法找到etx路径。所以,我们通过k8s的方式传入JAVA_HOME环境便令就可以了。如下图:

3.2 添加订阅组
订阅组 可以用来实现消费的loadbalance,同一订阅组的消费者分享所有的读队列。
创建订阅组使用updateSubGroup 命令,所需参数如下:

执行命令新建一个授权服务的消费组
./mqadmin updateSubGroup -b localhost:10911 -n localhost:9876 -g GID_authorize
执行结果:

㈧ RocketMQ系列:ACL机制

     ACL全称access control list,俗称访问控制列表。主要包括如下角色

     首先Broker.conf文件配置 aclEnable=true ,然后需要将 plain_acl.yml 放在 ${ROCKETMQ_HOME}/conf目录, plain_acl.yml

      服务端当配置好plain_acl.yml后并在 broker.conf中开启 aclEnable=true ,服务端则会进行下面逻辑验证

     在构造函数添加 RPCHook ,进行创建ACL对象实例。

     发送消息前置执行钩子函数并验证ACL权限,若抛异常后则无法发送消息。

Broker端初始化ALC配置, 加载 AccessValidator配置
1. 核心是基于SPI机制,读取META-INF/service/org.apache.rocketmq.acl.AccessValidator 访问验证器
2. 遍历访问验证器,向Broker注册钩子函数。RPCHook在接受请求前进行处理请求
3. 调用AccessValidator#validate,验证acl信息,如果拥有该执行权限则通过,否则报AclException

AccessValidator 是访问验证器接口,PlainAccessValidator是该接口的具体实现。
AccessResource parse(RemotingCommand request, String remoteAddr);
从远端请求中解析本次请求对应的访问资源
void validate(AccessResource accessResource);
根据本次需要访问的权限,与请求用户拥有的权限进行对比验证,判断是否拥有,如果没有则ACLException

当远端请求过来后,触发钩子函数RPCHook,调用 PlainAccessValidator#parse ,并根据 client 端创建 PlainAccessResource实例对象

1、如果当前的请求命令属于必须是Admin用户才能访问的权限,并且当前用户并不是管理员角色,则抛出异常
2、遍历需要权限与拥有的权限进行对比,如果配置对应的权限,则判断是否匹配;如果未配置权限,则判断默认权限时是否允许

㈨ MQ之RocketMQ常见错误

@ TOC

send heart beat to broker error {"fields": {"underlayError":{"Op":"dial","Net":"tcp","Source":null,"Addr":{"IP":"XXX","Port":10911,"Zone":""},"Err":{}}}}

端口号为10911或者9876,这两个端口号都需要放开的,

所有的工具端口尽量重置公共端口号,避免网络频繁攻击

10911 是broker端口号

9876 是Name Server 注册中心端口号

首先这种问题只有两种问题, IP+端口问题

很多的博客并不清楚到底什么原因,很多都是端口号的问题并不需要配置brokerIP1,云服务内容IP是可以相互访问的

IP问题: 本地和服务器地址检查排除

端口问题: 本地关闭防御,云服务器设置安全组放开端口号,启动顺序要正确先重启namesrv后重启broker

修改完配置文件,启动命令主动读取配置文件的命令broker.conf,broker不会自动读取更改过的配置文件

配置文件启动命令

配置文件

[图片上传失败...(image-fae76d-1639825328918)]

㈩ RocketMQ第五讲

broker是RocketMQ的核心,核心工作就是接收生成这的消息,进行存储。同时,收到消费者的请求后,从磁盘读取内容,把结果返回给消费者。

消息主体以及元数据的存储主体,存储Procer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

CommitLog文件中保存了消息的全量内容。不同的Topic的消息,在CommitLog都是顺序存放的。就是来一个消息,不管Topic是什么,直接追加的CommitLog中。

broker启动了一个专门的线程来构建索引,把CommitLog中的消息,构建了两种类型的索引。ConsumerQueue和Index。正常消费的时候,是根据Topic来消费,会用到ConsumerQueue索引。

也可根据返回的offsetMsgId,解析出ip,端口和CommitLog中的物理消息偏移量,直接去CommitLog中取数据。

引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。

其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。

IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是: {fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

按照Message Key查询消息的时候,会用到这个索引文件。

IndexFile索引文件为用户提供通过“按照Message Key查询消息”的消息索引查询服务,IndexFile文件的存储位置是: {fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于40+500W 4+2000W 20= 420000040个字节大小。如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。

其中的索引数据包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共20 Byte。NextIndex offset 即前面读出来的 slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个Index File的结构如图,40 Byte 的Header用于保存一些总的统计信息,4 500W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。20 2000W 是真正的索引数据,即一个 Index File 可以保存 2000W个索引。

“按照Message Key查询消息”的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。

RocketMQ中有两个核心模块,remoting模块和store模块。remoting模块在NameServer,Proce,Consumer和Broker都用到。store只在Broker中用到,包含了存储文件操作的API,对消息实体的操作是通过DefaultMessageStore进行操作。

属性和方法很多,就不往这里放了。

文件存储实现类,包括多个内部类

· 对于文件夹下的一个文件

上面介绍了broker的核心业务流程和架构,关键接口和类,启动流程。最后介绍一下broker的线程模型,只有知道了线程模型,才能大概知道前面介绍的那些事如何协同工作的,对broker才能有一个立体的认识。

RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。关于Reactor线程模型,可以看看我之前写的这篇文档: Reactor线程模型

上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码code去processorTable这个本地缓存变量中找到对应的 processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。

上面的图和这段画是从官方文档抄过来的,但是文字和图对应的不是很好,画的也不够详细,但是主要流程是这个样子。以后有时间了,我重新安装自己的理解,画一张更详细的图。

AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,应该是守护线程

FileWatchService:

NettyEventExecutor:

NettyNIOBoss_:一个

NettyServerNIOSelector_:默认为三个

NSScheledThread:定时任务线程

ServerHouseKeepingService:守护线程

ThreadDeathWatch-2-1:守护线程,Netty用,已经废弃

RemotingExecutorThread(1-8):工作线程池,没有共用NettyServerNIOSelector_,直接初始化8个线程

AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,共九个:

RocketmqBrokerAppender_inner

RocketmqFilterAppender_inner

RocketmqProtectionAppender_inner

RocketmqRemotingAppender_inner

RocketmqRebalanceLockAppender_inner

RocketmqStoreAppender_inner

RocketmqStoreErrorAppender_inner

RocketmqWaterMarkAppender_inner

RocketmqTransactionAppender_inner

SendMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_MESSAGE

PullMessageThread_:remotingServer.registerProcessor(RequestCode.PULL_MESSAGE

ProcessReplyMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE

QueryMessageThread_:remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE

AdminBrokerThread_:remotingServer.registerDefaultProcessor

ClientManageThread_:remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT

HeartbeatThread_:remotingServer.registerProcessor(RequestCode.HEART_BEAT

EndTransactionThread_:remotingServer.registerProcessor(RequestCode.END_TRANSACTION

ConsumerManageThread_:remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,RequestCode.UPDATE_CONSUMER_OFFSET,RequestCode.QUERY_CONSUMER_OFFSET

brokerOutApi_thread_:BrokerController.registerBrokerAll(true, false, true);

==================================================================

BrokerControllerScheledThread:=>

BrokerController.this.getBrokerStats().record();

BrokerController.this.consumerOffsetManager.persist();

BrokerController.this.consumerFilterManager.persist();

BrokerController.this.protectBroker();

BrokerController.this.printWaterMark();

log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());

BrokerController.this.brokerOuterAPI.fetchNameServerAddr();

BrokerController.this.printMasterAndSlaveDiff();

BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());

:=>

:=>

FilterServerManager.this.createFilterServer();

:=>

ClientHousekeepingService.this.scanExceptionChannel();

PullRequestHoldService

FileWatchService

AllocateMappedFileService

AcceptSocketService

BrokerStatsThread1

热点内容
安溪哪里有卖礼金密码箱 发布:2025-07-23 17:32:36 浏览:528
同等配置蓝鸟同轩逸哪个好 发布:2025-07-23 17:31:27 浏览:545
云服务器图片加载速度慢 发布:2025-07-23 17:08:16 浏览:171
网址导航源码带后台 发布:2025-07-23 17:01:40 浏览:599
石粉过磅算法 发布:2025-07-23 16:53:05 浏览:78
e盘访问被拒绝 发布:2025-07-23 16:51:49 浏览:349
c语言用什么编译器 发布:2025-07-23 16:49:26 浏览:571
浏览器androidflash 发布:2025-07-23 16:36:38 浏览:911
为什么战网不用输入密码 发布:2025-07-23 16:35:47 浏览:938
存储过程步骤 发布:2025-07-23 16:33:26 浏览:359