Board logo

标题: 分布式开放消息系统(RocketMQ)的原理与实践-5 [打印本页]

作者: look_w    时间: 2018-12-18 20:19     标题: 分布式开放消息系统(RocketMQ)的原理与实践-5

3、消息存储实现消息存储实现,比较复杂,也值得大家深入了解,后面会单独成文来分析,这小节只以代码说明一下具体的流程。
[url=][/url]
// Set the storage timemsg.setStoreTimestamp(System.currentTimeMillis());// Set the message body BODY CRC (consider the most appropriate settingmsg.setBodyCRC(UtilAll.crc32(msg.getBody()));StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();synchronized (this) {    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();    // Here settings are stored timestamp, in order to ensure an orderly global    msg.setStoreTimestamp(beginLockTimestamp);    // MapedFile:操作物理文件在内存中的映射以及将内存数据持久化到物理文件中    MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();    // 将Message追加到文件commitlog    result = mapedFile.appendMessage(msg, this.appendMessageCallback);    switch (result.getStatus()) {    case PUT_OK:break;    case END_OF_FILE:         // Create a new file, re-write the message         mapedFile = this.mapedFileQueue.getLastMapedFile();         result = mapedFile.appendMessage(msg, this.appendMessageCallback);     break;     DispatchRequest dispatchRequest = new DispatchRequest(                topic,// 1                queueId,// 2                result.getWroteOffset(),// 3                result.getWroteBytes(),// 4                tagsCode,// 5                msg.getStoreTimestamp(),// 6                result.getLogicsOffset(),// 7                msg.getKeys(),// 8                /**                 * Transaction                 */                msg.getSysFlag(),// 9                msg.getPreparedTransactionOffset());// 10    // 1.分发消息位置到ConsumeQueue    // 2.分发到IndexService建立索引    this.defaultMessageStore.putDispatchRequest(dispatchRequest);}[url=][/url]

4、消息的索引文件如果一个消息包含key值的话,会使用IndexFile存储消息索引,文件的内容结构如图:

消息索引


索引文件主要用于根据key来查询消息的,流程主要是:  





欢迎光临 电子技术论坛_中国专业的电子工程师学习交流社区-中电网技术论坛 (http://bbs.eccn.com/) Powered by Discuz! 7.0.0