首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

消息队列设计精要(7)

消息队列设计精要(7)

批量谈到批量就不得不提生产者消费者模型。但生产者消费者模型中最大的痛点是:消费者到底应该何时进行消费。大处着眼来看,消费动作都是事件驱动的。主要事件包括:
  • 攒够了一定数量。
  • 到达了一定时间。
  • 队列里有新的数据到来。
对于及时性要求高的数据,可用采用方式3来完成,比如客户端向服务端投递数据。只要队列有数据,就把队列中的所有数据刷出,否则将自己挂起,等待新数据的到来。
在第一次把队列数据往外刷的过程中,又积攒了一部分数据,第二次又可以形成一个批量。伪代码如下:
Executor executor = Executors.newFixedThreadPool(4);final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,故做成全局的   public void run(){      List<Message> messages  = new ArrayList<>(20);      queue.drainTo(messages,20);      doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会囤新的消息   }});public void send(Message message){    queue.offer(message);    executor.submit(task)}这种方式是消息延迟和批量的一个比较好的平衡,但优先响应低延迟。延迟的最高程度由上一次发送的等待时间决定。但可能造成的问题是发送过快的话批量的大小不够满足性能的极致。
Executor executor = Executors.newFixedThreadPool(4);final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();volatile long last = System.currentMills();Executors.newSingleThreadScheduledExecutor().submit(new Runnable(){   flush();},500,500,TimeUnits.MILLS);private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,顾做成全局的。   public void run(){      List<Message> messages  = new ArrayList<>(20);      queue.drainTo(messages,20);      doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会屯新的消息。   }});public void send(Message message){    last = System.currentMills();    queue.offer(message);    flush();}private void flush(){ if(queue.size>200||System.currentMills()-last>200){       executor.submit(task)  }}相反对于可以用适量的延迟来换取高性能的场景来说,用定时/定量二选一的方式可能会更为理想,既到达一定数量才发送,但如果数量一直达不到,也不能干等,有一个时间上限。
具体说来,在上文的submit之前,多判断一个时间和数量,并且Runnable内部维护一个定时器,避免没有新任务到来时旧的任务永远没有机会触发发送条件。对于server端的数据落地,使用这种方式就非常方便。
最后啰嗦几句,曾经有人问我,为什么网络请求小包合并成大包会提高性能?主要原因有两个:
  • 减少无谓的请求头,如果你每个请求只有几字节,而头却有几十字节,无疑效率非常低下。
  • 减少回复的ack包个数。把请求合并后,ack包数量必然减少,确认和重发的成本就会降低。
返回列表