分布式开放消息系统(RocketMQ)的原理与实践-3
- UID
- 1066743
|
分布式开放消息系统(RocketMQ)的原理与实践-3
消费事务消息
这样基本上可以解决超时问题,但是如果消费失败怎么办?阿里提供给我们的解决方法是:人工解决。大家可以考虑一下,按照事务的流程,因为某种原因Smith加款失败,需要回滚整个流程。如果消息系统要实现这个回滚流程的话,系统复杂度将大大提升,且很容易出现Bug,估计出现Bug的概率会比消费失败的概率大很多。我们需要衡量是否值得花这么大的代价来解决这样一个出现概率非常小的问题,这也是大家在解决疑难问题时需要多多思考的地方。
20160321补充:在3.2.6版本中移除了事务消息的实现,所以此版本不支持事务消息,具体情况请参考rocketmq的issues:
四、Producer如何发送消息Producer轮询某topic下的所有队列的方式来实现发送方的负载均衡,如下图所示:
producer发送消息负载均衡
首先分析一下RocketMQ的客户端发送消息的源码:
[url=][/url]
// 构造ProducerDefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");// 初始化Producer,整个应用生命周期内,只需要初始化1次producer.start();// 构造MessageMessage msg = new Message("TopicTest1",// topic "TagA",// tag:给消息打标签,用于区分一类消息,可为null "OrderID188",// key:自定义Key,可以用于去重,可为null ("Hello MetaQ").getBytes());// body:消息内容// 发送消息并返回结果SendResult sendResult = producer.send(msg);// 清理资源,关闭网络连接,注销自己producer.shutdown();[url=][/url]
在整个应用生命周期内,生产者需要调用一次start方法来初始化,初始化主要完成的任务有:
- 如果没有指定namesrv地址,将会自动寻址
- 启动定时任务:更新namesrv地址、从namsrv更新topic路由信息、清理已经挂掉的broker、向所有broker发送心跳...
- 启动负载均衡的服务
初始化完成后,开始发送消息,发送消息的主要代码如下:
[url=][/url]
private SendResult sendDefaultImpl(Message msg,......) { // 检查Producer的状态是否是RUNNING this.makeSureStateOK(); // 检查msg是否合法:是否为null、topic,body是否为空、body是否超长 Validators.checkMessage(msg, this.defaultMQProducer); // 获取topic路由信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // 从路由信息中选择一个消息队列 MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName); // 将消息发送到该队列上去 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);}[url=][/url]
代码中需要关注的两个方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面说过在producer初始化时,会启动定时任务获取路由信息并更新到本地缓存,所以tryToFindTopicPublishInfo会首先从缓存中获取topic路由信息,如果没有获取到,则会自己去namesrv获取路由信息。selectOneMessageQueue方法通过轮询的方式,返回一个队列,以达到负载均衡的目的。
如果Producer发送消息失败,会自动重试,重试的策略:
- 重试次数 < retryTimesWhenSendFailed(可配置)
- 总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)
- 同时满足上面两个条件后,Producer会选择另外一个队列发送消息
|
|
|
|
|
|