分布式开放消息系统(RocketMQ)的原理与实践-1
- UID
- 1066743
|
分布式开放消息系统(RocketMQ)的原理与实践-1
分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点。而谈到消息系统的设计,就回避不了两个问题:
RocketMQ作为阿里开源的一款高性能、高吞吐量的消息中间件,它是怎样来解决这两个问题的?RocketMQ 有哪些关键特性?其实现原理是怎样的?
关键特性以及其实现原理一、顺序消息消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照这个顺序消费才有意义。但同时订单之间又是可以并行消费的。
假如生产者产生了2条消息:M1、M2,要保证这两条消息的顺序,应该怎样做?你脑中想到的可能是这样:
你可能会采用这种方式保证消息顺序
M1发送到S1后,M2发送到S2,如果要保证M1先于M2被消费,那么需要M1到达消费端后,通知S2,然后S2再将M2发送到消费端。
这个模型存在的问题是,如果M1和M2分别发送到两台Server上,就不能保证M1先达到,也就不能保证M1被先消费,那么就需要在MQ Server集群维护消息的顺序。那么如何解决?一种简单的方式就是将M1、M2发送到同一个Server上:
保证消息顺序,你改进后的方法
这样可以保证M1先于M2到达MQServer(客户端等待M1成功后再发送M2),根据先达到先被消费的原则,M1会先于M2被消费,这样就保证了消息的顺序。
这个模型,理论上可以保证消息的顺序,但在实际运用中你应该会遇到下面的问题:
网络延迟问题
只要将消息从一台服务器发往另一台服务器,就会存在网络延迟问题。如上图所示,如果发送M1耗时大于发送M2的耗时,那么M2就先被消费,仍然不能保证消息的顺序。即使M1和M2同时到达消费端,由于不清楚消费端1和消费端2的负载情况,仍然有可能出现M2先于M1被消费。如何解决这个问题?将M1和M2发往同一个消费者即可,且发送M1后,需要消费端响应成功后才能发送M2。
但又会引入另外一个问题,如果发送M1后,消费端1没有响应,那是继续发送M2呢,还是重新发送M1?一般为了保证消息一定被消费,肯定会选择重发M1到另外一个消费端2,就如下图所示。
保证消息顺序的正确姿势
这样的模型就严格保证消息的顺序,细心的你仍然会发现问题,消费端1没有响应Server时有两种情况,一种是M1确实没有到达,另外一种情况是消费端1已经响应,但是Server端没有收到。如果是第二种情况,重发M1,就会造成M1被重复消费。也就是我们后面要说的第二个问题,消息重复问题。
回过头来看消息顺序问题,严格的顺序消息非常容易理解,而且处理问题也比较容易,要实现严格的顺序消息,简单且可行的办法就是:
保证生产者 - MQServer - 消费者是一对一对一的关系
但是这样设计,并行度就成为了消息系统的瓶颈(吞吐量不够),也会导致更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。
但我们的最终目标是要集群的高容错性和高吞吐量。这似乎是一对不可调和的矛盾,那么阿里是如何解决的?
世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!
有些问题,看起来很重要,但实际上我们可以通过合理的设计或者将问题分解来规避。如果硬要把时间花在解决它们身上,实际上是浪费的,效率低下的。从这个角度来看消息的顺序问题,我们可以得出两个结论:
1、不关注乱序的应用实际大量存在
2、队列无序并不意味着消息无序
最后我们从源码角度分析RocketMQ怎么实现发送顺序消息。
一般消息是通过轮询所有队列来发送的(负载均衡策略),顺序消息可以根据业务,比如说订单号相同的消息发送到同一个队列。下面的示例中,OrderId相同的消息,会发送到同一个队列:
[url=][/url]
// RocketMQ默认提供了两种MessageQueueSelector实现:随机/HashSendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); }}, orderId);[url=][/url]
在获取到路由信息以后,会根据MessageQueueSelector实现的算法来选择一个队列,同一个OrderId获取到的队列是同一个队列。
[url=][/url]
private SendResult send() { // 获取topic路由信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; // 根据我们的算法,选择一个发送队列 // 这里的arg = orderId mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); if (mq != null) { return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout); } }} |
|
|
|
|
|