Board logo

标题: RocketMQ 生产者 Producer 启动过程(4) [打印本页]

作者: look_w    时间: 2019-4-12 13:49     标题: RocketMQ 生产者 Producer 启动过程(4)

创建MQClientInstance实例(第5.2步)

上面 5.2 步骤中创建MQClientInstance 的代码如下:

    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
        this.clientConfig = clientConfig;
        this.instanceIndex = instanceIndex;
        this.nettyClientConfig = new NettyClientConfig();
        this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
        this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
        this.clientRemotingProcessor = new ClientRemotingProcessor(this);
        // Netty 中注册接收请求的处理器。
        this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);
        //设置 NameServer 地址。
        if (this.clientConfig.getNamesrvAddr() != null) {
            this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
            log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
        }
        // 客户端ID
        this.clientId = clientId;
        //创建 MQAdminImpl 对象进行和 NameServer 进行交互,比如创建Topic、获取 Queue等
        this.mQAdminImpl = new MQAdminImpl(this);
        // 创建 pullMessageService 服务
        this.pullMessageService = new PullMessageService(this);
        // 创建 rebalanceService  服务
        this.rebalanceService = new RebalanceService(this);
        // 创建 DefaultMQProducer 服务
        this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
        this.defaultMQProducer.resetClientConfig(clientConfig);
        // 开启 Comsumer 统计服务
        this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
     
        log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
            this.instanceIndex,
            this.clientId,
            this.clientConfig,
            MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
    }

主要功能:

    创建 MQAdminImpl 对象进行和 NameServer 进行交互,比如创建Topic、获取 Queue等
    创建 pullMessageService 服务
    创建 rebalanceService 服务,供 Consumer 端使用
    创建 DefaultMQProducer 服务,
    开启 Comsumer 统计服务。统计最近一段时间内,消费成功个数、消费失败个数等信息。




欢迎光临 电子技术论坛_中国专业的电子工程师学习交流社区-中电网技术论坛 (http://bbs.eccn.com/) Powered by Discuz! 7.0.0