怎么保证可靠性的?RabbitMQ提供了几种特性,牺牲了一点性能代价,提供了可靠性的保证。
- 持久化
当RabbitMQ退出时,默认会将消息和队列都清除,所以需要在第一次声明队列和发送消息时指定其持久化属性为true,这样RabbitMQ会将队列、消息和状态存到RabbitMQ本地的数据库,重启后会恢复。
java: durable=true channel.queueDeclare("task_queue", durable, false, false, null); // 队列 channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); // 消息注:当声明的队列已经存在时,尝试重新定义它的durable是不生效的。
- 接收应答
客户端接收消息的模式默认是自动应答,但是通过设置autoAck为false可以让客户端主动应答消息。当客户端拒绝此消息或者未应答便断开连接时,就会使得此消息重新入队(在版本2.7.0以前是到重新加入到队尾,2.7.0及以后是保留消息在队列中的原来位置)。
java: autoAck = false; requeue = true; channel.basicConsume(queue, autoAck, callback); channel.basicAck();//应答 channel.basicReject(deliveryTag, requeue); // 拒绝 channel.basicRecover(requeue); // 恢复- 发送确认
默认情况下,发送端不关注发出去的消息是否被消费掉了。可设置channel为confirm模式,所有发送的消息都会被确认一次,用户可以自行根据server发回的确认消息查看状态。详细介绍见:
java: channel.confirmSelect(); // 进入confirm模式 // do publish messages... 每条消息都会被编号,从1开始 channel.getNextPublishSeqNo() // 查看下一条要发送的消息的序号 channel.waitForConfirms(); // 等待所有消息发送并确认- 事务:和confirm模式不能同时使用,而且会带来大量的多余开销,导致吞吐量下降很多,故而不推荐。
java: channel.txSelect(); try { // do something... channel.txCommit(); } catch (e){ channel.txRollback(); }- 消息队列的高可用(主备模式)
相比于路由和绑定,可以视为是共享于所有的节点的,消息队列默认只存在于第一次声明它的节点上,这样一旦这个节点挂了,这个队列中未处理的消息就没有了。
幸好,RabbitMQ提供了将它备份到其他节点的机制,任何时候都有一个master负责处理请求,其他slaves负责备份,当master挂掉,会将最早创建的那个slave提升为master。
命令:
rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}':设置所有以'ha'开头的queue在所有节点上拥有备份。 ;
也可以在界面上配置。 注:由于exclusive类型的队列会在client和server连接断开时被删掉,所以对它设置持久化属性和备份都是没有意义的。
- 顺序保证
直接上图好了:
seq
|