Board logo

标题: RocketMQ 生产者 Producer 启动过程(2) [打印本页]

作者: look_w    时间: 2019-4-12 13:47     标题: RocketMQ 生产者 Producer 启动过程(2)

Producer 启动

    public void start() throws MQClientException {
        this.defaultMQProducerImpl.start();
    }
     
    public void start() throws MQClientException {
        this.start(true);
    }
     
    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            // 1. 只有 serviceState 状态为 CREATE_JUST 时,才启动 Producer
            case CREATE_JUST:
                //2. 防止启动多个 Producer,先把 serviceState 状态修改为 START_FAILED。
                this.serviceState = ServiceState.START_FAILED;
                // 3. 检查 groupName 是否合法
                this.checkConfig();
     
                //4. 判断是否需要设置 InstanceName 。
                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                // 5. 构建 MQClientInstance 对象。
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                // 6. 将 DefaultMQProducerImpl 对象注册到 ConcurrentHashMap<String/* group */, MQProducerInner> producerTable 中
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                // 7.以主题名"TBW102"为key值,新初始化的TopicPublishInfo对象为value值存入DefaultMQProducerImpl.topicPublishInfoTable变量中
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                // 8. 启动 第五步创建的 MQClientInstance 实例。
                if (startFactory) {
                    mQClientFactory.start();
                }
     
                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                // 9. 设置DefaultMQProducerImpl的ServiceState为RUNNING
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        // 10.  向所有的 broker 发送心跳和上传 FilterClass
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }




欢迎光临 电子技术论坛_中国专业的电子工程师学习交流社区-中电网技术论坛 (http://bbs.eccn.com/) Powered by Discuz! 7.0.0