首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

Kafka实战解惑(4)

Kafka实战解惑(4)

五、 Kafka运维5.1 Broker故障切换我们在192.168.104.101、192.168.104.102两台服务器上启动Kafka组成一个集群,现在我们观察一下topic t1的情况。
bash-4.1# ./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1Topic:t1    PartitionCount:2    ReplicationFactor:2    Configs:Topic: t1    Partition: 0    Leader: 0    Replicas: 0,1    Isr: 0,1Topic: t1    Partition: 1    Leader: 1    Replicas: 1,0    Isr: 0,1我们看到t1由两个分区组成,分布于Leader 0、1两个服务器上。下面我们运行消费者程序,同时运行生产者程序,我们向topic t1发送111、222、333、444、555、666的数据。
bash-4.1# ./Kafka-console-producer.sh --broker-list 192.168.104.101:9092, 192.168.104.102:9092 --topic t1111222333444555666接下来我们观察一下消费者接收消息的情况。
fetched from partition 0, offset: 3, message: fetched from partition 1, offset: 4, message: 111fetched from partition 0, offset: 4, message: 222fetched from partition 1, offset: 5, message: 333fetched from partition 0, offset: 5, message: 444fetched from partition 1, offset: 6, message: 555fetched from partition 0, offset: 6, message: 666可以看到消息被非常均匀的发送到两个分区,消费者从两个分区中拉取了消息。为了模拟故障,我们手工kill 101上的Kafka进程。这时我们在观察Kafka的分区情况,对照之前的结果,我们发现两个分区的Leader都变为1了,说明Kafka启用了副本机制进行故障切换。
./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1Topic:t1    PartitionCount:2    ReplicationFactor:2    Configs:    Topic: t1    Partition: 0    Leader: 1    Replicas: 0,1    Isr: 1    Topic: t1    Partition: 1    Leader: 1    Replicas: 1,0    Isr: 1我们继续向分区发送888, 999,消费者仍然能够接收到发送的消息,而不受故障进度的影响。逻辑上看消费者只是读取分区上的消息,与具体的服务器没关系。
fetched from partition 1, offset: 7, message: 999fetched from partition 0, offset: 7, message: 8885.2 Broker动态扩容5.2.1 增加分区我们为已经创建的包含两个分区的Topic在添加一个分区。
Kafka-topics.sh --zookeeper 192.168.104.101:2181 --alter --topic t1 --partitions 3我们观察一下增加分区后的结果:
bash-4.1# ./Kafka-topics.sh --describe --zookeeper 192.168.104.101:2181 --topic t1Topic:t1    PartitionCount:3    ReplicationFactor:2    Configs:Topic: t1    Partition: 0    Leader: 0    Replicas: 0,1    Isr: 1,0Topic: t1    Partition: 1    Leader: 1    Replicas: 1,0    Isr: 1,0Topic: t1    Partition: 2    Leader: 0    Replicas: 0,1    Isr: 0,1接下来,我们使用生产者程序发送数据,过了一段时间后发现生产者程序已经可以向新增分区写入数据了。说明分区的增减对正在运行的应用程序(生产者、消费者)没有影响, 生产者、消费者都不需要重新启动。
5.2.2 增加Broker Server待补充
5.3 Kafka配置优化待补充
5.4 数据清理5.4.1 数据删除消息被kafka存储后,针对过期消息,可以通过设置策略(log.cleanup.policy=delete)进行删除。除了在kafka中做默认设置外,也可以再 topic创建时指定参数,这样将会覆盖kafka的默认设置,触发删除动作有两种条件:
  • 清理超过指定时间消息,通过log.retention.hours设置过期时间。
  • 清理大小超过预定设置消息,通过log.retention.bytes进行设置。
5.4.2 数据压缩kafka还可以进行数据压缩,设置log.cleanup.policy=compact只保留每个key最后一个版本的数据。
5.5 Kafka运行监控目前Kafka有三个常用的监控系统: Kafka Web Conslole、Kafka Manager、KafkaOffsetMonitor,这三个系统或多或少都有些问题,不是特别完善,推荐使用KafkaOffsetMonitor。
返回列表