首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

RocketMQ 生产者 Producer 启动过程(4)

RocketMQ 生产者 Producer 启动过程(4)

创建MQClientInstance实例(第5.2步)

上面 5.2 步骤中创建MQClientInstance 的代码如下:

    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
        this.clientConfig = clientConfig;
        this.instanceIndex = instanceIndex;
        this.nettyClientConfig = new NettyClientConfig();
        this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
        this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
        this.clientRemotingProcessor = new ClientRemotingProcessor(this);
        // Netty 中注册接收请求的处理器。
        this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
        //设置 NameServer 地址。
        if (this.clientConfig.getNamesrvAddr() != null) {
            this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
            log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
        }
        // 客户端ID
        this.clientId = clientId;
        //创建 MQAdminImpl 对象进行和 NameServer 进行交互,比如创建Topic、获取 Queue等
        this.mQAdminImpl = new MQAdminImpl(this);
        // 创建 pullMessageService 服务
        this.pullMessageService = new PullMessageService(this);
        // 创建 rebalanceService  服务
        this.rebalanceService = new RebalanceService(this);
        // 创建 DefaultMQProducer 服务
        this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
        this.defaultMQProducer.resetClientConfig(clientConfig);
        // 开启 Comsumer 统计服务
        this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
     
        log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
            this.instanceIndex,
            this.clientId,
            this.clientConfig,
            MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
    }

主要功能:

    创建 MQAdminImpl 对象进行和 NameServer 进行交互,比如创建Topic、获取 Queue等
    创建 pullMessageService 服务
    创建 rebalanceService 服务,供 Consumer 端使用
    创建 DefaultMQProducer 服务,
    开启 Comsumer 统计服务。统计最近一段时间内,消费成功个数、消费失败个数等信息。
返回列表