Board logo

标题: Kafka实战解惑(3) [打印本页]

作者: look_w    时间: 2018-12-24 17:22     标题: Kafka实战解惑(3)

四、 Kafka Client API如前所述Kafka是一个消息队列,生产者发送消息到Kafka,消费者从Kafka中拉取消息,因此Kafka提供生产者、消费者两类API供程序开发使用。我们先来看一个生产者、消费者的简单例子,了解一下Kafka Client API的基本用法,而后在深入了解Kafka Client API的细节。
4.1 Producers APIView Code

Kafka 0.82版之后,提供新的API,对于生产者的API来讲,使用逻辑比较简单,推荐使用新API向Kafka发送消息。向Kafka发送消息时首先需要构建一个KafkaProducer对象,并设置发送消息的一些参数。Producer端的常用配置有:
[url=][/url]
bootstrap.servers:Kafka集群连接串,可以由多个host:port组成acks:broker消息确认的模式,有三种:0:不进行消息接收确认,即Client端发送完成后不会等待Broker的确认1:由Leader确认,Leader接收到消息后会立即返回确认信息all:集群完整确认,Leader会等待所有in-sync的follower节点都确认收到消息后,再返回确认信息。我们可以根据消息的重要程度,设置不同的确认模式。默认为1retries:发送失败时Producer端的重试次数,默认为0batch.size:当同时有大量消息要向同一个分区发送时,Producer端会将消息打包后进行批量发送。如果设置为0,则每条消息都独立发送。默认为16384字节。linger.ms:发送消息前等待的毫秒数,与batch.size配合使用。在消息负载不高的情况下,配置linger.ms能够让Producer在发送消息前等待一定时间,
以积累更多的消息打包发送,达到节省网络资源的目的。默认为0。key.serializer/value.serializer:消息key/value的序列器Class,根据key和value的类型决定。buffer.memory:消息缓冲池大小。尚未被发送的消息会保存在Producer的内存中,如果消息产生的速度大于消息发送的速度,
那么缓冲池满后发送消息的请求会被阻塞。默认33554432字节(32MB)。
[url=][/url]


相比起Producers API的便宜使用,Consumer API的使用要复杂很多,核心问题就是如何高可靠的处理消息,保证消息不丢失。Kafka为了保证消息不丢失能被消费者成功的处理,在消费者处理消息成功后需要向Kafka发送确认确认消息被成功的消费。
View Code

上面的代码很容易看懂,但props.put("auto.commit.interval.ms", "1000")需要特殊说明一下。
4.3 消息高可靠 At-Least-Once网上各种文章经常谈到Kafka丢消息问题,那么Kakfa真的不可靠,只能用在允许有一定错误的系统中吗?这个问题还得从Kaka的设计初衷来看。
Kafka最初是被LinkedIn设计用来处理log的分布式消息系统,因此它的着眼点不在数据的安全性(log偶尔丢几条无所谓),换句话说Kafka并不能完全保证数据不丢失。尽管Kafka官网声称能够保证at-least-once,但如果consumer进程数小于partition_num,这个结论不一定成立。考虑这样一个case,partiton_num=2,启动一个consumer进程订阅这个topic,对应的,stream_num设为2,也就是说启两个线程并行处理message。如果auto.commit.enable=true,当consumer fetch了一些数据但还没有完全处理掉的时候,刚好到commit interval出发了提交offset操作,接着consumer crash掉了。这时已经fetch的数据还没有处理完成但已经被commit掉,因此没有机会再次被处理,数据丢失。如果auto.commit.enable=false,假设consumer的两个fetcher各自拿了一条数据,并且由两个线程同时处理,这时线程t1处理完partition1的数据,手动提交offset,这里需要着重说明的是,当手动执行commit的时候,实际上是对这个consumer进程所占有的所有partition进行commit,Kafka暂时还没有提供更细粒度的commit方式,也就是说,即使t2没有处理完partition2的数据,offset也被t1提交掉了。如果这时consumer crash掉,t2正在处理的这条数据就丢失了。如果希望能够严格的不丢数据,解决办法有两个:
4.4 消息高可靠ConsumerView Code

上面例子中我们将自动提交改为手动提交,如果取得消息后,因为某种原因没有进行提交,那么消息仍然保持在Kafka中,可以重复拉取之前没有确认的消息,保证消息不会丢失,但有可能重复处理相同的消息,消费者接收到重复消息后应该通过业务逻辑保证重复消息不会带来额外影响,这就是Kafka所说的At-Least-Once。上面的这种读取消息的方法是单线程的,除此之外还可以用多线程方法读取消息,每个线程从指定的分区中读取消息。View Code

我们还可以进一步让消费者消费某个分区的消息。
[url=][/url]
    public static void main(String[] args) {        Properties props = new Properties();        //设置brokerServer(Kafka)ip地址        props.put("bootstrap.servers", "172.16.49.173:9092");        //设置consumer group name        props.put("group.id", "manual_g4");        //设置自动提交偏移量(offset),由auto.commit.interval.ms控制提交频率        props.put("enable.auto.commit", "true");        //偏移量(offset)提交频率        props.put("auto.commit.interval.ms", "1000");        //设置使用最开始的offset偏移量为该group.id的最早。如果不设置,则会是latest即该topic最新一个消息的offset        //如果采用latest,消费者只能得道其启动后,生产者生产的消息        props.put("auto.offset.reset", "earliest");        //        props.put("session.timeout.ms", "30000");        props.put("key.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");        props.put("value.deserializer", "org.apache.Kafka.common.serialization.StringDeserializer");        TopicPartition partition0 = new TopicPartition("producer_test", 0);        TopicPartition partition1 = new TopicPartition("producer_test", 1);        KafkaConsumer<String ,String> consumer = new KafkaConsumer<String ,String>(props);        consumer.assign(Arrays.asList(partition0, partition1));        while (true) {              ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);              for (ConsumerRecord<String, String> record : records)                  System.out.printf("offset = %d, key = %s, value = %s  \r\n", record.offset(), record.key(), record.value());              try {                Thread.sleep(1000);            } catch (InterruptedException e) {                // TODO Auto-generated catch block                e.printStackTrace();            }        }}[url=][/url]

4.5 生产者、消费者总结




欢迎光临 电子技术论坛_中国专业的电子工程师学习交流社区-中电网技术论坛 (http://bbs.eccn.com/) Powered by Discuz! 7.0.0