首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

使用 Java API 处理 WebSphere MQ 大消息(4)

使用 Java API 处理 WebSphere MQ 大消息(4)

清单 3 消息分组

[size=0.875]1


[size=0.875]2


[size=0.875]3


[size=0.875]4


[size=0.875]5


[size=0.875]6


[size=0.875]7


[size=0.875]8


[size=0.875]9


[size=0.875]10


[size=0.875]11


[size=0.875]12


[size=0.875]13


[size=0.875]14


[size=0.875]15


[size=0.875]16


[size=0.875]17


[size=0.875]18


[size=0.875]19


[size=0.875]20


[size=0.875]21


[size=0.875]22


[size=0.875]23


[size=0.875]24


[size=0.875]25


[size=0.875]26


[size=0.875]27


[size=0.875]28


[size=0.875]29


[size=0.875]30


[size=0.875]31


[size=0.875]32


[size=0.875]33


[size=0.875]34


[size=0.875]35


[size=0.875]36


[size=0.875]37


[size=0.875]38


[size=0.875]39


[size=0.875]40


[size=0.875]41


[size=0.875]42


[size=0.875]43


[size=0.875]44


[size=0.875]45


[size=0.875]46


[size=0.875]47


[size=0.875][size=0.875]AppGrpSender.java
[size=0.875]    int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
[size=0.875]    myQMgr = new MQQueueManager ("QM1");
[size=0.875]    myQueue = myQMgr.accessQueue("TESTQ", openOptions);
[size=0.875]    for(int i=0;i<3;i++)
[size=0.875]    {
[size=0.875]        MQMessage myMsg = new MQMessage ();
[size=0.875]        MQPutMessageOptions pmo = new MQPutMessageOptions ();
[size=0.875]        pmo.options = MQC.MQPMO_LOGICAL_ORDER + MQC.MQPMO_SYNCPOINT;
[size=0.875]        if (i<2)
[size=0.875]            myMsg.messageFlags = MQC.MQMF_MSG_IN_GROUP;
[size=0.875]        else
[size=0.875]            myMsg.messageFlags = MQC.MQMF_LAST_MSG_IN_GROUP;
[size=0.875]        String strMsg = "Hello" + i;
[size=0.875]        myMsg.write(strMsg.getBytes());
[size=0.875]        myQueue.put(myMsg,pmo);
[size=0.875]        System.out.println("Put message" + (i+1) + " '" + strMsg + "'! ");
[size=0.875]    }
[size=0.875]    myQMgr.commit();
[size=0.875]    myQueue.close();
[size=0.875]    myQMgr.disconnect();
[size=0.875]AppGrpReceiver.java
[size=0.875]    int openOptions = MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING;
[size=0.875]    myQMgr = new MQQueueManager ("QM1");
[size=0.875]    myQueue = myQMgr.accessQueue("TESTQ", openOptions);
[size=0.875]    MQMessage myMsg;
[size=0.875]    MQGetMessageOptions gmo = new MQGetMessageOptions ();
[size=0.875]    gmo.options =
[size=0.875]    MQC.MQGMO_LOGICAL_ORDER + MQC.MQGMO_SYNCPOINT + MQC.MQGMO_ALL_MSGS_AVAILABLE;
[size=0.875]    String strMsg = "";
[size=0.875]    boolean isLastMsg = false;
[size=0.875]    int seq = 0;
[size=0.875]    while(!isLastMsg)
[size=0.875]    {
[size=0.875]        seq++;
[size=0.875]        myMsg = new MQMessage ();
[size=0.875]        myQueue.get(myMsg, gmo);
[size=0.875]        if (myMsg.messageFlags == MQC.MQMF_MSG_IN_GROUP + MQC.MQMF_LAST_MSG_IN_GROUP)
[size=0.875]            isLastMsg = true;
[size=0.875]        byte[] b = new byte[myMsg.getMessageLength()];
[size=0.875]        myMsg.readFully(b);
[size=0.875]        strMsg = new String(b);
[size=0.875]        System.out.println("Got message" + seq + ":\n" + strMsg);
[size=0.875]    }
[size=0.875]    myQMgr.commit();
[size=0.875]    myQueue.close();
[size=0.875]    myQMgr.disconnect();




[size=1.0625]程序功能介绍:
  • AppGrpSender 程序是使用一个 for 循环,构造一个组的三个消息,分别写入队列 TESTQ 中。
  • AppGrpReceiver 程序是从队列 TESTQ 中循环读取消息,根据其逻辑顺序以及是否是组内最后一个消息来判断是否已取完同一组内的所有消息。
[size=1.0625]相对于消息分片,消息分组不仅仅是处理大消息的一种方法,更为重要的是,消息分组还能维护一组业务数据中的逻辑关系。
结束语[size=1.0625]消息分片和消息分组是在 WebSphere MQ 的编程中处理大消息的常用手段,到底采用哪种方式比较合适,需要根据实际的需求而定。如果大消息需要分割成有实际业务意义的一批小消息,那么采用消息分组比较合适;反之,如果大消息无法分割成有实际业务意义的小消息,那么就采用消息分片。甚至在某些复杂的场合下,消息分片和消息分组可以结合起来使用,比如,某批消息传输时由于有先后顺序的要求,被归并到一个组内,同时由于部分消息比较大,又需要分片传输,有兴趣的读者可以自己来实现一下这个复杂的场景。
山不在高,有仙则名;水不在深,有龙则灵。
返回列表