首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

Linux 上的 WebSphere MQ 开发快速入门(2)

Linux 上的 WebSphere MQ 开发快速入门(2)

示例应用程序用于将消息放入队列中 (MQSend) 和从队列获取消息 (MQGet) 的示例 Java 应用程序可使用前面部分中定义的队列。这些应用程序支持 String 消息,旨在用于测试和作为说明如何从 Java 应用程序使用 WebSphere MQ 的示例。可以从脚本使用应用程序,包括各种类型的应用程序。MQSend 和 MQGet 都从 MQConnector 类进行扩展,该类可提供 WebSphere MQ 连接、初始化 WebSphere MQ、打开和关闭队列,向队列发送消息及从队列接收消息。
MQConnectorMQConnector 是用于进行发送和获取操作的超类。该类处理打开连接和发送及获取消息的工作。将使用属性文件对其进行配置,需要在属性文件中指定主机地址、队列管理器名称和队列名称:
1
2
3
queue.manager=WMQ1QM
queue.manager.host=192.168.28.71
queue.name=WMQ1OutputQ




以下是 MQConnector 源代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package mqconn;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.MQQueueManager;

public class MQConnector
{
  protected String qManager = ""; // define name of queue manager

  protected String qManagerHost = "";

  protected String queuName = ""; // define name of queue

  protected MQQueue mqQueue;

  protected MQQueueManager qMgr;

  public static boolean DEBUG = true;

  public MQConnector()
  {

  }

  public void initMq()
  {
    try
    {
      FileInputStream fis = new FileInputStream(new File("mqconnect.properties"));
      Properties props = new Properties();
      props.load(fis);
      fis.close();
      qManager = props.getProperty("queue.manager");
      qManagerHost = props.getProperty("queue.manager.host");
      queuName = props.getProperty("queue.name");

      // Create a connection to the queue manager
      MQEnvironment.channel = "SYSTEM.DEF.SVRCONN";
      MQEnvironment.hostname = qManagerHost;
      debug("Connecting to QueueManager " + qManager + " on " + qManagerHost);
      qMgr = new MQQueueManager(qManager);
    }
    catch (Exception e)
    {
      e.printStackTrace();
    }
  }

  public void openQueue() throws MQException
  {
    // Set up the options on the queue we wish to open...
    // Note. All WebSphere MQ Options are prefixed with MQC in Java.
     int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;
    // Now specify the queue that we wish to open,
    // and the open options...   
    debug("Opening queue: " + queuName);
    try
    {
      mqQueue = qMgr.accessQueue(queuName, openOptions);
    }
    catch(MQException mqe)
    {
      //check if MQ reason code 2045
      //means that opened queu is remote and it can not be opened as
      //input queue
      //try to open as output only
      if(mqe.reasonCode==2045)
      {
        openOptions = MQC.MQOO_OUTPUT;
        mqQueue = qMgr.accessQueue(queuName, openOptions);
      }
    }
  }
     
  public void putMessageToQueue(String msg) throws MQException
  {
    try
    {
      debug("Sending message: " + msg);

      MQPutMessageOptions pmo = new MQPutMessageOptions();
      MQMessage mqMsg = new MQMessage();
      mqMsg.write(msg.getBytes());

      // put the message on the queue
      mqQueue.put(mqMsg, pmo);
    }
    catch (IOException e)
    {
      e.printStackTrace();
    }
  }

  public String getMessageFromQueue() throws MQException
  {
    try
    {
      MQMessage mqMsg = new MQMessage();
      
      MQGetMessageOptions gmo = new MQGetMessageOptions();

      // Get a message from the queue
      mqQueue.get(mqMsg,gmo);  
        
      //Extract the message data
      int len=mqMsg.getDataLength();
      byte[] message = new byte[len];
      mqMsg.readFully(message,0,len);
      return new String(message);
    }
    catch(MQException mqe)
    {
      int reason=mqe.reasonCode;
      
      if(reason==2033)//no messages
      {
        return null;
      }
      else
      {
        throw mqe;
      }
    }   
    catch (IOException e)
    {
      e.printStackTrace();
      return null;
    }
  }
   
   
  public void closeQueue() throws MQException
  {
    debug("Closing queue and disconnecting QueueManager...");

    // Close the queue...
    mqQueue.close();
    // Disconnect from the queue manager
    qMgr.disconnect();

  }

  protected boolean hasArg(String arg, String[] args)
  {
    for(int i=0;i<args.length;i++)
    {
      if(args.equals(arg))
      {
        return true;
      }
    }
    return false;
  }
   
  public void debug(Object msg)
  {
    if (DEBUG)
    {
      System.out.println(msg);
    }
  }
     
}




该类向其他应用程序提供 WebSphere MQ 功能。还可以在自己的应用程序中将 MQConnector 作为其他类的超类或实用类使用。以下方法为 Java 应用程序提供消息传递功能:
initMQ()读取属性并创建队列管理器对象。openQueue()打开属性文件中指定的队列。closeQueue()关闭已打开的队列。disconnectMq()断开队列管理器。putMessageToQueue(String)将字符串消息放置到指定的队列。getMessageFromQueue()从指定的队列读取字符串消息。MQSendMQSend(如下所示)将使用 MQConnector 类,是用于向队列发送消息的命令行程序。消息作为命令行参数指定,可以使用其将文件内容作为消息发送:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package mqconn;

import java.io.BufferedReader;
import java.io.FileReader;

import com.ibm.mq.MQException;

public class MQSend extends MQConnector
{

  public MQSend()
  {

  }

  public void send(String[] args) throws MQException
  {
    boolean argsAreFiles = hasArg("-f", args);
    initMq();
    openQueue();
    for (int i = 0; i < args.length; i++)
    {
      if (args.equals("-f"))
        continue;

      if (!argsAreFiles)
      {
        putMessageToQueue(args);
      }
      else
      {
        try
        {
          // send file contents as message
          BufferedReader br = new BufferedReader(new FileReader(args));

          StringBuffer msg = new StringBuffer();
          for (String line = br.readLine(); line != null; line = br.readLine())
          {
            msg.append(line);
            msg.append('\n');
          }

          br.close();
          putMessageToQueue(msg.toString());
        }
        catch (Exception e)
        {
          System.out.println("Error while processing file " + args + ": "
              + e.toString());
        }
      }
      System.out.println("Message sent.");
    }

    closeQueue();
    disconnectMq();
  }

  public static void main(String[] args)
  {
    MQSend mqsend = new MQSend();
    MQConnector.DEBUG = false;
    try
    {
      if (args == null || args.length == 0)
      {
        System.out.println("Usage: " + mqsend.getClass().getName()
            + " [-f] <file name | message> [<file name | message> ...]");
        System.exit(0);
      }
      mqsend.send(args);
    }
    catch (Exception e)
    {
      System.out.println(e.toString());
      System.out.println("Usage: " + mqsend.getClass().getName()
          + " [-f] <file name | message> [<file name | message> ...]");
    }

  }

}




MQSend 使用 MQConnector 提供消息传递,实际的 MQSend 程序功能非常简单。它有两种操作模式:
  • 将每个命令行参数作为独立消息传递。
  • 使用 -f 选项发送文件内容。在本例中,每个命令行参数都是文件名。
返回列表