深入剖析RocketMQ源码-NameServer
扫描二维码
随时随地手机看文章
一、RocketMQ架构简介
1.1 逻辑部署图
(图片来自网络)
1.2 核心组件说明
通过上图可以看到,RocketMQ的核心组件主要包括4个,分别是NameServer、Broker、Producer和Consumer,下面我们先依次简单说明下这四个核心组件:
NameServer:NameServer充当路由信息的提供者。生产者或消费者能够通过NameServer查找各Topic相应的Broker IP列表。多个Namesrver实例组成集群,但相互独立,没有信息交换。
Broker:消息中转角色,负责存储消息、转发消息。Broker服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。Broker服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
Producer:负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到Broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
Consumer:负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
除了上面说的三个核心组件外,还有Topic这个概念下面也会多次提到:
Topic:表示一类消息的集合,每个Topic包含若干条消息,每条消息只能属于一个Topic,是RocketMQ进行消息订阅的基本单位。一个Topic可以分片在多个Broker集群上,每一个Topic分片包含多个queue,具体结构可以参考下图:
1.3 设计理念
RocketMQ是基于主题的发布与订阅模式,核心功能包括消息发送、消息存储、消息消费,整体设计追求简单与性能第一,归纳来说主要是下面三种:
- NameServer取代ZK充当注册中心,NameServer集群间互不通信,容忍路由信息在集群内分钟级不一致,更加轻量级;
- 使用内存映射机制实现高效的IO存储,达到高吞吐量;
- 容忍设计缺陷,通过ACK确保消息至少消费一次,但是如果ACK丢失,可能消息重复消费,这种情况设计上允许,交给使用者自己保证。
本文重点介绍的就是NameServer,我们下面一起来看下NameServer是如何启动以及如何进行路由管理的。
二、NameServer架构设计
在第一章已经简单介绍了NameServer取代zk作为一种更轻量级的注册中心充当路由信息的提供者。那么具体是如何来实现路由信息管理的呢?我们先看下图:
上面的图描述了NameServer进行路由注册、路由剔除和路由发现的核心原理。
路由注册:Broker服务器在启动的时候会想NameServer集群中所有的NameServer发送心跳信号进行注册,并会每隔30秒向nameserver发送心跳,告诉NameServer自己活着。NameServer接收到Broker发送的心跳包之后,会记录该broker信息,并保存最近一次收到心跳包的时间。
路由剔除:NameServer和每个Broker保持长连接,每隔30秒接收Broker发送的心跳包,同时自身每个10秒扫描BrokerLiveTable,比较上次收到心跳时间和当前时间比较是否大于120秒,如果超过,那么认为Broker不可用,剔除路由表中该Broker相关信息。
路由发现:路由发现不是实时的,路由变化后,NameServer不主动推给客户端,等待producer定期拉取最新路由信息。这样的设计方式降低了NameServer实现的复杂性,当路由发生变化时通过在消息发送端的容错机制来保证消息发送的高可用(这块内容会在后续介绍producer消息发送时介绍,本文不展开讲解)。
高可用:NameServer通过部署多台NameServer服务器来保证自身的高可用,同时多个NameServer服务器之间不进行通信,这样路由信息发生变化时,各个NameServer服务器之间数据可能不是完全相同的,但是通过发送端的容错机制保证消息发送的高可用。这个也正是NameServer追求简单高效的目的所在。
三、 启动流程
在整理了解了NameServer的架构设计之后,我们先来看下NameServer到底是如何启动的呢?
既然是源码解读,那么我们先来看下代码入口:org.apache.rocketmq.namesrv.NamesrvStartup#main(String[] args),实际调用的是main0()方法,代码如下:
public static NamesrvController main0(String[] args) {
try {
//创建namesrvController
NamesrvController controller = createNamesrvController(args);
//初始化并启动NamesrvController
start(controller);
String tip = "The Name Server boot success. serializeType=" RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
通过main方法启动NameServer,主要分为两大步,先创建NamesrvController,然后再初始化并启动NamesrvController。我们分别展开来分析。
3.1 时序图
具体展开阅读代码之前,我们先通过一个序列图对整体流程有个了解,如下图:
3.2 创建NamesrvController
先来看核心代码,如下:
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
// 设置版本号为当前版本号
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
//构造org.apache.commons.cli.Options,并添加-h -n参数,-h参数是打印帮助信息,-n参数是指定namesrvAddr
Options options = ServerUtil.buildCommandlineOptions(new Options());
//初始化commandLine,并在options中添加-c -p参数,-c指定nameserver的配置文件路径,-p标识打印配置信息
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
//nameserver配置类,业务参数
final NamesrvConfig namesrvConfig = new NamesrvConfig();
//netty服务器配置类,网络参数
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
//设置nameserver的端口号
nettyServerConfig.setListenPort(9876);
//命令带有-c参数,说明指定配置文件,需要根据配置文件路径读取配置文件内容,并将文件中配置信息赋值给NamesrvConfig和NettyServerConfig
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
//反射的方式
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
//设置配置文件路径
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
//命令行带有-p,说明是打印参数的命令,那么就打印出NamesrvConfig和NettyServerConfig的属性。在启动NameServer时可以先使用./mqnameserver -c configFile -p打印当前加载的配置属性
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
//打印参数命令不需要启动nameserver服务,只需要打印参数即可
System.exit(0);
}
//解析命令行参数,并加载到namesrvConfig中
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
//检查ROCKETMQ_HOME,不能为空
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
//初始化logback日志工厂,rocketmq默认使用logback作为日志输出
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
//创建NamesrvController
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
//将全局Properties的内容复制到NamesrvController.Configuration.allConfigs中
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
return controller;
}
通过上面对每一行代码的注释,可以看出来,创建NamesrvController的过程主要分为两步:
Step1:通过命令行中获取配置。赋值给NamesrvConfig和NettyServerConfig类。Step2:根据配置类NamesrvConfig和NettyServerConfig构造一个NamesrvController实例。
可见NamesrvConfig和NettyServerConfig是想当重要的,这两个类分别是NameServer的业务参数和网络参数,我们分别看下这两个类里面有哪些属性:
NamesrvConfig
NettyServerConfig
注:Apache Commons CLI是开源的命令行解析工具,它可以帮助开发者快速构建启动命令,并且帮助你组织命令的参数、以及输出列表等。
3.3 初始化并启动
创建了NamesrvController实例之后,开始初始化并启动NameServer。
首先进行初始化,代码入口是NamesrvController#initialize。
public boolean initialize() {
//加载kvConfigPath下kvConfig.json配置文件里的KV配置,然后将这些配置放到KVConfigManager#configTable属性中
this.kvConfigManager.load();
//根据nettyServerConfig初始化一个netty服务器。
//brokerHousekeepingService是在NamesrvController实例化时构造函数里实例化的,该类负责Broker连接事件的处理,实现了ChannelEventListener,主要用来管理RouteInfoManager的brokerLiveTable
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
//初始化负责处理Netty网络交互数据的线程池,默认线程数是8个
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//注册Netty服务端业务处理逻辑,如果开启了clusterTest,那么注册的请求处理类是ClusterTestRequestProcessor,否则请求处理类是DefaultRequestProcessor
this.registerProcessor();
//注册心跳机制线程池,延迟5秒启动,每隔10秒遍历RouteInfoManager#brokerLiveTable这个属性,用来扫描不存活的broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//注册打印KV配置线程池,延迟1分钟启动、每10分钟打印出kvConfig配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
//rocketmq可以通过开启TLS来提高数据传输的安全性,如果开启了,那么需要注册一个监听器来重新加载SslContext
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged