当前位置: 首页 > 工具软件 > ObjectBroker > 使用案例 >

【RocketMQ】源码详解:Broker启动流程

萧成文
2023-12-01

Broker启动

入口: org.apache.rocketmq.broker.BrokerStartup#main

broker的启动主要分为两部分:1.创建brokerController 2.启动brokerController。与平时进行业务开发时不同的是,这里的BrokerController相当于Broker的一个中央控制器类,并不是编写http接口的类

  1. 创建brokerController:设置一些属性参数,读取外部配置文件,实例化brokerController并调用其初始化方法。

    1. 实例化brokerController:主要是会实例化很多的配置类和线程池的阻塞队列。
    2. 初始化方法 会加载如消费者消费进度等的文件,会实例化消息存储的服务类,并调用其加载方法,加载储存消息的相关文件到内存中,并进行数据恢复。还会创建各类线程池并注册netty服务的消息处理器,之后创建各种定时任务,如:持久化消费者消费进度的任务、定时打印各种日志的任务等。
  2. 启动brokerController: 内部会启动消息储存服务、netty服务、长轮询拉取消息挂起服务、向nameserver心跳定时任务等

public static void main(String[] args) {
    // broker启动
    start(createBrokerController(args));
}

创建brokerController

创建brokerController:org.apache.rocketmq.broker.BrokerStartup#createBrokerController

public static BrokerController createBrokerController(String[] args) {
    // 设置属性rocketmq.remoting.version,即当前rocketmq版本
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
        NettySystemConfig.socketSndbufSize = 131072;
    }

    if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
        NettySystemConfig.socketRcvbufSize = 131072;
    }

    try {
        //PackageConflictDetect.detectFastjson();
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
            new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
        }

        // 创建Broker的配置类,包含Broker的各种配置,比如 ROCKETMQ_HOME
        final BrokerConfig brokerConfig = new BrokerConfig();
        // NettyServer的配置类,Broker接收来自客户端的消息的时候作为服务端
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        // NettyClient的配置类,Broker连接NameServer的时候还会作为客户端
        final NettyClientConfig nettyClientConfig = new NettyClientConfig();

        nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
            String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
        // 设置作为NettyServer时的监听端口为10911
        nettyServerConfig.setListenPort(10911);
        // Broker的消息存储配置,例如各种文件大小等
        final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

        if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
            int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
            messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
        }
        /*
         * 判断命令行启动是否包含 -c 命令,用于指定配置文件
         * 如果包含,则解析指定的配置文件
         * 启动Broker的时候使用命令参数 -c /xxx/rocketmq/config/conf/broker.conf
         */
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                configFile = file;
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);

                properties2SystemEnv(properties);
                MixAll.properties2Object(properties, brokerConfig);
                MixAll.properties2Object(properties, nettyServerConfig);
                MixAll.properties2Object(properties, nettyClientConfig);
                MixAll.properties2Object(properties, messageStoreConfig);

                BrokerPathConfigHelper.setBrokerConfigPath(file);
                in.close();
            }
        }

        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

        if (null == brokerConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }
        // 获取namesrv地址
        String namesrvAddr = brokerConfig.getNamesrvAddr();
        if (null != namesrvAddr) {
            try {
                // 可以指定多个地址,以 ; 隔开,这里进行拆分
                String[] addrArray = namesrvAddr.split(";");
                for (String addr : addrArray) {
                    RemotingUtil.string2SocketAddress(addr);
                }
            } catch (Exception e) {
                System.out.printf(
                    "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
                    namesrvAddr);
                System.exit(-3);
            }
        }
        // 设置、校验brokerId,BrokerId为0表示Master,非0表示Slave
        switch (messageStoreConfig.getBrokerRole()) {
            case ASYNC_MASTER:
            case SYNC_MASTER:
                brokerConfig.setBrokerId(MixAll.MASTER_ID);
                break;
            case SLAVE:
                if (brokerConfig.getBrokerId() <= 0) {
                    System.out.printf("Slave's brokerId must be > 0");
                    System.exit(-3);
                }

                break;
            default:
                break;
        }

        // 是否开启DLeger,即多副本(主从切换集群)
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            brokerConfig.setBrokerId(-1);
        }
        // 设置高可用通信监听端口,为监听端口+1,默认就是10912
        // 该端口主要用于 如 主从同步之类的高可用操作
        messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");

        // 判断命令行中是否包含字符'p'(printConfigItem)和'm',如果存在则打印配置信息并结束jvm运行,没有的话就不用管
        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            MixAll.printObjectProperties(console, nettyClientConfig);
            MixAll.printObjectProperties(console, messageStoreConfig);
            System.exit(0);
        } else if (commandLine.hasOption('m')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
            MixAll.printObjectProperties(console, brokerConfig, true);
            MixAll.printObjectProperties(console, nettyServerConfig, true);
            MixAll.printObjectProperties(console, nettyClientConfig, true);
            MixAll.printObjectProperties(console, messageStoreConfig, true);
            System.exit(0);
        }
        // 打印当前broker的配置日志
        log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
        MixAll.printObjectProperties(log, brokerConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);
        MixAll.printObjectProperties(log, nettyClientConfig);
        MixAll.printObjectProperties(log, messageStoreConfig);

        // 实例化 BrokerController,内部主要初始化了一些配置类、manager类、处理器、线程池等
        final BrokerController controller = new BrokerController(
            brokerConfig,
            nettyServerConfig,
            nettyClientConfig,
            messageStoreConfig);
        // remember all configs to prevent discard
        // 将所有的-c的外部配置信息保存到BrokerController中的Configuration对象属性的allConfigs属性中
        controller.getConfiguration().registerConfig(properties);

        // 初始化BrokerController
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }
        // 添加关闭钩子方法,在Broker关闭之前执行,进行一些内存清理、对象销毁等操作
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            private volatile boolean hasShutdown = false;
            private AtomicInteger shutdownTimes = new AtomicInteger(0);

            @Override
            public void run() {
                synchronized (this) {
                    log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                    if (!this.hasShutdown) {
                        this.hasShutdown = true;
                        long beginTime = System.currentTimeMillis();
                        controller.shutdown();
                        long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                        log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                    }
                }
            }
        }, "ShutdownHook"));

        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}

实例化brokerController

实例化brokerController: org.apache.rocketmq.broker.BrokerController#BrokerController

public BrokerController(
    final BrokerConfig brokerConfig,
    final NettyServerConfig nettyServerConfig,
    final NettyClientConfig nettyClientConfig,
    final MessageStoreConfig messageStoreConfig
) {
    // broker的配置
    this.brokerConfig = brokerConfig;
    // 作为netty服务端与客户端交互的配置
    this.nettyServerConfig = nettyServerConfig;
    // 作为netty客户端与服务端交互的配置
    this.nettyClientConfig = nettyClientConfig;
    // 消息存储的配置
    this.messageStoreConfig = messageStoreConfig;
    // 消费者偏移量管理器,维护offset进度信息
    this.consumerOffsetManager = new ConsumerOffsetManager(this);
    //topic配置管理器,管理broker中存储的所有topic的配置
    this.topicConfigManager = new TopicConfigManager(this);
    // 处理消费者拉取消息请求的处理器
    this.pullMessageProcessor = new PullMessageProcessor(this);
    //拉取请求挂起服务,处理无消息时push长轮询消费者的挂起等待机制
    this.pullRequestHoldService = new PullRequestHoldService(this);
    //消息送达的监听器,生产者消息到达时通过该监听器触发pullRequestHoldService通知pullRequestHoldService
    this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
    //消费者id变化监听器
    this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
    //消费者管理类,维护消费者组的注册实例信息以及topic的订阅信息,并对消费者id变化进行监听
    this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
    //消费者过滤管理器,配置文件为:xx/config/consumerFilter.json
    this.consumerFilterManager = new ConsumerFilterManager(this);
    //生产者管理器,包含生产者的注册信息,通过groupName分组
    this.producerManager = new ProducerManager();
    //客户端连接心跳服务,用于定时扫描生产者和消费者客户端,并将不活跃的客户端通道及相关信息移除
    this.clientHousekeepingService = new ClientHousekeepingService(this);
    //处理某些broker到客户端的请求,例如检查生产者的事务状态,重置offset
    this.broker2Client = new Broker2Client(this);
    //订阅分组关系管理器,维护消费者组的一些附加运维信息
    this.subscriptionGroupManager = new SubscriptionGroupManager(this);
    //broker对方访问的API,处理broker对外的发起请求,比如向nameServer注册,向master、slave发起的请求
    this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
    //过滤服务管理器,拉取消息过滤
    this.filterServerManager = new FilterServerManager(this);

    //用于从节点,定时向主节点发起请求同步数据,例如topic配置、消费位移等
    this.slaveSynchronize = new SlaveSynchronize(this);

    /*初始化各种阻塞队列。将会被设置到对应的处理不同客户端请求的线程池执行器中*/
    //处理来自生产者的发送消息的请求的队列
    this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
    this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
    //处理reply消息的请求的队列,RocketMQ4.7.0版本中增加了request-reply新特性,该特性允许producer在发送消息后同步或者异步等待consumer消费完消息并返回响应消息,类似rpc调用效果。
    //即生产者发送了消息之后,可以同步或者异步的收到消费了这条消息的消费者的响应
    this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
    //处理查询请求的队列
    this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
    //客户端管理器的队列
    this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
    //消费者管理器的队列,目前没用到
    this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
    //心跳处理的队列
    this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
    //事务消息相关处理的队列
    this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
    //broker状态管理器,保存Broker运行时状态
    this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
    //目前没用到
    this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
    //broker快速失败服务
    this.brokerFastFailure = new BrokerFastFailure(this);
    //配置类
    this.configuration = new Configuration(
        log,
        BrokerPathConfigHelper.getBrokerConfigPath(),
        this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
    );
}

初始化brokerController

初始化brokerController: org.apache.rocketmq.broker.BrokerController#initialize

public boolean initialize() throws CloneNotSupportedException {
    //topic配置文件加载,路径为  {user.home}/store/config/topics.json
    boolean result = this.topicConfigManager.load();

    // 消费者消费偏移量配置文件加载,路径为  {user.home}/store/config/consumerOffset.json
    result = result && this.consumerOffsetManager.load();
    // 订阅分组配置文件加载,路径为  {user.home}/store/config/subscriptionGroup.json
    result = result && this.subscriptionGroupManager.load();
    // 消费者过滤配置文件加载,路径为  {user.home}/store/config/consumerFilter.json
    result = result && this.consumerFilterManager.load();

    if (result) {
        // 实例化和初始化消息存储服务相关类 DefaultMessageStore
        try {
            // 实例化消息存储类DefaultMessageStore
            this.messageStore =
                new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                    this.brokerConfig);

            /**
             * enableDLegerCommitLog 为 true(默认为false),则创建DLedgerRoleChangeHandler。
             * 在启用enableDLegerCommitLog情况下,broker通过raft协议选主,可以实现主从角色自动切换
             */
            if (messageStoreConfig.isEnableDLegerCommitLog()) {
                DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
                ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
            }
            this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
            //load plugin
            MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
            this.messageStore = MessageStoreFactory.build(context, this.messageStore);
            this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
        } catch (IOException e) {
            result = false;
            log.error("Failed to initialize", e);
        }
    }
    /*
     * 通过消息存储服务 加载 储存消息的相关文件到内存中,并进行数据恢复(此步骤是broker启动的核心步骤)
     * 文件:commitLog文件(储存消息的)、consumequeue文件(消费者依据此文件消费,储存指向commitLog中消息的偏移量)、indexFile文件
     * 数据恢复:broker可能会异常关闭,导致消息在文件中储存不完整 或 已储存到commitLog但未存储到consumequeue和indexFile
     */
    result = result && this.messageStore.load();

    if (result) {
        // 创建netty远程服务,remotingServer和fastRemotingServer
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
        NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
        fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
        this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);

        /********************
         * 创建各种执行器线程池*
         ********************/

        // 处理发送消息的请求的线程池
        this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getSendMessageThreadPoolNums(),
            this.brokerConfig.getSendMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.sendThreadPoolQueue,
            new ThreadFactoryImpl("SendMessageThread_"));
        //处理拉取消息的请求的线程池
        this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getPullMessageThreadPoolNums(),
            this.brokerConfig.getPullMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.pullThreadPoolQueue,
            new ThreadFactoryImpl("PullMessageThread_"));
        // 处理reply消息的请求的线程池(Reply模式允许Producer发出消息后,以同步或异步的形式等Consumer消费并返回一个响应消息,达到类似RPC的调用过程)
        this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
            this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.replyThreadPoolQueue,
            new ThreadFactoryImpl("ProcessReplyMessageThread_"));
        // 处理查询请求的线程池
        this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            this.brokerConfig.getQueryMessageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.queryThreadPoolQueue,
            new ThreadFactoryImpl("QueryMessageThread_"));
        // broker 管理线程池,作为默认处理器的线程池
        this.adminBrokerExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                "AdminBrokerThread_"));
        // 客户端管理器的线程池
        this.clientManageExecutor = new ThreadPoolExecutor(
            this.brokerConfig.getClientManageThreadPoolNums(),
            this.brokerConfig.getClientManageThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.clientManagerThreadPoolQueue,
            new ThreadFactoryImpl("ClientManageThread_"));

        // 心跳处理的线程池
        this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            this.brokerConfig.getHeartbeatThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.heartbeatThreadPoolQueue,
            new ThreadFactoryImpl("HeartbeatThread_", true));

        // 事务消息相关处理的线程池
        this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            this.brokerConfig.getEndTransactionThreadPoolNums(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.endTransactionThreadPoolQueue,
            new ThreadFactoryImpl("EndTransactionThread_"));

        //消费者管理的线程池
        this.consumerManageExecutor =
            Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                "ConsumerManageThread_"));

        // 注册netty服务的消息处理器
        this.registerProcessor();

        /*********************
         * 启动各种定时任务     *
         *********************/
        final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
        final long period = 1000 * 60 * 60 * 24;
        // 每隔24h打印昨天生产和消费的消息数量
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.getBrokerStats().record();
                } catch (Throwable e) {
                    log.error("schedule record error.", e);
                }
            }
        }, initialDelay, period, TimeUnit.MILLISECONDS);

        //每隔5s將消费者offset进行持久化,存入consumerOffset.json文件中
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerOffsetManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumerOffset error.", e);
                }
            }
        }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        //每隔10s將消费过滤信息进行持久化,存入consumerFilter.json文件中
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.consumerFilterManager.persist();
                } catch (Throwable e) {
                    log.error("schedule persist consumer filter error.", e);
                }
            }
        }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

        /**
         * 每3分钟检查消费进度,若消费进度落后超过 consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16\
         * 且disableConsumeIfConsumerReadSlowly=true(默认false)则剔除掉该订阅组,该消费者组停止消费消息来保护broker
         * 因为存储消息的commitLog一个文件大小才为1024L * 1024 * 1024。
         * ps:不清楚原因
         */
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.protectBroker();
                } catch (Throwable e) {
                    log.error("protectBroker error.", e);
                }
            }
        }, 3, 3, TimeUnit.MINUTES);

        //每隔1s將打印发送消息线程池队列、拉取消息线程池队列、查询消息线程池队列、结束事务线程池队列的大小以及队列头部元素存在时间
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    BrokerController.this.printWaterMark();
                } catch (Throwable e) {
                    log.error("printWaterMark error.", e);
                }
            }
        }, 10, 1, TimeUnit.SECONDS);

        //每隔1min將打印已存储在commitlog提交日志中但尚未分派到consumequeue消费队列的字节数。
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                } catch (Throwable e) {
                    log.error("schedule dispatchBehindBytes error.", e);
                }
            }
        }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

        if (this.brokerConfig.getNamesrvAddr() != null) {
            // 更新NamesrvAddr
            this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
            log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
        } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
            /**
             * 如果未指定nameSvr地址,且开启从地址服务器获取
             * 则每2min从nameSvr地址服务器获取最新的地址并更新
             * 若希望动态更新nameSvr地址,则需要指定地址服务器url和fetchNamesrvAddrByAddressServer设置为true
             */
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                    } catch (Throwable e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        //如果没有开启DLeger服务,DLeger开启后表示支持高可用的主从自动切换
        if (!messageStoreConfig.isEnableDLegerCommitLog()) {
            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                // 如果是从节点,更新master地址
                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                } else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }
            } else {
                //如果是主节点,每隔60s將打印主从节点的差异
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();
                        } catch (Throwable e) {
                            log.error("schedule printMasterAndSlaveDiff error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
        }
		
        // ....省略
        
        // 初始化事务消息相关服务
        initialTransaction();
        // 初始化权限相关服务
        initialAcl();
        // 初始化RPC调用的钩子函数
        initialRpcHooks();
    }
    return result;
}

启动brokerController

入口: org.apache.rocketmq.broker.BrokerStartup#start

public static BrokerController start(BrokerController controller) {
    try {
        // BrokerController启动
        controller.start();

        String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
            + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();

        if (null != controller.getBrokerConfig().getNamesrvAddr()) {
            tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
        }

        log.info(tip);
        System.out.printf("%s%n", tip);
        return controller;
    } catch (Throwable e) {
        e.printStackTrace();
        System.exit(-1);
    }

    return null;
}
public void start() throws Exception {
    //启动消息存储服务
    if (this.messageStore != null) {
        this.messageStore.start();
    }

    //启动netty远程服务
    if (this.remotingServer != null) {
        this.remotingServer.start();
    }

    //启动netty远程服务VIP通道
    if (this.fastRemotingServer != null) {
        this.fastRemotingServer.start();
    }

    if (this.fileWatchService != null) {
        this.fileWatchService.start();
    }

    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }

    //长轮询拉取消息挂起服务启动
    if (this.pullRequestHoldService != null) {
        this.pullRequestHoldService.start();
    }

    //客户端连接心跳服务启动
    if (this.clientHousekeepingService != null) {
        this.clientHousekeepingService.start();
    }

    if (this.filterServerManager != null) {
        this.filterServerManager.start();
    }

    // 如果没有开启DLeger
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        //如果不是SLAVE,那么启动事务消息检查服务
        startProcessorByHa(messageStoreConfig.getBrokerRole());
        //如果是SLAVE,则启动主从同步服务, 定时任务每隔10s与master机器同步数据,采用slave主动拉取的方法
        //同步的内容包括topic配置,消费者消费位移、延迟消息偏移量、订阅组信息等
        handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
        this.registerBrokerAll(true, false, true);
    }

    // 定时任务,默认每30s向namesvr发起一次注册,即心跳包。时间间隔可配置registerNameServerPeriod,1万到6万毫秒间。
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

    if (this.brokerStatsManager != null) {
        this.brokerStatsManager.start();
    }

    if (this.brokerFastFailure != null) {
        this.brokerFastFailure.start();
    }


}
 类似资料: