批量谈到批量就不得不提生产者消费者模型。但生产者消费者模型中最大的痛点是:消费者到底应该何时进行消费。大处着眼来看,消费动作都是事件驱动的。主要事件包括:
- 攒够了一定数量。
- 到达了一定时间。
- 队列里有新的数据到来。
对于及时性要求高的数据,可用采用方式3来完成,比如客户端向服务端投递数据。只要队列有数据,就把队列中的所有数据刷出,否则将自己挂起,等待新数据的到来。
在第一次把队列数据往外刷的过程中,又积攒了一部分数据,第二次又可以形成一个批量。伪代码如下:
Executor executor = Executors.newFixedThreadPool(4);final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,故做成全局的 public void run(){ List<Message> messages = new ArrayList<>(20); queue.drainTo(messages,20); doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会囤新的消息 }});public void send(Message message){ queue.offer(message); executor.submit(task)}这种方式是消息延迟和批量的一个比较好的平衡,但优先响应低延迟。延迟的最高程度由上一次发送的等待时间决定。但可能造成的问题是发送过快的话批量的大小不够满足性能的极致。
Executor executor = Executors.newFixedThreadPool(4);final BlockingQueue<Message> queue = new ArrayBlockingQueue<>();volatile long last = System.currentMills();Executors.newSingleThreadScheduledExecutor().submit(new Runnable(){ flush();},500,500,TimeUnits.MILLS);private Runnable task = new Runnable({//这里由于共享队列,Runnable可以复用,顾做成全局的。 public void run(){ List<Message> messages = new ArrayList<>(20); queue.drainTo(messages,20); doSend(messages);//阻塞,在这个过程中会有新的消息到来,如果4个线程都占满,队列就有机会屯新的消息。 }});public void send(Message message){ last = System.currentMills(); queue.offer(message); flush();}private void flush(){ if(queue.size>200||System.currentMills()-last>200){ executor.submit(task) }}相反对于可以用适量的延迟来换取高性能的场景来说,用定时/定量二选一的方式可能会更为理想,既到达一定数量才发送,但如果数量一直达不到,也不能干等,有一个时间上限。
具体说来,在上文的submit之前,多判断一个时间和数量,并且Runnable内部维护一个定时器,避免没有新任务到来时旧的任务永远没有机会触发发送条件。对于server端的数据落地,使用这种方式就非常方便。
最后啰嗦几句,曾经有人问我,为什么网络请求小包合并成大包会提高性能?主要原因有两个:
- 减少无谓的请求头,如果你每个请求只有几字节,而头却有几十字节,无疑效率非常低下。
- 减少回复的ack包个数。把请求合并后,ack包数量必然减少,确认和重发的成本就会降低。
|