入口: org.apache.rocketmq.broker.BrokerStartup#main
broker的启动主要分为两部分:1.创建brokerController 2.启动brokerController。与平时进行业务开发时不同的是,这里的BrokerController相当于Broker的一个中央控制器类,并不是编写http接口的类
创建brokerController:设置一些属性参数,读取外部配置文件,实例化brokerController并调用其初始化方法。
- 实例化brokerController:主要是会实例化很多的配置类和线程池的阻塞队列。
- 初始化方法 会加载如消费者消费进度等的文件,会实例化消息存储的服务类,并调用其加载方法,加载储存消息的相关文件到内存中,并进行数据恢复。还会创建各类线程池并注册netty服务的消息处理器,之后创建各种定时任务,如:持久化消费者消费进度的任务、定时打印各种日志的任务等。
启动brokerController: 内部会启动消息储存服务、netty服务、长轮询拉取消息挂起服务、向nameserver心跳定时任务等
public static void main(String[] args) {
// broker启动
start(createBrokerController(args));
}
创建brokerController:org.apache.rocketmq.broker.BrokerStartup#createBrokerController
public static BrokerController createBrokerController(String[] args) {
// 设置属性rocketmq.remoting.version,即当前rocketmq版本
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
NettySystemConfig.socketSndbufSize = 131072;
}
if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
NettySystemConfig.socketRcvbufSize = 131072;
}
try {
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
// 创建Broker的配置类,包含Broker的各种配置,比如 ROCKETMQ_HOME
final BrokerConfig brokerConfig = new BrokerConfig();
// NettyServer的配置类,Broker接收来自客户端的消息的时候作为服务端
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// NettyClient的配置类,Broker连接NameServer的时候还会作为客户端
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
// 设置作为NettyServer时的监听端口为10911
nettyServerConfig.setListenPort(10911);
// Broker的消息存储配置,例如各种文件大小等
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}
/*
* 判断命令行启动是否包含 -c 命令,用于指定配置文件
* 如果包含,则解析指定的配置文件
* 启动Broker的时候使用命令参数 -c /xxx/rocketmq/config/conf/broker.conf
*/
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
configFile = file;
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
properties2SystemEnv(properties);
MixAll.properties2Object(properties, brokerConfig);
MixAll.properties2Object(properties, nettyServerConfig);
MixAll.properties2Object(properties, nettyClientConfig);
MixAll.properties2Object(properties, messageStoreConfig);
BrokerPathConfigHelper.setBrokerConfigPath(file);
in.close();
}
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
if (null == brokerConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 获取namesrv地址
String namesrvAddr = brokerConfig.getNamesrvAddr();
if (null != namesrvAddr) {
try {
// 可以指定多个地址,以 ; 隔开,这里进行拆分
String[] addrArray = namesrvAddr.split(";");
for (String addr : addrArray) {
RemotingUtil.string2SocketAddress(addr);
}
} catch (Exception e) {
System.out.printf(
"The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
namesrvAddr);
System.exit(-3);
}
}
// 设置、校验brokerId,BrokerId为0表示Master,非0表示Slave
switch (messageStoreConfig.getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
brokerConfig.setBrokerId(MixAll.MASTER_ID);
break;
case SLAVE:
if (brokerConfig.getBrokerId() <= 0) {
System.out.printf("Slave's brokerId must be > 0");
System.exit(-3);
}
break;
default:
break;
}
// 是否开启DLeger,即多副本(主从切换集群)
if (messageStoreConfig.isEnableDLegerCommitLog()) {
brokerConfig.setBrokerId(-1);
}
// 设置高可用通信监听端口,为监听端口+1,默认就是10912
// 该端口主要用于 如 主从同步之类的高可用操作
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
// 判断命令行中是否包含字符'p'(printConfigItem)和'm',如果存在则打印配置信息并结束jvm运行,没有的话就不用管
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
MixAll.printObjectProperties(console, nettyClientConfig);
MixAll.printObjectProperties(console, messageStoreConfig);
System.exit(0);
} else if (commandLine.hasOption('m')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
MixAll.printObjectProperties(console, brokerConfig, true);
MixAll.printObjectProperties(console, nettyServerConfig, true);
MixAll.printObjectProperties(console, nettyClientConfig, true);
MixAll.printObjectProperties(console, messageStoreConfig, true);
System.exit(0);
}
// 打印当前broker的配置日志
log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
MixAll.printObjectProperties(log, brokerConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
MixAll.printObjectProperties(log, nettyClientConfig);
MixAll.printObjectProperties(log, messageStoreConfig);
// 实例化 BrokerController,内部主要初始化了一些配置类、manager类、处理器、线程池等
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
// remember all configs to prevent discard
// 将所有的-c的外部配置信息保存到BrokerController中的Configuration对象属性的allConfigs属性中
controller.getConfiguration().registerConfig(properties);
// 初始化BrokerController
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 添加关闭钩子方法,在Broker关闭之前执行,进行一些内存清理、对象销毁等操作
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
private AtomicInteger shutdownTimes = new AtomicInteger(0);
@Override
public void run() {
synchronized (this) {
log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
if (!this.hasShutdown) {
this.hasShutdown = true;
long beginTime = System.currentTimeMillis();
controller.shutdown();
long consumingTimeTotal = System.currentTimeMillis() - beginTime;
log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
}
}
}
}, "ShutdownHook"));
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
实例化brokerController: org.apache.rocketmq.broker.BrokerController#BrokerController
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig
) {
// broker的配置
this.brokerConfig = brokerConfig;
// 作为netty服务端与客户端交互的配置
this.nettyServerConfig = nettyServerConfig;
// 作为netty客户端与服务端交互的配置
this.nettyClientConfig = nettyClientConfig;
// 消息存储的配置
this.messageStoreConfig = messageStoreConfig;
// 消费者偏移量管理器,维护offset进度信息
this.consumerOffsetManager = new ConsumerOffsetManager(this);
//topic配置管理器,管理broker中存储的所有topic的配置
this.topicConfigManager = new TopicConfigManager(this);
// 处理消费者拉取消息请求的处理器
this.pullMessageProcessor = new PullMessageProcessor(this);
//拉取请求挂起服务,处理无消息时push长轮询消费者的挂起等待机制
this.pullRequestHoldService = new PullRequestHoldService(this);
//消息送达的监听器,生产者消息到达时通过该监听器触发pullRequestHoldService通知pullRequestHoldService
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
//消费者id变化监听器
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
//消费者管理类,维护消费者组的注册实例信息以及topic的订阅信息,并对消费者id变化进行监听
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
//消费者过滤管理器,配置文件为:xx/config/consumerFilter.json
this.consumerFilterManager = new ConsumerFilterManager(this);
//生产者管理器,包含生产者的注册信息,通过groupName分组
this.producerManager = new ProducerManager();
//客户端连接心跳服务,用于定时扫描生产者和消费者客户端,并将不活跃的客户端通道及相关信息移除
this.clientHousekeepingService = new ClientHousekeepingService(this);
//处理某些broker到客户端的请求,例如检查生产者的事务状态,重置offset
this.broker2Client = new Broker2Client(this);
//订阅分组关系管理器,维护消费者组的一些附加运维信息
this.subscriptionGroupManager = new SubscriptionGroupManager(this);
//broker对方访问的API,处理broker对外的发起请求,比如向nameServer注册,向master、slave发起的请求
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
//过滤服务管理器,拉取消息过滤
this.filterServerManager = new FilterServerManager(this);
//用于从节点,定时向主节点发起请求同步数据,例如topic配置、消费位移等
this.slaveSynchronize = new SlaveSynchronize(this);
/*初始化各种阻塞队列。将会被设置到对应的处理不同客户端请求的线程池执行器中*/
//处理来自生产者的发送消息的请求的队列
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
//处理reply消息的请求的队列,RocketMQ4.7.0版本中增加了request-reply新特性,该特性允许producer在发送消息后同步或者异步等待consumer消费完消息并返回响应消息,类似rpc调用效果。
//即生产者发送了消息之后,可以同步或者异步的收到消费了这条消息的消费者的响应
this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
//处理查询请求的队列
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
//客户端管理器的队列
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
//消费者管理器的队列,目前没用到
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
//心跳处理的队列
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
//事务消息相关处理的队列
this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
//broker状态管理器,保存Broker运行时状态
this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
//目前没用到
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
//broker快速失败服务
this.brokerFastFailure = new BrokerFastFailure(this);
//配置类
this.configuration = new Configuration(
log,
BrokerPathConfigHelper.getBrokerConfigPath(),
this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
);
}
初始化brokerController: org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {
//topic配置文件加载,路径为 {user.home}/store/config/topics.json
boolean result = this.topicConfigManager.load();
// 消费者消费偏移量配置文件加载,路径为 {user.home}/store/config/consumerOffset.json
result = result && this.consumerOffsetManager.load();
// 订阅分组配置文件加载,路径为 {user.home}/store/config/subscriptionGroup.json
result = result && this.subscriptionGroupManager.load();
// 消费者过滤配置文件加载,路径为 {user.home}/store/config/consumerFilter.json
result = result && this.consumerFilterManager.load();
if (result) {
// 实例化和初始化消息存储服务相关类 DefaultMessageStore
try {
// 实例化消息存储类DefaultMessageStore
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
/**
* enableDLegerCommitLog 为 true(默认为false),则创建DLedgerRoleChangeHandler。
* 在启用enableDLegerCommitLog情况下,broker通过raft协议选主,可以实现主从角色自动切换
*/
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
/*
* 通过消息存储服务 加载 储存消息的相关文件到内存中,并进行数据恢复(此步骤是broker启动的核心步骤)
* 文件:commitLog文件(储存消息的)、consumequeue文件(消费者依据此文件消费,储存指向commitLog中消息的偏移量)、indexFile文件
* 数据恢复:broker可能会异常关闭,导致消息在文件中储存不完整 或 已储存到commitLog但未存储到consumequeue和indexFile
*/
result = result && this.messageStore.load();
if (result) {
// 创建netty远程服务,remotingServer和fastRemotingServer
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
/********************
* 创建各种执行器线程池*
********************/
// 处理发送消息的请求的线程池
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
//处理拉取消息的请求的线程池
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
// 处理reply消息的请求的线程池(Reply模式允许Producer发出消息后,以同步或异步的形式等Consumer消费并返回一个响应消息,达到类似RPC的调用过程)
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.replyThreadPoolQueue,
new ThreadFactoryImpl("ProcessReplyMessageThread_"));
// 处理查询请求的线程池
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.queryThreadPoolQueue,
new ThreadFactoryImpl("QueryMessageThread_"));
// broker 管理线程池,作为默认处理器的线程池
this.adminBrokerExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
"AdminBrokerThread_"));
// 客户端管理器的线程池
this.clientManageExecutor = new ThreadPoolExecutor(
this.brokerConfig.getClientManageThreadPoolNums(),
this.brokerConfig.getClientManageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.clientManagerThreadPoolQueue,
new ThreadFactoryImpl("ClientManageThread_"));
// 心跳处理的线程池
this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getHeartbeatThreadPoolNums(),
this.brokerConfig.getHeartbeatThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_", true));
// 事务消息相关处理的线程池
this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getEndTransactionThreadPoolNums(),
this.brokerConfig.getEndTransactionThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_"));
//消费者管理的线程池
this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_"));
// 注册netty服务的消息处理器
this.registerProcessor();
/*********************
* 启动各种定时任务 *
*********************/
final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
// 每隔24h打印昨天生产和消费的消息数量
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
//每隔5s將消费者offset进行持久化,存入consumerOffset.json文件中
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
//每隔10s將消费过滤信息进行持久化,存入consumerFilter.json文件中
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerFilterManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumer filter error.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
/**
* 每3分钟检查消费进度,若消费进度落后超过 consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16\
* 且disableConsumeIfConsumerReadSlowly=true(默认false)则剔除掉该订阅组,该消费者组停止消费消息来保护broker
* 因为存储消息的commitLog一个文件大小才为1024L * 1024 * 1024。
* ps:不清楚原因
*/
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.protectBroker();
} catch (Throwable e) {
log.error("protectBroker error.", e);
}
}
}, 3, 3, TimeUnit.MINUTES);
//每隔1s將打印发送消息线程池队列、拉取消息线程池队列、查询消息线程池队列、结束事务线程池队列的大小以及队列头部元素存在时间
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printWaterMark();
} catch (Throwable e) {
log.error("printWaterMark error.", e);
}
}
}, 10, 1, TimeUnit.SECONDS);
//每隔1min將打印已存储在commitlog提交日志中但尚未分派到consumequeue消费队列的字节数。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
} catch (Throwable e) {
log.error("schedule dispatchBehindBytes error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
if (this.brokerConfig.getNamesrvAddr() != null) {
// 更新NamesrvAddr
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
/**
* 如果未指定nameSvr地址,且开启从地址服务器获取
* 则每2min从nameSvr地址服务器获取最新的地址并更新
* 若希望动态更新nameSvr地址,则需要指定地址服务器url和fetchNamesrvAddrByAddressServer设置为true
*/
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
//如果没有开启DLeger服务,DLeger开启后表示支持高可用的主从自动切换
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
// 如果是从节点,更新master地址
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
} else {
//如果是主节点,每隔60s將打印主从节点的差异
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}
// ....省略
// 初始化事务消息相关服务
initialTransaction();
// 初始化权限相关服务
initialAcl();
// 初始化RPC调用的钩子函数
initialRpcHooks();
}
return result;
}
入口: org.apache.rocketmq.broker.BrokerStartup#start
public static BrokerController start(BrokerController controller) {
try {
// BrokerController启动
controller.start();
String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
+ controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
if (null != controller.getBrokerConfig().getNamesrvAddr()) {
tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
}
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
public void start() throws Exception {
//启动消息存储服务
if (this.messageStore != null) {
this.messageStore.start();
}
//启动netty远程服务
if (this.remotingServer != null) {
this.remotingServer.start();
}
//启动netty远程服务VIP通道
if (this.fastRemotingServer != null) {
this.fastRemotingServer.start();
}
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
//长轮询拉取消息挂起服务启动
if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.start();
}
//客户端连接心跳服务启动
if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.start();
}
if (this.filterServerManager != null) {
this.filterServerManager.start();
}
// 如果没有开启DLeger
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
//如果不是SLAVE,那么启动事务消息检查服务
startProcessorByHa(messageStoreConfig.getBrokerRole());
//如果是SLAVE,则启动主从同步服务, 定时任务每隔10s与master机器同步数据,采用slave主动拉取的方法
//同步的内容包括topic配置,消费者消费位移、延迟消息偏移量、订阅组信息等
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
this.registerBrokerAll(true, false, true);
}
// 定时任务,默认每30s向namesvr发起一次注册,即心跳包。时间间隔可配置registerNameServerPeriod,1万到6万毫秒间。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}
if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}
}