首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

使用 Java API 处理 WebSphere MQ 大消息(1)

使用 Java API 处理 WebSphere MQ 大消息(1)

WebSphere MQ 中处理大消息的方法[size=1.0625]使用过 WebSphere MQ 的读者都知道,WebSphere MQ 对处理的单条消息的大小是有限制的,目前支持的最大消息是100M,而且,随着消息大小的增大,WebSphere MQ 处理的性能也会随之下降。从最佳实践来说,WebSphere MQ 传输大小为几K的消息其效率是最高的。那如何使 WebSphere MQ 能高效的处理大消息呢?
[size=1.0625]WebSphere MQ 提供了处理大消息的两种方法:消息分片和消息分组。下面我们来看在使用 Java API 编写 WebSphere MQ 程序时如何实现消息分片和分组。
消息分片[size=1.0625]消息分片的做法是把应用上一个大的逻辑消息分割成一个一个小的片段,每一个片段作为一个 WebSphere MQ 消息独立传输,通过 MQMD 中 GroupId、 MsgSeqNumber 和 Offset 3 个属性来标识,起始消息的 Offset 值为 0,而最后一个消息则会使用如下标记标识这是最后一个片段:MQMF_LAST_SEGMENT。
[size=1.0625]具体从实现上来说,消息分片可以细分为两种模式:一是由队列管理器自动实现消息的分片和组装;二是由应用程序来实现消息的分片和组装。下面我们将详细向您介绍这两种实现方式。
队列管理器自动实现的消息分片[size=1.0625]顾名思义,队列管理器自动实现的消息分片就是由队列管理器来完成消息的分片和组装。对应用程序来说,不管是发送方还是接收方,它所处理的还是一个完整的大消息,只是在程序中通过设置一些标识来指示队列管理器切分消息后再传输。所以,这种方式适用的场合为,出于传输效率的考虑,WebSphere MQ 不适宜传输大消息,而应用程序可以处理大消息,允许占用大块内存。而且,此种方式对编写应用程序来说也比较简单。
[size=1.0625]实际在使用 Java API 编程时,对于发送方,发送消息时需要设置消息的 messageFlags 如下:
[size=1.0625]Msg.messageFlags = MQC.MQMF_SEGMENTATION_ALLOWED;
[size=1.0625]对于接收方,接收消息时需要设置 MQGetMessageOptions:
[size=1.0625]MQGetMessageOptions gmo = new MQGetMessageOptions ();
[size=1.0625]gmo.options = MQC.MQGMO_COMPLETE_MSG;
[size=1.0625]队列管理器自动实现消息分片的部分代码如清单 1,您可以下载详[size=1.0625]细的示例程序代码。
清单 1 队列管理器自动实现消息分片

[size=0.875]1


[size=0.875]2


[size=0.875]3


[size=0.875]4


[size=0.875]5


[size=0.875]6


[size=0.875]7


[size=0.875]8


[size=0.875]9


[size=0.875]10


[size=0.875]11


[size=0.875]12


[size=0.875]13


[size=0.875]14


[size=0.875]15


[size=0.875]16


[size=0.875]17


[size=0.875]18


[size=0.875]19


[size=0.875]20


[size=0.875]21


[size=0.875]22


[size=0.875]23


[size=0.875]24


[size=0.875]25


[size=0.875]26


[size=0.875]27


[size=0.875]28


[size=0.875]29


[size=0.875][size=0.875]QMgrSegSender.java:
[size=0.875]    int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
[size=0.875]    myQMgr = new MQQueueManager ("QM1");
[size=0.875]    myQueue = myQMgr.accessQueue("TESTQ", openOptions);
[size=0.875]    MQMessage myMsg = new MQMessage ();
[size=0.875]    myMsg.messageFlags = MQC.MQMF_SEGMENTATION_ALLOWED;
[size=0.875]    MQPutMessageOptions pmo = new MQPutMessageOptions ();
[size=0.875]    String strMsg = "";
[size=0.875]    for (int i=0;i<=100;i++)
[size=0.875]        strMsg = strMsg + "Hello";
[size=0.875]    myMsg.write(strMsg.getBytes());
[size=0.875]    myQueue.put(myMsg,pmo);
[size=0.875]    System.out.println("Put message:\n" + strMsg);
[size=0.875]    myQueue.close();
[size=0.875]    myQMgr.disconnect();
[size=0.875]QMgrSegReceiver.java:
[size=0.875]    int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING;
[size=0.875]    myQMgr = new MQQueueManager ("QM1");
[size=0.875]    myQueue = myQMgr.accessQueue("TESTQ", openOptions);
[size=0.875]    MQMessage myMsg = new MQMessage ();
[size=0.875]    MQGetMessageOptions gmo = new MQGetMessageOptions ();
[size=0.875]    gmo.options = MQC.MQGMO_COMPLETE_MSG;
[size=0.875]    myQueue.get(myMsg, gmo);
[size=0.875]    byte[] b = new byte[myMsg.getMessageLength()];
[size=0.875]    myMsg.readFully(b);
[size=0.875]    String strMsg = new String(b);
[size=0.875]    System.out.println("Got message:\n" + strMsg);
[size=0.875]    myQueue.close();
[size=0.875]    myQMgr.disconnect();




[size=1.0625]程序功能介绍:
[size=1.0625](1)QMgrSegSender 程序是构造一个长度为505字节的消息并把它写入队列 TESTQ 中。
[size=1.0625]为使 MQ 不能传输505字节的消息,可以修改队列 TESTQ 的属性“最大消息长度(MAXMSGL)”为100。
[size=1.0625]执行结果如下图 1 所示,该消息被队列管理器自动分割成6个片段消息:
图 1 在 WebSphere MQ 资源管理器中浏览分片消息[size=1.0625](2)QMgrSegReceiver 程序是从队列 TESTQ 中读取一个消息。
[size=1.0625]我们观察执行的结果是它把队列中6个片段消息组成一个完整的大消息取出。
[size=1.0625]使用队列管理器自动实现消息分片对应用开发人员来讲比较简单,但是需要确保程序在内存使用等方面可以处理完整的大消息。
山不在高,有仙则名;水不在深,有龙则灵。
返回列表