标题:
RocketMQ原理解析-Broker(2)
[打印本页]
作者:
look_w
时间:
2018-12-18 20:35
标题:
RocketMQ原理解析-Broker(2)
broker 2. broker的消息存储Rocketmq的消息的存储是由consume queue和 commitLog 配合完成的
1
) consume queue 消息的逻辑队列,相当于字典的目录用来指定消息在消息的真正的物理文件commitLog上的位置,每个topic下的每个queue都有一个对应的consumequeue文件。文件地址:${user.home} \store\consumequeue\${topicName}\${queueId}\${fileName}consume queue中存储单元是一个20字节定长的数据,是顺序写顺序读(
1
) commitLogOffset是指这条消息在commitLog文件实际偏移量(
2
) size就是指消息大小(
3) 消息tag的哈希值
ConsumeQueue文件组织:
(1) topic queueId来组织的,比如TopicA配了读写队列0, 1,那么TopicA和Queue=0组成一个ConsumeQueue,TopicA和Queue=
1组成一个另一个ConsumeQueue.(
2
) 按消费端group分组重试队列,如果消费端消费失败,发送到retry消费队列中(
3
) 按消费端group分组死信队列,如果消费端重试超过指定次数,发送死信队列(
4) 每个ConsumeQueue可以由多个文件组成无限队列被MapedFileQueue对象管理
2) CommitLog消息存放物理文件,每台broker上的commitLog被本机器所有queue共享不做区分
文件地址:${user.home} \store\${commitlog}\${fileName}一个消息存储单元长度是不定的,顺序写但是随机读消息存储结构:
= 4 //
4个字节代表这个消息的大小
+ 4 //四个字节的MAGICCODE =
daa320a7
+ 4 //
消息体BODY CRC 当broker重启recover时会校验
+ 4 //
queueId 你懂得
+ 4 //
flag 这个标志值rocketmq不做处理,只存储后透传
+ 8 //QUEUEOFFSET这个值是个自增值不是真正的consume queue的偏移量,可以代表这个队列中消息的个数,要通过这个值查找到consume queue中数据,QUEUEOFFSET *
20才是偏移地址
+ 8 //
PHYSICALOFFSET 代表消息在commitLog中的物理起始地址偏移量
+ 4 //
SYSFLAG消息标志,指明消息是事物事物状态等等消息特征
+ 8 //
BORNTIMESTAMP 消息产生端(producer)的时间戳
+ 8 //
BORNHOST 消息产生端(producer)地址(address:port)
+ 8 //
STORETIMESTAMP 消息在broker存储时间
+ 8 //
STOREHOSTADDRESS 消息存储到broker的地址(address:port)
+ 8 //RECONSUMETIMES消息被某个订阅组重新消费了几次(订阅组之间独立计数),因为重试消息发送到了topic名字为%retry%groupName的队列queueId=
0的队列中去了
+ 8 //
Prepared Transaction Offset 表示是prepared状态的事物消息
+ 4 + bodyLength //
前4个字节存放消息体大小值, 后bodylength大小空间存储了消息体内容
+ 1 + topicLength //
一个字节存放topic名称能容大小, 后存放了topic的内容
+ 2 + propertiesLength //
2个字节(short)存放属性值大小, 后存放propertiesLength大小的属性数据
3) MapedFile 是PageCache文件封装,操作物理文件在内存中的映射以及将内存数据持久化到物理文件中,代码中写死了要求os系统的页大小为4k, 消息刷盘根据参数(commitLog默认至少刷4页, consumeQueue默认至少刷2页)才刷
以下io对象构建了物理文件映射内存的对象FileChannel fileChannel
=
new RandomAccessFile(file,“rw”).getChannel();MappedByteBuffer mappedByteBuffer
=
fileChannel.map(READE_WRITE,0,fileSize); 构建mapedFile对象需要两个参数fileSize: 映射的物理文件的大小 commitLog每个文件的大小默认1G
=1024*1024*1024
ConsumeQueue每个文件默认存30W条
= 300000 *
CQStoreUnitSize(每条大小)filename: filename文件名称但不仅仅是名称还表示文件记录的初始偏移量, 文件名其实是个long类型的值
4) MapedFileQueue 存储队列,数据定时删除,无限增长。
队列有多个文件(MapedFile)组成,由集合对象List表示升序排列,前面讲到文件名即是消息在此文件的中初始偏移量,排好序后组成了一个连续的消息队
当消息到达broker时,需要获取最新的MapedFile写入数据,调用MapedFileQueue的getLastMapedFile获取,此函数如果集合中一个也没有创建一个,如果最后一个写满了也创建一个新的。 MapedFileQueue在获取getLastMapedFile时,如果需要创建新的MapedFile会计算出下一个MapedFile文件地址,通过预分配服务AllocateMapedFileService异步预创建下一个MapedFile文件,这样下次创建新文件请求就不要等待,因为创建文件特别是一个1G的文件还是有点耗时的, getMinOffset获取队列消息最少偏移量,即第一个文件的文件起始偏移量 getMaxOffset获取队列目前写到位置偏移量 getCommitWhere刷盘刷到哪里了
5) DefaultMessageStore 消息存储层实现
(1
)putMessage 添加消息委托给commitLog.putMessage(msg),主要流程:
<1>
从mapedFileQueue获取最新的映射文件
<2>
向mapedFile中添加一条消息记录
<3>
构建DispatchRequest对象,添加到分发索引服务DispatchMessageService线程中去
<4>
唤醒异步刷盘线程
<5>
向发送方返回结果 (
2
)DispatchMessageService
<1>
分发消息位置到ConsumeQueue
<2>分发到IndexService建立索引
欢迎光临 电子技术论坛_中国专业的电子工程师学习交流社区-中电网技术论坛 (http://bbs.eccn.com/)
Powered by Discuz! 7.0.0