当前位置: 首页 > 工具软件 > ObjectBroker > 使用案例 >

rocketmq——Broker

郎祯
2023-12-01

rocketmq——Broker


1、作用

broker是一个单独的服务,提供消息的转发和存储功能

2、启动流程

启动流程分为两步,

public static void main(String[] args) {
        start(createBrokerController(args));
    }

2.1、创建BrokerController

public static BrokerController createBrokerController(String[] args) {

       final BrokerConfig brokerConfig = new BrokerConfig();
       final NettyServerConfig nettyServerConfig = new NettyServerConfig();
       final NettyClientConfig nettyClientConfig = new NettyClientConfig();
				//broker默认禁用tls
       nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
           String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
  			//监听10911端口
       nettyServerConfig.setListenPort(10911);
       final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
				//slave获取内存消息的比率规定为30,默认为40
       if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
           int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
           messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
       }

       if (commandLine.hasOption('c')) {
           //-c指定配置文件
       }

       MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
				//开了一个用于集群同步监听的端口,10911+1=10912
       messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
				//这个构造器里初始化了一堆属性,等后面用到了再提
       final BrokerController controller = new BrokerController(
           brokerConfig,
           nettyServerConfig,
           nettyClientConfig,
           messageStoreConfig);
       controller.getConfiguration().registerConfig(properties);
				//1、初始化controller
       boolean initResult = controller.initialize();
       if (!initResult) {
           controller.shutdown();
           System.exit(-3);
       }

       return controller;
    }

2.1.1、初始化——initialize

初始化方法主要做了这些事儿:

  • 恢复缓存数据,包括主题、消息消费进度、订阅组、消息过滤
  • 初始化web容器
  • 初始化收发消息、重试消息等多个不同用途的线程池,并关联接收到的请求类型
  • 初始化了一堆定时器,打印每日推拉消息数、消息堆积数,持久化主题和消费进度
  • 初始化事务、acl、rpcHook
public boolean initialize() throws CloneNotSupportedException {
  			//1、从本地加载了几个json文件,用于恢复上次宕机之前的缓存数据,具体的路径在BrokerPathConfigHelper类中
  			//初始化已经持久化的主题
        boolean result = this.topicConfigManager.load();
  			//初始化消息消费进度管理器
        result = result && this.consumerOffsetManager.load();
  			//初始化订阅组
        result = result && this.subscriptionGroupManager.load();
  			//初始化消息过滤器
        result = result && this.consumerFilterManager.load();
  			//2、初始化消息存储,这里面的私有属性也比较多,后面讲
        this.messageStore =
            new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                this.brokerConfig);
			  //3、初始化dledger主节点选举处理器,默认不用dledger
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
            ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
        }
        this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
        //消息存储的插件,目前还不太清楚做什么用的,内部看是从brokerConfig里取的私有属性,但是为空,可能为了易扩展,先不关注
        MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
        this.messageStore = MessageStoreFactory.build(context, this.messageStore);
  			//DispatcherList你可以当成存储操作的前置调用链
  			//CommitLogDispatcherCalcBitMap是一个位图实现的布隆过滤器,默认不启用
        this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
				//加载消息存储的本地配置
        result = result && this.messageStore.load();
				//初始化web容器,与namesrv相同
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
  			//又初始化了一个web容器,监听10911-2=10909端口
  			//等于broker监听两个端口,具体有什么区别后面遇到再说
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
  			//4、这里创建了一堆线程池,后面不同的逻辑会交给不同的线程池执行
  			//发消息线程池,默认1个线程
        this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getSendMessageThreadPoolNums(),
            this.brokerConfig.getSendMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.sendThreadPoolQueue,
            new ThreadFactoryImpl("SendMessageThread_"));
				//拉消息线程池,默认16+系统环境可用线程数*2,我的mac是6核12线程,线程池的线程数就是16+12*2=40
        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPullMessageThreadPoolNums(),
            this.brokerConfig.getPullMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.pullThreadPoolQueue,
            new ThreadFactoryImpl("PullMessageThread_"));
				//消息重试线程池,线程数同上
        this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
            this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.replyThreadPoolQueue,
            new ThreadFactoryImpl("ProcessReplyMessageThread_"));
  			//省略一堆线程池的初始化
				//5、下面registerProcessor用来指定请求类型和线程池的关系
        this.registerProcessor();

        final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
        final long period = 1000 * 60 * 60 * 24;
  			//6、定时任务,每天0点打印昨天的推拉消息数
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.getBrokerStats().record();
                } catch (Throwable e) {
                    log.error("schedule record error.", e);
                }
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);
				//7、定时任务,每5毫秒将消息消费进度持久化
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerOffsetManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumerOffset error.", e);
                }
            }
        }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
				//8、定时任务,每10毫秒将主题下的过滤数据持久化
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerFilterManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumer filter error.", e);
                }
            }
        }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
				//9、定时任务,每3分钟检查一次消息堆积数量,大于临界值就禁用对应消费者组,同时更新缓存的订阅组信息,默认不启用
  			//不懂为什么要禁用消费者组,这样不就没人消费了吗,可能禁用之后有别的处理
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.protectBroker();
                } catch (Throwable e) {
                    log.error("protectBroker error.", e);
                }
            }
        }, 3, 3, TimeUnit.MINUTES);
				//10、每秒钟打印推、拉、查询、事务队列中的线程数
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.printWaterMark();
                } catch (Throwable e) {
                    log.error("printWaterMark error.", e);
                }
            }
        }, 10, 1, TimeUnit.SECONDS);
				//11、每分钟打印待分发的消息数
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                } catch (Throwable e) {
                    log.error("schedule dispatchBehindBytes error.", e);
                }
            }
        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

        if (this.brokerConfig.getNamesrvAddr() != null) {
          	//12、初始化namesrv地址
            this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
            log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
        } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
          	//12、支持从http服务器动态获取namesrv地址,默认关闭
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                    } catch (Throwable e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }
				//默认不启用dleger,进if
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
              	//13、是slave节点要更新master地址
                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                } else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }
            } else {
              	//13、是master就每分钟打印主从消息的差异
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();
                        } catch (Throwable e) {
                            log.error("schedule printMasterAndSlaveDiff error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
        }
  			//14、tls校验,跟namesrv相同,省略
  			//15、初始化事务相关
        initialTransaction();
  			//16、acl用户权限校验,默认不启用,开启的话会往remoteServer里注册一个rpcHook,在接收请求前做权限预校验
        initialAcl();
  			//17、注册RpcHook
        initialRpcHooks();
        }
        return result;
    }

2.2、启动——start

public static BrokerController start(BrokerController controller) {
            controller.start();
            return controller;
    }

启动主要做的就是执行一些类的启动逻辑

包括消息在磁盘中的存储、清理、系统监控,空闲异常channel的清理、事务回查、namesrv注册、熔断

public void start() throws Exception {
  			//1、消息存储类,主要是启动定时刷盘、消息重推、系统监控、集群监控、延时消息推送、过期文件清理等
        if (this.messageStore != null) {
            this.messageStore.start();
        }
				//2、broker真正交互的端口
        if (this.remotingServer != null) {
            this.remotingServer.start();
        }
				//3、broker真正交互的端口2
        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.start();
        }
				//4、文件监听
        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
				//5、初始化broker调用外部api的处理线程
        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }
				//6、默认开启longPollingEnable,每5秒检测一次pullRequestTable中的请求
  			//当消费队列中有新的消息,或者请求超过规定的等待时间就直接执行
        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }
				//7、每10秒扫一次producer、consumer、filterServer中长时间没有心跳的Channel
        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }
				//8、每半分钟新启动一个filterServer,但是默认的filterServer的上限是0,相当于没啥作用
        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }
				//默认不启用dleger
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
          	//9、每分钟进行一次事务消息回查,默认超过15次回查就丢弃,超过72小时就跳过
          	//根据事务特有固定的group、topic、tag从本地文件找到事务半消息,对符合条件的进行回查
          	//topic:RMQ_SYS_TRANS_HALF_TOPIC
          	//group:CID_RMQ_SYS_TRANS
          	//tag:d
            startProcessorByHa(messageStoreConfig.getBrokerRole());
          	//10、如果是从节点,每10秒同步一次主题配置、消费进度、延迟消息进度、订阅组配置
          	//11、如果是从节点切换成主节点,那就等待逻辑运行完再取消future
            handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
          	//12、向namesrv注册broker
            this.registerBrokerAll(true, false, true);
        }
				//13、每30秒向namesrv注册一次broker
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
	
        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }
				//每10毫秒检测一次写commitLog时加锁的时间,超过1秒就认为占用时间过长
  			//占用过长的话会对队列中的超时请求进行一次清理,返回系统繁忙
  			//发送队列200毫秒超时
  			//拉取队列5秒超时
  			//心跳31秒超时
  			//事务3秒超时
        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.start();
        }


    }

2.2.1、消息存储类——messageStore

public void start() throws Exception {
        {
            long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
            for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
                for (ConsumeQueue logic : maps.values()) {
                    if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
                        maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
                    }
                }
            }
            if (maxPhysicalPosInLogicQueue < 0) {
                maxPhysicalPosInLogicQueue = 0;
            }
            if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
                maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
            }
          	//1、找到消息重推的偏移地址,并启动消息重推线程
            this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);
          	//里面的run方法只是每毫秒更新reputFromOffset
          	//duplicationEnable是false所以中间的逻辑都不走直接break返回了
            this.reputMessageService.start();

            while (true) {
                if (dispatchBehindBytes() <= 0) {
                    break;
                }
                Thread.sleep(1000);
            }
          	//从consumeQueueTable向commitLog的topicQueueTable同步
            this.recoverTopicQueueTable();
        }

        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
          	//2、启动集群服务
            this.haService.start();
          	//3、启动定时任务扫描延迟消息
            this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
        }
				//4、启动定时任务,每秒将每个consumeQueue中的所有fileChannel中的内存数据都持久化到本地
  			//路径/home/rocketmq/store/comsumeQueue/主题/队列/20位的文件名
        this.flushConsumeQueueService.start();
  			//5、默认异步刷盘,启动CommitRealTimeService
 			  //路径/home/rocketmq/store/commitLog/20位的文件名
        this.commitLog.start();
  			//6、记录并打印一些跟tps有关的数据
        this.storeStatsService.start();
				//7、创建一个临时文件/home/rocketmq/store/abort
        this.createTempFile();
        this.addScheduleTask();
        this.shutdown = false;
    }
2.2.1.1、启动集群服务——haService.start()
public void start() throws Exception {
  			//acceptSocketService在创建DefaultMessageStore时初始化,监听10912端口,是集群监听端口
  			//通过selector监听连接,并把集群连接缓存到connectionList中
        this.acceptSocketService.beginAccept();
        this.acceptSocketService.start();
  			//acceptSocketService在创建DefaultMessageStore时初始化,判断集群同步状态
        this.groupTransferService.start();
  			//每5秒向master上报一次commitLog的记录位置
        this.haClient.start();
    }
2.2.1.2、定时任务——addScheduleTask
private void addScheduleTask() {
				//1、定时清理磁盘中CommitLog和ConsumeQueue的过期文件
  			//每0.1秒清理一次72小时未修改的CommitLog,每2分钟再尝试清理一次mappedFileQueue队首挂起的文件
  			//比mappedFileQueue队首文件的物理偏移量还要小的、在队列中的其他文件,将被直接清除,不懂为啥
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                DefaultMessageStore.this.cleanFilesPeriodically();
            }
        }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
				//2、每10分钟检查映射的文件大小是否为正常的1G
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                DefaultMessageStore.this.checkSelf();
            }
        }, 1, 10, TimeUnit.MINUTES);
				//3、每秒钟向${ROCKETMQ_HOME}/debug/lock/stack-时间戳1-时间戳2,这个文件打印活跃线程的堆栈信息
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
                    try {
                        if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
                            long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();
                            if (lockTime > 1000 && lockTime < 10000000) {

                                String stack = UtilAll.jstack();
                                final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-"
                                    + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
                                MixAll.string2FileNotSafe(stack, fileName);
                            }
                        }
                    } catch (Exception e) {
                    }
                }
            }
        }, 1, 1, TimeUnit.SECONDS);
				//4、每10秒钟检测${ROCKETMQ_HOME}/store/commitLog占用的磁盘空间百分比,超过90%会报警
        this.diskCheckScheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            public void run() {
                DefaultMessageStore.this.cleanCommitLogService.isSpaceFull();
            }
        }, 1000L, 10000L, TimeUnit.MILLISECONDS);
    }
 类似资料: