Board logo

标题: RocketMQ 生产者 Producer 启动过程(3) [打印本页]

作者: look_w    时间: 2019-4-12 13:48     标题: RocketMQ 生产者 Producer 启动过程(3)

启动Producer的时候判断 serviceState 的当前状态,只有 serviceState 状态为 CREATE_JUST 时,才启动 Producer。否则抛出异常信息。

2、同时防止启动多个 Producer,先把 serviceState 状态修改为 START_FAILED。

3、 检查 groupName 是否合法。比如不能为空,是否符合正则 ^[%|a-zA-Z0-9_-]+$,并且最大长度不能超过 255(CHARACTER_MAX_LENGTH = 255);
groupName 也不能等于 DEFAULT_PRODUCER。只要满足上面条件,则抛异常信息。

4、如果 producerGroup 不等于 CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER" ,然后调用 changeInstanceNameToPID() 方法判断名字不是 "DEFAULT" 则更改 instanceName。

    public void changeInstanceNameToPID() {
        if (this.instanceName.equals("DEFAULT")) {
            this.instanceName = String.valueOf(UtilAll.getPid());
        }
    }
    public static int getPid() {
        RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
        String name = runtime.getName(); // format: "pid@hostname"
        try {
            return Integer.parseInt(name.substring(0, name.indexOf('@')));
        .....
    }

5、构建 MQClientInstance 对象。

    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
            String clientId = clientConfig.buildMQClientId();
            MQClientInstance instance = this.factoryTable.get(clientId);
            if (null == instance) {
                instance =
                    new MQClientInstance(clientConfig.cloneClientConfig(),
                        this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
                MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
                if (prev != null) {
                    instance = prev;
                    log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
                } else {
                    log.info("Created new MQClientInstance for clientId:[{}]", clientId);
                }
            }
     
            return instance;
        }

    5.1 首先生成 clientId:ip@instanceName 或 ip@instanceName@unitName
    5.2 如果 factoryTable 中是不已经存在 MQClientInstance 实例,则创建。 (下面有单独分析该源码)

6、将 DefaultMQProducerImpl 对象注册到 ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();中

    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
     
    public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
        if (null == group || null == producer) {
            return false;
        }
        MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
        if (prev != null) {
            log.warn("the producer group[{}] exist already.", group);
            return false;
        }
        return true;
    }

7、以主题名"TBW102"为key值,新初始化的TopicPublishInfo对象为value值存入DefaultMQProducerImpl.topicPublishInfoTable变量中

8、调用 第五步创建的 MQClientInstance 实例 的start() 方法。
该方法做了很多事情:

    获取NameServer地址
    启动 Netty 客户端服务
    设置多个定时任务
    开启 pullMessageService 服务
    开启 rebalanceService 服务
    开启 发送消息服务

下面有具体代码分析MQClientInstance.start() 方法。

9、设置DefaultMQProducerImpl的ServiceState为RUNNING
10、向所有的 broker 发送心跳和上传 FilterClass




欢迎光临 电子技术论坛_中国专业的电子工程师学习交流社区-中电网技术论坛 (http://bbs.eccn.com/) Powered by Discuz! 7.0.0