broker是一个单独的服务,提供消息的转发和存储功能
启动流程分为两步:先调用createBrokerController创建并初始化BrokerController,再调用start启动它。
/**
 * Broker entry point: builds and initializes a controller from the command-line arguments,
 * then starts it.
 *
 * @param args command-line arguments, forwarded to createBrokerController
 */
public static void main(String[] args) {
    BrokerController controller = createBrokerController(args);
    start(controller);
}
/**
 * Builds the broker's configuration objects, applies command-line overrides, constructs a
 * BrokerController and initializes it. If initialization reports failure, the controller is
 * shut down and the JVM exits with status -3.
 *
 * NOTE(review): this is an excerpt. `commandLine` and `properties` are declared in code elided
 * from this snippet. controller.initialize() declares CloneNotSupportedException, which the
 * visible code neither catches nor declares; presumably an elided try/catch handles it.
 *
 * @param args command-line arguments
 * @return an initialized, not yet started controller
 */
public static BrokerController createBrokerController(String[] args) {
final BrokerConfig brokerConfig = new BrokerConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
final NettyClientConfig nettyClientConfig = new NettyClientConfig();
// Client-side TLS is taken from the TLS_ENABLE system property. When that property is unset,
// TLS is enabled only if TlsSystemConfig.tlsMode is ENFORCING.
nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
// Main remoting listen port: 10911.
nettyServerConfig.setListenPort(10911);
final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
// A slave gets accessMessageInMemoryMaxRatio - 10 (the author notes the default is 40, so 30).
// NOTE(review): this runs on a freshly constructed MessageStoreConfig, before the -c config
// file below is applied, so it only sees the default brokerRole. Confirm whether a configured
// SLAVE role ever reaches this branch.
if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
}
if (commandLine.hasOption('c')) {
// -c: load settings from the given config file (elided in this excerpt).
}
// Apply command-line options onto brokerConfig.
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
// HA (master/slave replication) listen port = remoting port + 1 (10912 with the default 10911).
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
// The constructor initializes many fields; they are covered where they are used.
final BrokerController controller = new BrokerController(
brokerConfig,
nettyServerConfig,
nettyClientConfig,
messageStoreConfig);
controller.getConfiguration().registerConfig(properties);
// 1. Initialize the controller; on failure shut it down and exit with status -3.
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
return controller;
}
初始化方法主要做了这些事儿:
public boolean initialize() throws CloneNotSupportedException {
//1、从本地加载了几个json文件,用于恢复上次宕机之前的缓存数据,具体的路径在BrokerPathConfigHelper类中
//初始化已经持久化的主题
boolean result = this.topicConfigManager.load();
//初始化消息消费进度管理器
result = result && this.consumerOffsetManager.load();
//初始化订阅组
result = result && this.subscriptionGroupManager.load();
//初始化消息过滤器
result = result && this.consumerFilterManager.load();
//2、初始化消息存储,这里面的私有属性也比较多,后面讲
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
//3、初始化dledger主节点选举处理器,默认不用dledger
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//消息存储的插件,目前还不太清楚做什么用的,内部看是从brokerConfig里取的私有属性,但是为空,可能为了易扩展,先不关注
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
//DispatcherList你可以当成存储操作的前置调用链
//CommitLogDispatcherCalcBitMap是一个位图实现的布隆过滤器,默认不启用
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
//加载消息存储的本地配置
result = result && this.messageStore.load();
//初始化web容器,与namesrv相同
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
//又初始化了一个web容器,监听10911-2=10909端口
//等于broker监听两个端口,具体有什么区别后面遇到再说
NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
//4、这里创建了一堆线程池,后面不同的逻辑会交给不同的线程池执行
//发消息线程池,默认1个线程
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getSendMessageThreadPoolNums(),
this.brokerConfig.getSendMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.sendThreadPoolQueue,
new ThreadFactoryImpl("SendMessageThread_"));
//拉消息线程池,默认16+系统环境可用线程数*2,我的mac是6核12线程,线程池的线程数就是16+12*2=40
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getPullMessageThreadPoolNums(),
this.brokerConfig.getPullMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
//消息重试线程池,线程数同上
this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.replyThreadPoolQueue,
new ThreadFactoryImpl("ProcessReplyMessageThread_"));
//省略一堆线程池的初始化
//5、下面registerProcessor用来指定请求类型和线程池的关系
this.registerProcessor();
final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
//6、定时任务,每天0点打印昨天的推拉消息数
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
//7、定时任务,每5毫秒将消息消费进度持久化
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
//8、定时任务,每10毫秒将主题下的过滤数据持久化
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerFilterManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumer filter error.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
//9、定时任务,每3分钟检查一次消息堆积数量,大于临界值就禁用对应消费者组,同时更新缓存的订阅组信息,默认不启用
//不懂为什么要禁用消费者组,这样不就没人消费了吗,可能禁用之后有别的处理
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.protectBroker();
} catch (Throwable e) {
log.error("protectBroker error.", e);
}
}
}, 3, 3, TimeUnit.MINUTES);
//10、每秒钟打印推、拉、查询、事务队列中的线程数
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printWaterMark();
} catch (Throwable e) {
log.error("printWaterMark error.", e);
}
}
}, 10, 1, TimeUnit.SECONDS);
//11、每分钟打印待分发的消息数
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
} catch (Throwable e) {
log.error("schedule dispatchBehindBytes error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
if (this.brokerConfig.getNamesrvAddr() != null) {
//12、初始化namesrv地址
this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
//12、支持从http服务器动态获取namesrv地址,默认关闭
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
//默认不启用dleger,进if
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
//13、是slave节点要更新master地址
if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
this.updateMasterHAServerAddrPeriodically = false;
} else {
this.updateMasterHAServerAddrPeriodically = true;
}
} else {
//13、是master就每分钟打印主从消息的差异
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
}
}
//14、tls校验,跟namesrv相同,省略
//15、初始化事务相关
initialTransaction();
//16、acl用户权限校验,默认不启用,开启的话会往remoteServer里注册一个rpcHook,在接收请求前做权限预校验
initialAcl();
//17、注册RpcHook
initialRpcHooks();
}
return result;
}
/**
 * Starts an already initialized broker controller.
 *
 * <p>BrokerController#start() declares {@code throws Exception}, so the call must be handled
 * here; the unhandled call did not compile. A failed start can leave the broker half-running
 * (for example, remoting servers already listening). In the same fail-fast spirit as the
 * {@code System.exit(-3)} used when initialization fails, the process is therefore terminated.
 *
 * @param controller the controller returned by createBrokerController
 * @return the same controller, once started
 */
public static BrokerController start(BrokerController controller) {
    try {
        controller.start();
        return controller;
    } catch (Exception e) {
        // No logger is in scope in this static entry point, so report on stderr before exiting.
        System.err.println("Failed to start broker: " + e);
        e.printStackTrace(System.err);
        System.exit(-1);
        // Only reached if System.exit is intercepted; still fail loudly instead of returning null.
        throw new IllegalStateException("Failed to start broker", e);
    }
}
启动阶段主要是依次调用各个组件的start方法,
包括:消息在磁盘中的存储与清理、系统监控、空闲及异常channel的清理、事务消息回查、向namesrv注册,以及快速失败(熔断)机制。
/**
 * Starts the broker's components one after another in a fixed order, registers the broker
 * with the name server(s), and schedules periodic re-registration.
 *
 * @throws Exception if any component fails to start
 */
public void start() throws Exception {
// 1. Message store. Per the author this starts scheduled flushing, reput/dispatch, system and
//    HA monitoring, delayed-message delivery and expired-file cleanup.
if (this.messageStore != null) {
this.messageStore.start();
}
// 2. Main remoting server: the port clients actually talk to.
if (this.remotingServer != null) {
this.remotingServer.start();
}
// 3. Second ("fast") remoting server.
if (this.fastRemotingServer != null) {
this.fastRemotingServer.start();
}
// 4. File watch service.
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
// 5. Outbound API client the broker uses to call external services, e.g. name servers.
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
// 6. Held pull requests. Per the author, with long polling (on by default) the pullRequestTable
//    is scanned every 5 s. A held request is served once new messages arrive in its consume
//    queue or its wait time expires.
if (this.pullRequestHoldService != null) {
this.pullRequestHoldService.start();
}
// 7. Client housekeeping. Per the author, every 10 s it cleans up producer, consumer and
//    filter-server channels that have gone too long without a heartbeat.
if (this.clientHousekeepingService != null) {
this.clientHousekeepingService.start();
}
// 8. Filter servers. Per the author, it tries to start a filter server every 30 s, but the
//    default maximum is 0, so nothing is started by default.
if (this.filterServerManager != null) {
this.filterServerManager.start();
}
// Only without DLedger (DLedger is off by default per the author).
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
// 9. Start role-dependent processors. Per the author this includes the transactional-message
//    check, which runs once a minute. It reads half messages (topic RMQ_SYS_TRANS_HALF_TOPIC,
//    group CID_RMQ_SYS_TRANS, tag d) from local files and asks the producer for their status.
//    A message is dropped after 15 checks and skipped after 72 hours.
startProcessorByHa(messageStoreConfig.getBrokerRole());
// 10. On a slave, per the author: every 10 s, sync topic configs, consumer offsets,
//     delay-queue offsets and subscription-group configs from the master.
// 11. If a slave is switched to master, the running sync is allowed to finish before its
//     future is cancelled.
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
// 12. Register with the name server(s) right away. The last argument is presumably
//     "force register"; confirm against registerBrokerAll's signature.
this.registerBrokerAll(true, false, true);
}
// 13. Periodic re-registration: first after 10 s, then every registerNameServerPeriod ms
//     clamped to [10 s, 60 s] (30 s by default per the author).
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
if (this.brokerStatsManager != null) {
this.brokerStatsManager.start();
}
// Fast-failure guard. Per the author, every 10 ms it checks how long the CommitLog write lock
// has been held. If that exceeds 1 s, it expires queued requests that have waited too long and
// answers them with "system busy". Queue timeouts per the author: send 200 ms, pull 5 s,
// heartbeat 31 s, transaction 3 s.
if (this.brokerFastFailure != null) {
this.brokerFastFailure.start();
}
}
/**
 * Starts the message store. This is presumably DefaultMessageStore#start: it calls
 * addScheduleTask(), whose tasks reference DefaultMessageStore.this.
 *
 * <p>Before starting the remaining services, it blocks until the reput service has dispatched
 * every CommitLog byte, polling once a second.
 *
 * @throws Exception if a sub-service fails to start or the wait is interrupted
 */
public void start() throws Exception {
{
// Resume point for re-dispatch: the largest CommitLog physical offset any consume queue
// already references. The search starts at the CommitLog's minimum offset, and the result is
// clamped to be at least 0 and at least that minimum offset.
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
}
}
}
if (maxPhysicalPosInLogicQueue < 0) {
maxPhysicalPosInLogicQueue = 0;
}
if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
}
// 1. Start the reput (re-dispatch) service from that offset.
this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
// NOTE(review): the author read run() as only advancing reputFromOffset every 1 ms, with the
// rest of its logic skipped because duplicationEnable is false. This cannot be verified from
// this excerpt; presumably the service dispatches CommitLog entries to consume queues/index.
this.reputMessageService.start();
// Wait, checking once per second, until nothing is left to dispatch.
while (true) {
if (dispatchBehindBytes() <= 0) {
break;
}
Thread.sleep(1000);
}
// Sync the CommitLog's topicQueueTable from consumeQueueTable.
this.recoverTopicQueueTable();
}
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
// 2. HA (master/slave replication) service.
this.haService.start();
// 3. Scheduled (delayed) message service, depending on the broker role.
this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
}
// 4. Consume-queue flush service. Per the author, once a second it flushes in-memory
//    consume-queue data to disk, under .../store/consumequeue/<topic>/<queueId>/ with
//    20-digit file names.
this.flushConsumeQueueService.start();
// 5. CommitLog services. Per the author, with the default asynchronous flushing this starts
//    CommitRealTimeService; files live under .../store/commitlog/ with 20-digit file names.
this.commitLog.start();
// 6. Store statistics (TPS-related figures are recorded and printed).
this.storeStatsService.start();
// 7. Create a temp marker file (per the author: /home/rocketmq/store/abort). It is presumably
//    used to detect an unclean shutdown; confirm.
this.createTempFile();
// Periodic maintenance: file cleanup, self-check, lock diagnostics, disk check.
this.addScheduleTask();
this.shutdown = false;
}
/**
 * Starts the HA (master/slave replication) components. This is presumably HAService#start,
 * invoked by the message store via haService.start().
 *
 * @throws Exception if a component fails to start
 */
public void start() throws Exception {
// Begin accepting HA connections, then start the accept thread. Per the author,
// acceptSocketService is created together with DefaultMessageStore and listens on the HA port
// (10912 by default) with a selector. Accepted replication connections are cached in
// connectionList.
this.acceptSocketService.beginAccept();
this.acceptSocketService.start();
// groupTransferService: per the author, tracks the replication (sync) state of the group.
this.groupTransferService.start();
// haClient: per the author, reports the local CommitLog position to the master every 5 s.
this.haClient.start();
}
/**
 * Registers the message store's periodic background tasks: expired-file cleanup, a
 * self-check, optional lock-hold stack dumps, and a disk-space check.
 *
 * <p>An exception escaping a periodic task's {@code run()} suppresses all later runs of that
 * task (ScheduledExecutorService#scheduleAtFixedRate).
 */
private void addScheduleTask() {
    // 1. Clean expired CommitLog and ConsumeQueue files: first run after 60 s, then every
    //    cleanResourceInterval MILLISECONDS (not "every 0.1 s"; check the configured default).
    //    Per the author, CommitLog files unmodified for 72 hours are deleted, and a file stuck
    //    at the head of the mapped-file queue is retried every 2 minutes. Confirm in
    //    cleanFilesPeriodically.
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

    // 2. Self-check every 10 minutes, first after 1 minute. Per the author, it verifies that
    //    mapped files have the expected (1 GB) size. Confirm in checkSelf.
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            DefaultMessageStore.this.checkSelf();
        }
    }, 1, 10, TimeUnit.MINUTES);

    // 3. Lock diagnostics, evaluated once per second while debugLockEnable is set. If
    //    commitLog.getBeginTimeInLock() is non-zero and the time since then is strictly between
    //    1,000 ms and 10,000,000 ms, dump all thread stacks to
    //    ${user.home}/debug/lock/stack-<beginTimeInLock>-<lockTime>.
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
                try {
                    if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
                        long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();
                        if (lockTime > 1000 && lockTime < 10000000) {
                            String stack = UtilAll.jstack();
                            final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-"
                                + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
                            MixAll.string2FileNotSafe(stack, fileName);
                        }
                    }
                } catch (Exception ignored) {
                    // Best-effort diagnostics: failing to capture or write the stack dump must
                    // not escape run(), which would cancel this periodic task.
                }
            }
        }
    }, 1, 1, TimeUnit.SECONDS);

    // 4. Disk-space check every 10 s, first after 1 s, on the dedicated disk-check executor.
    //    Per the author, it warns when the CommitLog disk usage exceeds 90%. Confirm in
    //    isSpaceFull.
    this.diskCheckScheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            DefaultMessageStore.this.cleanCommitLogService.isSpaceFull();
        }
    }, 1000L, 10000L, TimeUnit.MILLISECONDS);
}