RocketMQ 生产者 Producer 启动过程(2)
- UID
- 1066743
|
RocketMQ 生产者 Producer 启动过程(2)
Producer 启动
/**
 * Starts the producer by delegating to the wrapped implementation object
 * ({@code defaultMQProducerImpl}).
 *
 * @throws MQClientException propagated from {@code defaultMQProducerImpl.start()}
 */
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}
/**
 * Convenience overload: starts the producer with {@code startFactory = true},
 * i.e. the client instance is started as part of producer startup.
 *
 * @throws MQClientException propagated from {@code start(boolean)}
 */
public void start() throws MQClientException {
this.start(true);
}
/**
 * Runs the producer startup sequence, driven by the current {@code serviceState}.
 *
 * <p>Only the {@code CREATE_JUST} state performs the startup; {@code RUNNING},
 * {@code START_FAILED} and {@code SHUTDOWN_ALREADY} are rejected with an exception.
 * After the switch, a heartbeat is sent through the client instance.
 *
 * @param startFactory when {@code true}, the client instance obtained during startup is
 *                     also started; when {@code false}, it is only created and registered
 * @throws MQClientException if registering the producer group fails, or if the service
 *                           state is {@code RUNNING}, {@code START_FAILED} or
 *                           {@code SHUTDOWN_ALREADY}; may also propagate from the helpers
 *                           called here
 */
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
// 1. The producer is started only when serviceState is CREATE_JUST.
case CREATE_JUST:
// 2. Mark the state as START_FAILED up front: it only becomes RUNNING if every step
//    below completes, and a repeated start() in the meantime lands in the
//    START_FAILED case and is rejected.
this.serviceState = ServiceState.START_FAILED;
// 3. Validate the configuration (presumably the producer group name; checkConfig is
//    defined elsewhere -- verify).
this.checkConfig();
// 4. Unless this is the client-internal producer group, switch the instance name to
//    the PID.
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// 5. Obtain the MQClientInstance for this producer from the MQClientManager singleton.
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// 6. Register this producer implementation with the client instance under its group
//    name. NOTE(review): the original note says the registry is a
//    ConcurrentHashMap<String /* group */, MQProducerInner> producerTable -- not
//    visible here.
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
// Registration was refused: roll the state back to CREATE_JUST so start() can be
// retried, then report the duplicate group name.
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 7. Seed topicPublishInfoTable with an empty TopicPublishInfo under the create-topic
//    key (per the original note this key is "TBW102" -- confirm against
//    getCreateTopicKey()).
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// 8. Start the MQClientInstance obtained in step 5, only if the caller asked for it.
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
// 9. Startup succeeded: mark this producer implementation as RUNNING.
this.serviceState = ServiceState.RUNNING;
break;
// Any state other than CREATE_JUST listed here means start() was already attempted
// (or the producer was shut down), so the call is rejected.
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// 10. Send a heartbeat to all brokers through the client instance. The original note says
//     this call also uploads filter classes -- not visible here, verify.
//     NOTE(review): this line is also reached through the default branch above, where
//     mQClientFactory has not been assigned by this method.
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
} |
|
|
|
|
|