首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

Kafka实战解惑(3)

Kafka实战解惑(3)

四、 Kafka Client API如前所述Kafka是一个消息队列,生产者发送消息到Kafka,消费者从Kafka中拉取消息,因此Kafka提供生产者、消费者两类API供程序开发使用。我们先来看一个生产者、消费者的简单例子,了解一下Kafka Client API的基本用法,而后在深入了解Kafka Client API的细节。
4.1 Producers APIView Code

Kafka 0.82版之后,提供新的API,对于生产者的API来讲,使用逻辑比较简单,推荐使用新API向Kafka发送消息。向Kafka发送消息时首先需要构建一个KafkaProducer对象,并设置发送消息的一些参数。Producer端的常用配置有:
[url=][/url]
bootstrap.servers:Kafka集群连接串,可以由多个host:port组成acks:broker消息确认的模式,有三种:0:不进行消息接收确认,即Client端发送完成后不会等待Broker的确认1:由Leader确认,Leader接收到消息后会立即返回确认信息all:集群完整确认,Leader会等待所有in-sync的follower节点都确认收到消息后,再返回确认信息。我们可以根据消息的重要程度,设置不同的确认模式。默认为1retries:发送失败时Producer端的重试次数,默认为0batch.size:当同时有大量消息要向同一个分区发送时,Producer端会将消息打包后进行批量发送。如果设置为0,则每条消息都独立发送。默认为16384字节。linger.ms:发送消息前等待的毫秒数,与batch.size配合使用。在消息负载不高的情况下,配置linger.ms能够让Producer在发送消息前等待一定时间,
以积累更多的消息打包发送,达到节省网络资源的目的。默认为0。key.serializer/value.serializer:消息key/value的序列器Class,根据key和value的类型决定。buffer.memory:消息缓冲池大小。尚未被发送的消息会保存在Producer的内存中,如果消息产生的速度大于消息发送的速度,
那么缓冲池满后发送消息的请求会被阻塞。默认33554432字节(32MB)。
[url=][/url]


相比起Producers API的便宜使用,Consumer API的使用要复杂很多,核心问题就是如何高可靠的处理消息,保证消息不丢失。Kafka为了保证消息不丢失能被消费者成功的处理,在消费者处理消息成功后需要向Kafka发送确认确认消息被成功的消费。
View Code

上面的代码很容易看懂,但props.put("auto.commit.interval.ms", "1000")需要特殊说明一下。
4.3 消息高可靠 At-Least-Once网上各种文章经常谈到Kafka丢消息问题,那么Kakfa真的不可靠,只能用在允许有一定错误的系统中吗?这个问题还得从Kaka的设计初衷来看。
Kafka最初是被LinkedIn设计用来处理log的分布式消息系统,因此它的着眼点不在数据的安全性(log偶尔丢几条无所谓),换句话说Kafka并不能完全保证数据不丢失。尽管Kafka官网声称能够保证at-least-once,但如果consumer进程数小于partition_num,这个结论不一定成立。考虑这样一个case,partiton_num=2,启动一个consumer进程订阅这个topic,对应的,stream_num设为2,也就是说启两个线程并行处理message。如果auto.commit.enable=true,当consumer fetch了一些数据但还没有完全处理掉的时候,刚好到commit interval出发了提交offset操作,接着consumer crash掉了。这时已经fetch的数据还没有处理完成但已经被commit掉,因此没有机会再次被处理,数据丢失。如果auto.commit.enable=false,假设consumer的两个fetcher各自拿了一条数据,并且由两个线程同时处理,这时线程t1处理完partition1的数据,手动提交offset,这里需要着重说明的是,当手动执行commit的时候,实际上是对这个consumer进程所占有的所有partition进行commit,Kafka暂时还没有提供更细粒度的commit方式,也就是说,即使t2没有处理完partition2的数据,offset也被t1提交掉了。如果这时consumer crash掉,t2正在处理的这条数据就丢失了。如果希望能够严格的不丢数据,解决办法有两个:
  • 手动commit offset,并针对partition_num启同样数目的consumer进程,这样就能保证一个consumer进程占有一个partition,commit offset的时候不会影响别的partition的offset。但这个方法比较局限,因为partition和consumer进程的数目必须严格对应。
  • 另一个方法同样需要手动commit offset,另外在consumer端再将所有fetch到的数据缓存到queue里,当把queue里所有的数据处理完之后,再批量提交offset,这样就能保证只有处理完的数据才被commit。当然这只是基本思路,实际上操作起来不是这么简单,具体做法以后我再另开一篇。
4.4 消息高可靠ConsumerView Code

上面例子中我们将自动提交改为手动提交,如果取得消息后,因为某种原因没有进行提交,那么消息仍然保持在Kafka中,可以重复拉取之前没有确认的消息,保证消息不会丢失,但有可能重复处理相同的消息,消费者接收到重复消息后应该通过业务逻辑保证重复消息不会带来额外影响,这就是Kafka所说的At-Least-Once。上面的这种读取消息的方法是单线程的,除此之外还可以用多线程方法读取消息,每个线程从指定的分区中读取消息。View Code

我们还可以进一步让消费者消费某个分区的消息。
[url=][/url]
    public static void main(String[] args) {        Properties props = new Properties();        //设置brokerServer(Kafka)ip地址        props.put("bootstrap.servers", "172.16.49.173:9092");        //设置consumer group name        props.put("group.id", "manual_g4");        //设置自动提交偏移量(offset),由auto.commit.interval.ms控制提交频率        props.put("enable.auto.commit", "true");        //偏移量(offset)提交频率        props.put("auto.commit.interval.ms", "1000");        //设置使用最开始的offset偏移量为该group.id的最早。如果不设置,则会是latest即该topic最新一个消息的offset        //如果采用latest,消费者只能得道其启动后,生产者生产的消息        props.put("auto.offset.reset", "earliest");        //        props.put("session.timeout.ms", "30000");        props.put("key.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");        props.put("value.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");        TopicPartition partition0 = new TopicPartition("producer_test", 0);        TopicPartition partition1 = new TopicPartition("producer_test", 1);        KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);        consumer.assign(Arrays.asList(partition0, partition1));        while (true) {              ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);              for (ConsumerRecord<String, String> record : records)                  System.out.printf("offset = %d, key = %s, value = %s  \r\n", record.offset(), record.key(), record.value());              try {                Thread.sleep(1000);            } catch (InterruptedException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }        }}[url=][/url]

4.5 生产者、消费者总结
  • 如果consumer比partition多,是浪费,因为Kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数。
  • 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀。最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目。
  • 如果consumer从多个partition读到数据,不保证数据间的顺序性,Kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同。
  • 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
  • High-level接口中获取不到数据的时候是会block的。
返回列表