RocketMQ 生产者 Producer 启动过程(5)
- UID
- 1066743
|
RocketMQ 生产者 Producer 启动过程(5)
启动MQClientInstance 服务 (第8步)
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 1. 如果配置NameServer地址,则从默认服务器地址中获取(该地址不可改变)
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// 2. 启动 Netty 客户端服务
this.mQClientAPIImpl.start();
// 3. 设置多个定时任务
this.startScheduledTask();
// 4. 开启 pullMessageService 服务
this.pullMessageService.start();
// 5. 开启 rebalanceService 服务
this.rebalanceService.start();
// 6. 开启 发送消息服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
主要这几步骤操作。
1、获取NameServer地址
如果启动 Producer 时没有指定 NameServer,则程序会向一个Http地址发送请求来获取NameServer地址。通过这种方式可以动态的配置 NameServer。从而达到动态增加和删除NameServer服务。
public static String getWSAddr() {
String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
String wsDomainSubgroup = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
String wsAddr = "http://" + wsDomainName + ":8080/rocketmq/" + wsDomainSubgroup;
if (wsDomainName.indexOf(":") > 0) {
wsAddr = "http://" + wsDomainName + "/rocketmq/" + wsDomainSubgroup;
}
return wsAddr;
}
2、启动 Netty 客户端服务
3、调用startScheduledTask() 方法设置多个定时任务
4、开启 pullMessageService 服务
5、开启 rebalanceService 服务
6、开启 发送消息服务 |
|
|
|
|
|