简介几个重要的概念- Virtual Host: 包含若干个Exchange和Queue,表示一个节点;
- Exchange: 接受客户端发送的消息,并根据Binding将消息路由给服务器中的队列,Exchange分为direct, fanout, topic三种。
- Binding: 连接Exchange和Queue,包含路由规则。
- Queue: 消息队列,存储还未被消费的消息。
- Message: Header+Body
- Channel: 通道,执行AMQP的命令;一个连接可创建多个通道以节省资源。
ClientRabbitMQ官方实现了很多热门语言的客户端,就不一一列举啦,以java为例,直接开始正题:
- 建立连接:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
可以加上断开重试机制:
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(10000);
创建连接和通道:
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
- 一对一:一个生产者,一个消费者
1
生产者:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicPublish("", QUEUE_NAME, null, message.getBytes());消费者:
Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }};channel.basicConsume(QUEUE_NAME, autoAck, consumer);
workqueue
代码同上,只不过会有多个消费者,消息会轮序发给各个消费者。
如果设置了autoAck=false,那么可以实现公平分发(即对于某个特定的消费者,每次最多只发送指定条数的消息,直到其中一条消息应答后,再发送下一条)。需要在消费者中加上:
int prefetchCount = 1;channel.basicQos(prefetchCount);其他同上。
broadcast
生产者:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());消费者同上。
routing
生产者:
String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, routingKey);channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());消费者同上。
topics
*可以表示一个单词#可以表示一个或多个单词生产者:
channel.exchangeDeclare(EXCHANGE_NAME, "topic");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);消费者同上。
rpc
其实就是一对一模式的一种用法:
首先,客户端发送一条消息到服务端声明的队列,消息属性中包含reply_to和correlation_id
- reply_to 是客户端创建的消息的队列,用来接收远程调用结果- correlation_id 是消息的标识,服务端回应的消息属性中会带上以便知道是哪条消息的结果。然后,服务端接收到消息,处理,并返回一条结果到reply_to队列中,
最终,客户端接收到返回消息,继续向下处理。 |