首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

RocketMQ 生产者 Producer 启动过程(6)

RocketMQ 生产者 Producer 启动过程(6)

startScheduledTask() 方法:定时任务

    private void startScheduledTask() {
        // 1.如果 NameServer 地址默认没配置,则定时向一个Http地址获取
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }
     
        // 2. 定时的从 NameServer 中获取 Topic、broker、queue 相关信息
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
     
        // 3. 定时清理无效的Broker,并向所有的Broker 发送心跳数据
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
     
        // 4. 定时的持久化 Consumer 端消费每个 queue的 offset 数据。
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
     
        // 5. 调整消费端的线程数
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
    }

    1、定时更新 NameServer 地址
    每个2分钟,程序会向一个Http地址发送请求来获取NameServer地址来动态更新NameServer地址。

    2、 定时的从 NameServer 中获取 Topic、broker、queue 相关信息
    默认每隔 30秒去 NameServer 中获取Topic、broker、queue等相关信息。
    如果有新broker注册或下线,producer端会在30秒之内感知。

    3、定时清理无效的Broker,并向所有的Broker 发送心跳数据.
    默认每隔 30 秒向 Broker 发送心跳数据 和 用户自定义的 filterclass 类。

    4、定时的持久化 Consumer 端消费每个 queue的 offset 数据。
    默认每隔 5 秒持久或 Consumer 消费的 queue 的 offset信息。
    持久化分为,远程持久化和本地持久化。
    MessageModel.CLUSTERING 模式 queue的offset 保存到 broker上。
    BROADCASTING("BROADCASTING") 模式 queue 的 offset 保存在本地。

    5、调整消费端的线程数
    每隔 1 分钟计算每一个queue中消息挤压的数量,如果超过100000条,则增加消费线程的并发数,如果小于80000条则减少消费者的线程数。
    不过进入源码中看,调整消费者的线程数都注释掉了。
返回列表