首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

RocketMQ原理解析-Broker(3)

RocketMQ原理解析-Broker(3)

broker 3. load&recoverBroker启动的时候需要加载一系列的配置,启动一系列的任务,主要分布在BrokerController 的initialize()和start()方法中


1.      加载topic配置2.      加载消费进度consumer offset3.      加载消费者订阅关系consumer subscription4.      加载本地消息messageStore.load()a)        Load 定时进度b)        Load commit logcommitLog其实调用存储消费队列mapedFileQueue.load()方法来加载的。遍历出${user.home} \store\${commitlog}目录下所有commitLog文件,按文件名(文件名就是文件的初始偏移量)升序排一下, 每个文件构建一个MapedFile对象, 在MapedFileQueue中用集合list把这些MapedFile文件组成一个逻辑上连续的队列c)        Load consume Queue遍历${user.home} \store\consumequeue下的所有文件夹(每个topic就是一个文件夹)遍历${user.home} \store\consumequeue\${topic}下的所有文件夹(每个queueId就是一个文件夹)遍历${user.home} \store\consumequeue\${topic}\${queueId}下所有文件,根据topic, queueId, 文件来构建ConsueQueue对象DefaultMessageStore中存储结构Map<topic,Map<queueId, CosnueQueue>>每个Consumequeue利用MapedFileQueue把mapedFile组成一个逻辑上连续的队列d)        加载事物模块e)        加载存储检查点加载${user.home} \store\checkpoint 这个文件存储了3个long类型的值来记录存储模型最终一致的时间点,这个3个long的值为physicMsgTimestamp为commitLog最后刷盘的时间logicMsgTimestamp为consumeQueue最终刷盘的时间indexMsgTimestamp为索引最终刷盘时间checkpoint作用是当异常恢复时需要根据checkpoint点来恢复消息f)         加载索引服务indexServiceg)        recover尝试数据恢复判断是否是正常恢复,系统启动的启动存储服务(DefaultMessageStore)的时候会创建一个临时文件abort, 当系统正常关闭的时候会把这个文件删掉,这个类似在Linux下打开vi编辑器生成那个临时文件,所有当这个abort文件存在,系统认为是异常恢复 [url=][/url]


1)  先按照正常流程恢复ConsumeQueue为什么说先正常恢复,那么异常恢复在哪呢?当broker是异常启动时候,在异常恢复commitLog时会重新构建请到DispatchMessageService服务,来重新生成ConsumeQueue数据,
索引以及事物消息的redolog 什么是恢复ConsumeQueue, 前面不是有步骤load了ConsumeQueue吗,为什么还要恢复?前面load步骤创建了MapedFile对象建立了文件的内存映射,但是数据是否正确,现在文件写到哪了(wrotePosition),
Flush到了什么位置(committedPosition)?恢复数据来帮我解决这些问题。 每个ConsumeQueue的mapedFiles集合中,从倒数第三个文件开始恢复(为什么只恢复倒数三个文件,也许只是个经验值吧),
因为consumequeue的存储单元是20字节的定长数据,所以是依次分别取了Offset long类型存储了commitLog的数据偏移量Size int类型存储了在commitLog上消息大小tagcode tag的哈希值目前rocketmq判断存储的consumequeue数据是否有效的方式为判断offset
>= 0 && size > 0                            如果数据有效读取下20个字节判断是否有效                            如果数据无效跳出循环,记录此时有效数据的偏移量processOffset                            如果读到文件尾,读取下一个文件                                              proccessOffset是有效数据的偏移量,获取这个值的作用什么?(1)    proccessOffset后面的数据属于脏数据,后面的文件要删除掉(2)    设置proccessOffset所在文件MapedFile的wrotePosition和commitedPosition值,值为 proccessOffset%mapedFileSize2)  正常恢复commitLog文件步骤跟流程恢复Consume Queue判断消息有效, 根据消息的存储格式读取消息到DispatchRequest对象,获取消息大小值msgSize           大于 0  正常数据           等于-1      文件读取错误 恢复结束           等于0  读到文件末尾3)  异常数据恢复,OSCRASH或者JVM CRASH或者机器掉电当${user.home}\store\abort文件存在,代表异常恢复读取${user.home} \store\checkpoint获取最终一致的时间点判断最终一致的点所在的文件是哪个从最新的mapedFile开始,获取存储的一条消息在broker的生成时间,大于checkpoint时间点的放弃找前一个文件,小于等于checkpoint时间点的说明checkpoint
在此mapedfile文件中从checkpoint所在mapedFile开始恢复数据,它的整体过程跟正常恢复commitlog类似,最重要的区别在于(
1)读取消息后派送到分发消息服务DispatchMessageService中,来重建ConsumeQueue以及索引(2)根据恢复的物理offset,清除ConsumeQueue多余的数据4)  恢复TopicQueueTable=Map<topic-queueid,offset>1)          恢复写入消息时,消费记录队列的offset(2)          恢复每个队列的最小offset5.      初始化通信层6.      初始化线程池7.      注册broker端处理器用来接收client请求后选择处理器处理8.      启动每天凌晨00:00:00统计消费量任务9.      启动定时刷消费进度任务10.  启动扫描数据被删除了的topic,offset记录也对应删除任务11.  如果namesrv地址不是指定的,而是从静态服务器取的,启动定时向静态服务器获取namesrv地址的任务12.  如果broker是master,启动任务打印slave落后master没有同步的bytes如果broker是slave,启动任务定时到mastser同步配置信息
返回列表