RocketMQ 生产者 Producer 启动过程(3)
- UID
- 1066743
|
RocketMQ 生产者 Producer 启动过程(3)
启动Producer的时候判断 serviceState 的当前状态,只有 serviceState 状态为 CREATE_JUST 时,才启动 Producer。否则抛出异常信息。
2、同时防止启动多个 Producer,先把 serviceState 状态修改为 START_FAILED。
3、 检查 groupName 是否合法。比如不能为空,是否符合正则 ^[%|a-zA-Z0-9_-]+$,并且最大长度不能超过 255(CHARACTER_MAX_LENGTH = 255);
groupName 也不能等于 DEFAULT_PRODUCER。只要满足上面条件,则抛异常信息。
4、如果 producerGroup 不等于 CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER" ,然后调用 changeInstanceNameToPID() 方法判断名字不是 "DEFAULT" 则更改 instanceName。
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());
}
}
public static int getPid() {
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
String name = runtime.getName(); // format: "pid@hostname"
try {
return Integer.parseInt(name.substring(0, name.indexOf('@')));
.....
}
5、构建 MQClientInstance 对象。
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
5.1 首先生成 clientId:ip@instanceName 或 ip@instanceName@unitName
5.2 如果 factoryTable 中是不已经存在 MQClientInstance 实例,则创建。 (下面有单独分析该源码)
6、将 DefaultMQProducerImpl 对象注册到 ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();中
private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
if (null == group || null == producer) {
return false;
}
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
if (prev != null) {
log.warn("the producer group[{}] exist already.", group);
return false;
}
return true;
}
7、以主题名"TBW102"为key值,新初始化的TopicPublishInfo对象为value值存入DefaultMQProducerImpl.topicPublishInfoTable变量中
8、调用 第五步创建的 MQClientInstance 实例 的start() 方法。
该方法做了很多事情:
获取NameServer地址
启动 Netty 客户端服务
设置多个定时任务
开启 pullMessageService 服务
开启 rebalanceService 服务
开启 发送消息服务
下面有具体代码分析MQClientInstance.start() 方法。
9、设置DefaultMQProducerImpl的ServiceState为RUNNING
10、向所有的 broker 发送心跳和上传 FilterClass |
|
|
|
|
|