RocketMQ 生产者 Producer 启动过程(6)
- UID
- 1066743
|
RocketMQ 生产者 Producer 启动过程(6)
startScheduledTask() 方法:定时任务
private void startScheduledTask() {
// 1.如果 NameServer 地址默认没配置,则定时向一个Http地址获取
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// 2. 定时的从 NameServer 中获取 Topic、broker、queue 相关信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
// 3. 定时清理无效的Broker,并向所有的Broker 发送心跳数据
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
// 4. 定时的持久化 Consumer 端消费每个 queue的 offset 数据。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 5. 调整消费端的线程数
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
1、定时更新 NameServer 地址
每个2分钟,程序会向一个Http地址发送请求来获取NameServer地址来动态更新NameServer地址。
2、 定时的从 NameServer 中获取 Topic、broker、queue 相关信息
默认每隔 30秒去 NameServer 中获取Topic、broker、queue等相关信息。
如果有新broker注册或下线,producer端会在30秒之内感知。
3、定时清理无效的Broker,并向所有的Broker 发送心跳数据.
默认每隔 30 秒向 Broker 发送心跳数据 和 用户自定义的 filterclass 类。
4、定时的持久化 Consumer 端消费每个 queue的 offset 数据。
默认每隔 5 秒持久或 Consumer 消费的 queue 的 offset信息。
持久化分为,远程持久化和本地持久化。
MessageModel.CLUSTERING 模式 queue的offset 保存到 broker上。
BROADCASTING("BROADCASTING") 模式 queue 的 offset 保存在本地。
5、调整消费端的线程数
每隔 1 分钟计算每一个queue中消息挤压的数量,如果超过100000条,则增加消费线程的并发数,如果小于80000条则减少消费者的线程数。
不过进入源码中看,调整消费者的线程数都注释掉了。
|
|
|
|
|
|