首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

RabbitMQ在分布式系统中的应用(3)

RabbitMQ在分布式系统中的应用(3)

简介几个重要的概念
  • Virtual Host: 包含若干个Exchange和Queue,表示一个节点;
  • Exchange: 接受客户端发送的消息,并根据Binding将消息路由给服务器中的队列,Exchange分为direct, fanout, topic三种。
  • Binding: 连接Exchange和Queue,包含路由规则。
  • Queue: 消息队列,存储还未被消费的消息。
  • Message: Header+Body
  • Channel: 通道,执行AMQP的命令;一个连接可创建多个通道以节省资源。
ClientRabbitMQ官方实现了很多热门语言的客户端,就不一一列举啦,以java为例,直接开始正题:
  • 建立连接:
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    可以加上断开重试机制:
    factory.setAutomaticRecoveryEnabled(true);
    factory.setNetworkRecoveryInterval(10000);
    创建连接和通道:
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
  • 一对一:一个生产者,一个消费者

1

生产者:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicPublish("", QUEUE_NAME, null, message.getBytes());消费者:
Consumer consumer = new DefaultConsumer(channel) {  @Override  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)      throws IOException {    String message = new String(body, "UTF-8");    System.out.println(" [x] Received '" + message + "'");  }};channel.basicConsume(QUEUE_NAME, autoAck, consumer);
  • 一对多:一个生产者,多个消费者

workqueue

代码同上,只不过会有多个消费者,消息会轮序发给各个消费者。
如果设置了autoAck=false,那么可以实现公平分发(即对于某个特定的消费者,每次最多只发送指定条数的消息,直到其中一条消息应答后,再发送下一条)。需要在消费者中加上:
int prefetchCount = 1;channel.basicQos(prefetchCount);其他同上。
  • 广播

broadcast

生产者:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());消费者同上。
  • Routing: 指定路由规则

routing

生产者:
String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, routingKey);channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());消费者同上。
  • Topics: 支持通配符的Routing

topics

*可以表示一个单词#可以表示一个或多个单词生产者:
channel.exchangeDeclare(EXCHANGE_NAME, "topic");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);消费者同上。
  • RPC

rpc

其实就是一对一模式的一种用法:
首先,客户端发送一条消息到服务端声明的队列,消息属性中包含reply_to和correlation_id
- reply_to 是客户端创建的消息的队列,用来接收远程调用结果- correlation_id 是消息的标识,服务端回应的消息属性中会带上以便知道是哪条消息的结果。然后,服务端接收到消息,处理,并返回一条结果到reply_to队列中,
最终,客户端接收到返回消息,继续向下处理。
返回列表