分布式开放消息系统(RocketMQ)的原理与实践-5
- UID
- 1066743
|
分布式开放消息系统(RocketMQ)的原理与实践-5
3、消息存储实现消息存储实现,比较复杂,也值得大家深入了解,后面会单独成文来分析,这小节只以代码说明一下具体的流程。
[url=][/url]
// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate settingmsg.setBodyCRC(UtilAll.crc32(msg.getBody()));StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();synchronized (this) { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); // Here settings are stored timestamp, in order to ensure an orderly global msg.setStoreTimestamp(beginLockTimestamp); // MapedFile:操作物理文件在内存中的映射以及将内存数据持久化到物理文件中 MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile(); // 将Message追加到文件commitlog result = mapedFile.appendMessage(msg, this.appendMessageCallback); switch (result.getStatus()) { case PUT_OK:break; case END_OF_FILE: // Create a new file, re-write the message mapedFile = this.mapedFileQueue.getLastMapedFile(); result = mapedFile.appendMessage(msg, this.appendMessageCallback); break; DispatchRequest dispatchRequest = new DispatchRequest( topic,// 1 queueId,// 2 result.getWroteOffset(),// 3 result.getWroteBytes(),// 4 tagsCode,// 5 msg.getStoreTimestamp(),// 6 result.getLogicsOffset(),// 7 msg.getKeys(),// 8 /** * Transaction */ msg.getSysFlag(),// 9 msg.getPreparedTransactionOffset());// 10 // 1.分发消息位置到ConsumeQueue // 2.分发到IndexService建立索引 this.defaultMessageStore.putDispatchRequest(dispatchRequest);}[url=][/url]
4、消息的索引文件如果一个消息包含key值的话,会使用IndexFile存储消息索引,文件的内容结构如图:
消息索引
索引文件主要用于根据key来查询消息的,流程主要是:
- 根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置(slotNum 是一个索引文件里面包含的最大槽的数目,例如图中所示 slotNum=5000000)
- 根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 总是指向最新的一个索引项)
- 遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录)
|
|
|
|
|
|