RocketMQ 生产者 Producer 启动过程(4)
- UID
- 1066743
|
RocketMQ 生产者 Producer 启动过程(4)
创建MQClientInstance实例(第5.2步)
上面 5.2 步骤中创建MQClientInstance 的代码如下:
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
// Netty 中注册接收请求的处理器。
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
//设置 NameServer 地址。
if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}
// 客户端ID
this.clientId = clientId;
//创建 MQAdminImpl 对象进行和 NameServer 进行交互,比如创建Topic、获取 Queue等
this.mQAdminImpl = new MQAdminImpl(this);
// 创建 pullMessageService 服务
this.pullMessageService = new PullMessageService(this);
// 创建 rebalanceService 服务
this.rebalanceService = new RebalanceService(this);
// 创建 DefaultMQProducer 服务
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
// 开启 Comsumer 统计服务
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
this.instanceIndex,
this.clientId,
this.clientConfig,
MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
主要功能:
创建 MQAdminImpl 对象进行和 NameServer 进行交互,比如创建Topic、获取 Queue等
创建 pullMessageService 服务
创建 rebalanceService 服务,供 Consumer 端使用
创建 DefaultMQProducer 服务,
开启 Comsumer 统计服务。统计最近一段时间内,消费成功个数、消费失败个数等信息。 |
|
|
|
|
|