首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

RocketMQ原理解析-Broker(2)

RocketMQ原理解析-Broker(2)

broker 2. broker的消息存储Rocketmq的消息的存储是由consume queue和 commitLog 配合完成的
1)  consume queue 消息的逻辑队列,相当于字典的目录用来指定消息在消息的真正的物理文件commitLog上的位置,每个topic下的每个queue都有一个对应的consumequeue文件。文件地址:${user.home} \store\consumequeue\${topicName}\${queueId}\${fileName}consume queue中存储单元是一个20字节定长的数据,是顺序写顺序读(1)      commitLogOffset是指这条消息在commitLog文件实际偏移量(2)      size就是指消息大小(3)      消息tag的哈希值


ConsumeQueue文件组织:
(1)    topic queueId来组织的,比如TopicA配了读写队列0, 1,那么TopicA和Queue=0组成一个ConsumeQueue,TopicA和Queue=1组成一个另一个ConsumeQueue.(2)    按消费端group分组重试队列,如果消费端消费失败,发送到retry消费队列中(3)    按消费端group分组死信队列,如果消费端重试超过指定次数,发送死信队列(4)    每个ConsumeQueue可以由多个文件组成无限队列被MapedFileQueue对象管理


2)  CommitLog消息存放物理文件,每台broker上的commitLog被本机器所有queue共享不做区分
文件地址:${user.home} \store\${commitlog}\${fileName}一个消息存储单元长度是不定的,顺序写但是随机读消息存储结构:=       4  //4个字节代表这个消息的大小                   +       4 //四个字节的MAGICCODE = daa320a7                   +       4  //消息体BODY CRC  当broker重启recover时会校验                   +       4 //queueId  你懂得                   +       4  //flag  这个标志值rocketmq不做处理,只存储后透传                   +     8  //QUEUEOFFSET这个值是个自增值不是真正的consume queue的偏移量,可以代表这个队列中消息的个数,要通过这个值查找到consume queue中数据,QUEUEOFFSET * 20才是偏移地址                   +       8       //PHYSICALOFFSET 代表消息在commitLog中的物理起始地址偏移量                   +     4       //SYSFLAG消息标志,指明消息是事物事物状态等等消息特征                   +     8       //BORNTIMESTAMP 消息产生端(producer)的时间戳                   +     8      //BORNHOST          消息产生端(producer)地址(address:port)                   +       8       //STORETIMESTAMP 消息在broker存储时间                   +       8       //STOREHOSTADDRESS 消息存储到broker的地址(address:port)                   +       8      //RECONSUMETIMES消息被某个订阅组重新消费了几次(订阅组之间独立计数),因为重试消息发送到了topic名字为%retry%groupName的队列queueId=0的队列中去了                   +       8       //Prepared Transaction Offset 表示是prepared状态的事物消息                   +       4  +  bodyLength // 前4个字节存放消息体大小值,  后bodylength大小空间存储了消息体内容                   +       1 +  topicLength  //一个字节存放topic名称能容大小, 后存放了topic的内容         +       2 + propertiesLength    // 2个字节(short)存放属性值大小, 后存放propertiesLength大小的属性数据  
3)  MapedFile 是PageCache文件封装,操作物理文件在内存中的映射以及将内存数据持久化到物理文件中,代码中写死了要求os系统的页大小为4k, 消息刷盘根据参数(commitLog默认至少刷4页, consumeQueue默认至少刷2页)才刷  
以下io对象构建了物理文件映射内存的对象FileChannel fileChannel = new RandomAccessFile(file,“rw”).getChannel();MappedByteBuffer mappedByteBuffer=fileChannel.map(READE_WRITE,0,fileSize);          构建mapedFile对象需要两个参数fileSize: 映射的物理文件的大小                   commitLog每个文件的大小默认1G =1024*1024*1024                   ConsumeQueue每个文件默认存30W条 = 300000 *CQStoreUnitSize(每条大小)filename: filename文件名称但不仅仅是名称还表示文件记录的初始偏移量, 文件名其实是个long类型的值  

4)  MapedFileQueue 存储队列,数据定时删除,无限增长。
队列有多个文件(MapedFile)组成,由集合对象List表示升序排列,前面讲到文件名即是消息在此文件的中初始偏移量,排好序后组成了一个连续的消息队

当消息到达broker时,需要获取最新的MapedFile写入数据,调用MapedFileQueue的getLastMapedFile获取,此函数如果集合中一个也没有创建一个,如果最后一个写满了也创建一个新的。         MapedFileQueue在获取getLastMapedFile时,如果需要创建新的MapedFile会计算出下一个MapedFile文件地址,通过预分配服务AllocateMapedFileService异步预创建下一个MapedFile文件,这样下次创建新文件请求就不要等待,因为创建文件特别是一个1G的文件还是有点耗时的,         getMinOffset获取队列消息最少偏移量,即第一个文件的文件起始偏移量         getMaxOffset获取队列目前写到位置偏移量         getCommitWhere刷盘刷到哪里了  

5) DefaultMessageStore 消息存储层实现
      (1)putMessage 添加消息委托给commitLog.putMessage(msg),主要流程:<1> 从mapedFileQueue获取最新的映射文件<2>向mapedFile中添加一条消息记录<3> 构建DispatchRequest对象,添加到分发索引服务DispatchMessageService线程中去<4>唤醒异步刷盘线程<5> 向发送方返回结果          (2)DispatchMessageService                   <1>分发消息位置到ConsumeQueue                   <2>分发到IndexService建立索引
返回列表