首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

分布式开放消息系统(RocketMQ)的原理与实践-5

分布式开放消息系统(RocketMQ)的原理与实践-5

3、消息存储实现消息存储实现,比较复杂,也值得大家深入了解,后面会单独成文来分析,这小节只以代码说明一下具体的流程。
[url=][/url]
// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate settingmsg.setBodyCRC(UtilAll.crc32(msg.getBody()));StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();synchronized (this) {    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();    // Here settings are stored timestamp, in order to ensure an orderly global    msg.setStoreTimestamp(beginLockTimestamp);    // MapedFile:操作物理文件在内存中的映射以及将内存数据持久化到物理文件中    MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();    // 将Message追加到文件commitlog    result = mapedFile.appendMessage(msg, this.appendMessageCallback);    switch (result.getStatus()) {    case PUT_OK:break;    case END_OF_FILE:         // Create a new file, re-write the message         mapedFile = this.mapedFileQueue.getLastMapedFile();         result = mapedFile.appendMessage(msg, this.appendMessageCallback);     break;     DispatchRequest dispatchRequest = new DispatchRequest(                topic,// 1                queueId,// 2                result.getWroteOffset(),// 3                result.getWroteBytes(),// 4                tagsCode,// 5                msg.getStoreTimestamp(),// 6                result.getLogicsOffset(),// 7                msg.getKeys(),// 8                /**                 * Transaction                 */                msg.getSysFlag(),// 9                msg.getPreparedTransactionOffset());// 10    // 1.分发消息位置到ConsumeQueue    // 2.分发到IndexService建立索引    this.defaultMessageStore.putDispatchRequest(dispatchRequest);}[url=][/url]

4、消息的索引文件如果一个消息包含key值的话,会使用IndexFile存储消息索引,文件的内容结构如图:

消息索引


索引文件主要用于根据key来查询消息的,流程主要是:  
  • 根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置(slotNum 是一个索引文件里面包含的最大槽的数目,例如图中所示 slotNum=5000000)
  • 根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 总是指向最新的一个索引项)
  • 遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录)
返回列表