首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

RocketMQ 生产者 Producer 启动过程(5)

RocketMQ 生产者 Producer 启动过程(5)

启动MQClientInstance 服务 (第8步)

    public void start() throws MQClientException {
     
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // 1. 如果配置NameServer地址,则从默认服务器地址中获取(该地址不可改变)
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // 2. 启动 Netty 客户端服务
                    this.mQClientAPIImpl.start();
                    // 3. 设置多个定时任务
                    this.startScheduledTask();
                    // 4. 开启 pullMessageService 服务
                    this.pullMessageService.start();
                    // 5. 开启 rebalanceService 服务
                    this.rebalanceService.start();
                    // 6. 开启 发送消息服务
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

主要这几步骤操作。

    1、获取NameServer地址
    如果启动 Producer 时没有指定 NameServer,则程序会向一个Http地址发送请求来获取NameServer地址。通过这种方式可以动态的配置 NameServer。从而达到动态增加和删除NameServer服务。

    public static String getWSAddr() {
        String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
        String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
        String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;
        if (wsDomainName.indexOf(":") > 0) {
            wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;
        }
        return wsAddr;
    }

    2、启动 Netty 客户端服务
    3、调用startScheduledTask() 方法设置多个定时任务
    4、开启 pullMessageService 服务
    5、开启 rebalanceService 服务
    6、开启 发送消息服务
返回列表