当前位置:首页 > 公众号精选 > 架构师社区
[导读]作者:vivo互联网服务器团队-YeWenhao一、RocketMQ架构简介1.1逻辑部署图(图片来自网络)1.2核心组件说明通过上图可以看到,RocketMQ的核心组件主要包括4个,分别是NameServer、Broker、Producer和Consumer,下面我们先依次简单...

ckquote class="js_blockquote_wrap" data-type="2" data-url="" data-author-name="" data-content-utf8-length="25" data-source-title="">作者:vivo互联网服务器团队-Ye Wenhao

一、RocketMQ架构简介


1.1 逻辑部署图


深入剖析RocketMQ源码-NameServer

图片来自网络


1.2 核心组件说明


通过上图可以看到,RocketMQ的核心组件主要包括4个,分别是NameServer、Broker、Producer和Consumer,下面我们先依次简单说明下这四个核心组件:


NameServerNameServer充当路由信息的提供者。生产者或消费者能够通过NameServer查找各Topic相应的Broker IP列表。多个Namesrver实例组成集群,但相互独立,没有信息交换。


Broker:消息中转角色,负责存储消息、转发消息。Broker服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。Broker服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。


Producer:负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到Broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。


Consumer:负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。


除了上面说的三个核心组件外,还有Topic这个概念下面也会多次提到:


Topic:表示一类消息的集合,每个Topic包含若干条消息,每条消息只能属于一个Topic,是RocketMQ进行消息订阅的基本单位。一个Topic可以分片在多个Broker集群上,每一个Topic分片包含多个queue,具体结构可以参考下图:


深入剖析RocketMQ源码-NameServer


1.3 设计理念


RocketMQ是基于主题的发布与订阅模式,核心功能包括消息发送、消息存储、消息消费,整体设计追求简单与性能第一,归纳来说主要是下面三种:


  • NameServer取代ZK充当注册中心,NameServer集群间互不通信,容忍路由信息在集群内分钟级不一致,更加轻量级;

  • 使用内存映射机制实现高效的IO存储,达到高吞吐量;

  • 容忍设计缺陷,通过ACK确保消息至少消费一次,但是如果ACK丢失,可能消息重复消费,这种情况设计上允许,交给使用者自己保证。


本文重点介绍的就是NameServer,我们下面一起来看下NameServer是如何启动以及如何进行路由管理的。


二、NameServer架构设计


在第一章已经简单介绍了NameServer取代zk作为一种更轻量级的注册中心充当路由信息的提供者。那么具体是如何来实现路由信息管理的呢?我们先看下图:


深入剖析RocketMQ源码-NameServer


上面的图描述了NameServer进行路由注册、路由剔除和路由发现的核心原理。


路由注册:Broker服务器在启动的时候会想NameServer集群中所有的NameServer发送心跳信号进行注册,并会每隔30秒向nameserver发送心跳,告诉NameServer自己活着。NameServer接收到Broker发送的心跳包之后,会记录该broker信息,并保存最近一次收到心跳包的时间。


路由剔除NameServer和每个Broker保持长连接,每隔30秒接收Broker发送的心跳包,同时自身每个10秒扫描BrokerLiveTable,比较上次收到心跳时间和当前时间比较是否大于120秒,如果超过,那么认为Broker不可用,剔除路由表中该Broker相关信息。


路由发现:路由发现不是实时的,路由变化后,NameServer不主动推给客户端,等待producer定期拉取最新路由信息。这样的设计方式降低了NameServer实现的复杂性,当路由发生变化时通过在消息发送端的容错机制来保证消息发送的高可用(这块内容会在后续介绍producer消息发送时介绍,本文不展开讲解)。


高可用NameServer通过部署多台NameServer服务器来保证自身的高可用,同时多个NameServer服务器之间不进行通信,这样路由信息发生变化时,各个NameServer服务器之间数据可能不是完全相同的,但是通过发送端的容错机制保证消息发送的高可用。这个也正是NameServer追求简单高效的目的所在。


三、 启动流程


在整理了解了NameServer的架构设计之后,我们先来看下NameServer到底是如何启动的呢?


既然是源码解读,那么我们先来看下代码入口:org.apache.rocketmq.namesrv.NamesrvStartup#main(String[] args),实际调用的是main0()方法,代码如下:

public static NamesrvController main0(String[] args) {
try { //创建namesrvController NamesrvController controller = createNamesrvController(args); //初始化并启动NamesrvController start(controller); String tip = "The Name Server boot success. serializeType=" RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); }
return null;}

通过main方法启动NameServer,主要分为两大步,先创建NamesrvController,然后再初始化并启动NamesrvController。我们分别展开来分析。


3.1 时序图


具体展开阅读代码之前,我们先通过一个序列图对整体流程有个了解,如下图:


深入剖析RocketMQ源码-NameServer


3.2 创建NamesrvController


先来看核心代码,如下:

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { // 设置版本号为当前版本号 System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); //PackageConflictDetect.detectFastjson(); //构造org.apache.commons.cli.Options,并添加-h -n参数,-h参数是打印帮助信息,-n参数是指定namesrvAddr Options options = ServerUtil.buildCommandlineOptions(new Options()); //初始化commandLine,并在options中添加-c -p参数,-c指定nameserver的配置文件路径,-p标识打印配置信息 commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; } //nameserver配置类,业务参数 final NamesrvConfig namesrvConfig = new NamesrvConfig(); //netty服务器配置类,网络参数 final NettyServerConfig nettyServerConfig = new NettyServerConfig(); //设置nameserver的端口号 nettyServerConfig.setListenPort(9876); //命令带有-c参数,说明指定配置文件,需要根据配置文件路径读取配置文件内容,并将文件中配置信息赋值给NamesrvConfig和NettyServerConfig if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); //反射的方式 MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); //设置配置文件路径 namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file); in.close(); } } //命令行带有-p,说明是打印参数的命令,那么就打印出NamesrvConfig和NettyServerConfig的属性。在启动NameServer时可以先使用./mqnameserver -c configFile -p打印当前加载的配置属性 if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); MixAll.printObjectProperties(console, namesrvConfig); MixAll.printObjectProperties(console, nettyServerConfig); //打印参数命令不需要启动nameserver服务,只需要打印参数即可 System.exit(0); } //解析命令行参数,并加载到namesrvConfig中 MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); //检查ROCKETMQ_HOME,不能为空 if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } //初始化logback日志工厂,rocketmq默认使用logback作为日志输出 LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); //创建NamesrvController final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
//将全局Properties的内容复制到NamesrvController.Configuration.allConfigs中 // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties);
return controller;}

通过上面对每一行代码的注释,可以看出来,创建NamesrvController的过程主要分为两步:

Step1:通过命令行中获取配置。赋值给NamesrvConfig和NettyServerConfig类。

Step2:根据配置类NamesrvConfig和NettyServerConfig构造一个NamesrvController实例。


可见NamesrvConfig和NettyServerConfig是想当重要的,这两个类分别是NameServer的业务参数和网络参数,我们分别看下这两个类里面有哪些属性:


NamesrvConfig

深入剖析RocketMQ源码-NameServer


NettyServerConfig

深入剖析RocketMQ源码-NameServer

注:Apache Commons CLI是开源的命令行解析工具,它可以帮助开发者快速构建启动命令,并且帮助你组织命令的参数、以及输出列表等。

3.3 初始化并启动


创建了NamesrvController实例之后,开始初始化并启动NameServer。


首先进行初始化,代码入口是NamesrvController#initialize。

public boolean initialize() { //加载kvConfigPath下kvConfig.json配置文件里的KV配置,然后将这些配置放到KVConfigManager#configTable属性中 this.kvConfigManager.load(); //根据nettyServerConfig初始化一个netty服务器。 //brokerHousekeepingService是在NamesrvController实例化时构造函数里实例化的,该类负责Broker连接事件的处理,实现了ChannelEventListener,主要用来管理RouteInfoManager的brokerLiveTable this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); //初始化负责处理Netty网络交互数据的线程池,默认线程数是8个 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); //注册Netty服务端业务处理逻辑,如果开启了clusterTest,那么注册的请求处理类是ClusterTestRequestProcessor,否则请求处理类是DefaultRequestProcessor this.registerProcessor(); //注册心跳机制线程池,延迟5秒启动,每隔10秒遍历RouteInfoManager#brokerLiveTable这个属性,用来扫描不存活的broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //注册打印KV配置线程池,延迟1分钟启动、每10分钟打印出kvConfig配置 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); //rocketmq可以通过开启TLS来提高数据传输的安全性,如果开启了,那么需要注册一个监听器来重新加载SslContext if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // Register a listener to reload SslContext try { fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false; @Override public void onChanged(String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (certChanged
本站声明: 本文章由作者或相关机构授权发布,目的在于传递更多信息,并不代表本站赞同其观点,本站亦不保证或承诺内容真实性等。需要转载请联系该专栏作者,如若文章内容侵犯您的权益,请及时联系本站删除。
换一批
延伸阅读

9月2日消息,不造车的华为或将催生出更大的独角兽公司,随着阿维塔和赛力斯的入局,华为引望愈发显得引人瞩目。

关键字: 阿维塔 塞力斯 华为

加利福尼亚州圣克拉拉县2024年8月30日 /美通社/ -- 数字化转型技术解决方案公司Trianz今天宣布,该公司与Amazon Web Services (AWS)签订了...

关键字: AWS AN BSP 数字化

伦敦2024年8月29日 /美通社/ -- 英国汽车技术公司SODA.Auto推出其旗舰产品SODA V,这是全球首款涵盖汽车工程师从创意到认证的所有需求的工具,可用于创建软件定义汽车。 SODA V工具的开发耗时1.5...

关键字: 汽车 人工智能 智能驱动 BSP

北京2024年8月28日 /美通社/ -- 越来越多用户希望企业业务能7×24不间断运行,同时企业却面临越来越多业务中断的风险,如企业系统复杂性的增加,频繁的功能更新和发布等。如何确保业务连续性,提升韧性,成...

关键字: 亚马逊 解密 控制平面 BSP

8月30日消息,据媒体报道,腾讯和网易近期正在缩减他们对日本游戏市场的投资。

关键字: 腾讯 编码器 CPU

8月28日消息,今天上午,2024中国国际大数据产业博览会开幕式在贵阳举行,华为董事、质量流程IT总裁陶景文发表了演讲。

关键字: 华为 12nm EDA 半导体

8月28日消息,在2024中国国际大数据产业博览会上,华为常务董事、华为云CEO张平安发表演讲称,数字世界的话语权最终是由生态的繁荣决定的。

关键字: 华为 12nm 手机 卫星通信

要点: 有效应对环境变化,经营业绩稳中有升 落实提质增效举措,毛利润率延续升势 战略布局成效显著,战新业务引领增长 以科技创新为引领,提升企业核心竞争力 坚持高质量发展策略,塑强核心竞争优势...

关键字: 通信 BSP 电信运营商 数字经济

北京2024年8月27日 /美通社/ -- 8月21日,由中央广播电视总台与中国电影电视技术学会联合牵头组建的NVI技术创新联盟在BIRTV2024超高清全产业链发展研讨会上宣布正式成立。 活动现场 NVI技术创新联...

关键字: VI 传输协议 音频 BSP

北京2024年8月27日 /美通社/ -- 在8月23日举办的2024年长三角生态绿色一体化发展示范区联合招商会上,软通动力信息技术(集团)股份有限公司(以下简称"软通动力")与长三角投资(上海)有限...

关键字: BSP 信息技术
关闭
关闭