Board logo

标题: 分布式开放消息系统(RocketMQ)的原理与实践-3 [打印本页]

作者: look_w    时间: 2018-12-18 20:17     标题: 分布式开放消息系统(RocketMQ)的原理与实践-3

消费事务消息
这样基本上可以解决超时问题,但是如果消费失败怎么办?阿里提供给我们的解决方法是:人工解决。大家可以考虑一下,按照事务的流程,因为某种原因Smith加款失败,需要回滚整个流程。如果消息系统要实现这个回滚流程的话,系统复杂度将大大提升,且很容易出现Bug,估计出现Bug的概率会比消费失败的概率大很多。我们需要衡量是否值得花这么大的代价来解决这样一个出现概率非常小的问题,这也是大家在解决疑难问题时需要多多思考的地方。
20160321补充:在3.2.6版本中移除了事务消息的实现,所以此版本不支持事务消息,具体情况请参考rocketmq的issues:

四、Producer如何发送消息Producer轮询某topic下的所有队列的方式来实现发送方的负载均衡,如下图所示:

producer发送消息负载均衡


首先分析一下RocketMQ的客户端发送消息的源码:
[url=][/url]
// 构造ProducerDefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");// 初始化Producer,整个应用生命周期内,只需要初始化1次producer.start();// 构造MessageMessage msg = new Message("TopicTest1",// topic                        "TagA",// tag:给消息打标签,用于区分一类消息,可为null                        "OrderID188",// key:自定义Key,可以用于去重,可为null                        ("Hello MetaQ").getBytes());// body:消息内容// 发送消息并返回结果SendResult sendResult = producer.send(msg);// 清理资源,关闭网络连接,注销自己producer.shutdown();[url=][/url]

在整个应用生命周期内,生产者需要调用一次start方法来初始化,初始化主要完成的任务有:
初始化完成后,开始发送消息,发送消息的主要代码如下:
[url=][/url]
private SendResult sendDefaultImpl(Message msg,......) {    // 检查Producer的状态是否是RUNNING    this.makeSureStateOK();    // 检查msg是否合法:是否为null、topic,body是否为空、body是否超长    Validators.checkMessage(msg, this.defaultMQProducer);    // 获取topic路由信息    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());    // 从路由信息中选择一个消息队列    MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);    // 将消息发送到该队列上去    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);}[url=][/url]

代码中需要关注的两个方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面说过在producer初始化时,会启动定时任务获取路由信息并更新到本地缓存,所以tryToFindTopicPublishInfo会首先从缓存中获取topic路由信息,如果没有获取到,则会自己去namesrv获取路由信息。selectOneMessageQueue方法通过轮询的方式,返回一个队列,以达到负载均衡的目的。
如果Producer发送消息失败,会自动重试,重试的策略:





欢迎光临 电子技术论坛_中国专业的电子工程师学习交流社区-中电网技术论坛 (http://bbs.eccn.com/) Powered by Discuz! 7.0.0