二、 Kafka架构方案从物理结构上看,整个Kafka系统由消息生产者、消息消费者、消费存储服务器外加Zookeeper构成。其中消息生产者被称为Producer、消息消费者被称为Consumer、消息存储服务器被称为Broker。整个Kafka的架构方案非常简单,典型的无状态水平扩展架构,通过水平增加Broker实例实现系统的高吞吐率,而有状态的数据则存储到Zookeeper中。
Kafka采用Push-Pull模式,生产者发送消息时,可根据策略存储在Kafka集群的任意一台broker上,消费者通过定时轮询(非固定周期)的方式从Broker上取得消息。消息发送到哪一台服务器上,又从哪台服务器上获取消息,则是由逻辑结构解决的,或者说逻辑结构建立在物理结构基础上,对于生产者、消费者而言,只要了解逻辑结构就可以了。
从逻辑上讲,一个Kafka集群中包含若干个消息队列,每个消息队列都有自己的名称,在Kafka中消息队列的名称被称为Topic,为了实现系统的高吞吐率,每个消息队列被拆分成不同部分,即我们所说的分区(Partition),分区存储在不同的Broker中。生产者发送消息时可根据一定策略发送到不同的分区中,这类似于数据库的分库分表操作,同样消费者拉取消息时,也可以根据一定策略从某个分区中读取消息。就物理结构而言,每个分区就是broker上的一个文件,试想一下并发的对多个分布在不同broker上的文件进行读写,性能当然显著优于对单台broker上的文件进行读写,我们所说的Kfaka具有高吞吐率就是这个道理。
Kafak每个Topic的消息都存储在日志文件中,Kafka消息日志文件由一个索引文件和若干个具体的消息文件构成。每个消息文件都由起始消息编号构成,通过索引可以快速定位消息文件进行读写,由于消息是顺序写入文件中,所以读写效率非常高。在6块7200转的SATA RAID-5磁盘阵列的线性写速度差不多是600MB/s,但是随即写的速度却是100k/s,差了差不多6000倍。现代的操作系统都对次做了大量的优化,使用了 read-ahead 和 write-behind的技巧,读取的时候成块的预读取数据,写的时候将各种微小琐碎的逻辑写入组织合并成一次较大的物理写入,很多时候线性读写磁盘比随机读取内存都快。
与其他常见的消息队列不同,Kafka有一个叫做消费组的概念,多个消费者被逻辑上合并在一起叫做消费组。一个消息队列理论上可拥有无限个消费组,消费组是Kafka有别于其他消息队列的一个重要概念,同一个分区的消息只能被一个消费组内的某个消费者读取,但其他消费组内的消费者仍然可读取这个分区的消费。如下图所示整个Kafka消息队列由两个broker server构成,server1上包含两个分区p0、p3,server2上包含两个分区p1、p2。现在有两个消费组A、B,消费组A中包含两个消费者C1、C2,消费组B中包含4个消费者C3、C4、C5、C6。那么假定P0分区上有一条消息。Consumer Group A中的C1、C2其中之一会消费这条消息,Consumer Group B中的C3、C4、C5、C6其中之一也会消费这条消息,也就是说两个消费组A、B中的消费者都会同时消费这条消息,而组内只能有一个消费者消费这条消息。
我们所说的C1、C2只是一个逻辑上的划分就具体实现而言,C1、C2可以是一个进程内部的两个线程,也可以是两个独立的进程,对于C3、C4、C5、C6也是同样的道理。我们知道Kafka每个分区中的消息都是以顺序结构保存到文件中的,那么消费者每次从什么位置读取消息呢,奥秘就是每个消费者都保存offset到zookeeper中。
如前所述,Kafka是一个Push-Pull模式的消息队列,并且可以有多个生产者、多个消费者,那么这些生产者和消费者是如何协同工作的呢?首先我们来看生产者怎么确定把消费发送到哪个分区上。默认情况下,Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions。
def partition(key: Any, numPartitions: Int): Int = { Utils.abs(key.hashCode) % numPartitions}
这就保证了相同key的消息一定会被路由到相同的分区。如果你没有指定key,那么Kafka是如何确定这条消息去往哪个分区的呢?我们来看下面的代码:
[url=][/url]
if(key == null) { // 如果没有指定key val id = sendPartitionPerTopicCache.get(topic) // 先看看Kafka有没有缓存的现成的分区Id id match { case Some(partitionId) => partitionId // 如果有的话直接使用这个分区Id就好了 case None => // 如果没有的话, val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) //找出所有可用分区的leader所在的broker if (availablePartitions.isEmpty) throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) val index = Utils.abs(Random.nextInt) % availablePartitions.size // 从中随机挑一个 val partitionId = availablePartitions(index).partitionId sendPartitionPerTopicCache.put(topic, partitionId) // 更新缓存以备下一次直接使用 partitionId } }[url=][/url]
可以看出,Kafka几乎就是随机找一个分区发送无key的消息,然后把这个分区号加入到缓存中以备后面直接使用——当然了,Kafka本身也会清空该缓存(默认每10分钟或每次请求topic元数据时)
接下来我们来看消费者如何获取消息。对于消费者Kafka提供的两种分配策略: range和roundrobin,由参数 partition.assignment.strategy指定,默认是range策略。本文只讨论range策略。所谓的range其实就是按照阶段平均分配。举个例子就明白了,假设你有10个分区,P0 ~ P9,consumer线程数是3, C0 ~ C2,那么每个线程都分配哪些分区呢?
C0 消费分区 0, 1, 2, 3
C1 消费分区 4, 5, 6
C2 消费分区 7, 8, 9
为了保证高可靠,Kafka每个分区都有一定数量的副本,当故障发生时通过zookeeper选择其一作为领导者,Kafka采用同步复制机制,写leader完成后在写副本。如果某个副本写失败,则将这个副本从当前分区一致集合中摘除,后期根据一定策略在进行异步补偿,将不一致状态变为一致状态。极端情况下如果所有副本写入均失败,变为不一致状态,如果在变成一致状态前leader崩溃,那么消息才可能真正丢失,但极端情况很难出现,一旦出现这种极端情况,任何系统都无能为力了,所以我们说Kafka还是非常可靠的。 |