001package org.opengion.fukurou.queue;
002
003import java.util.ArrayList;
004import java.util.List;
005
006import javax.jms.JMSException;
007// import javax.jms.Message;
008import javax.jms.MessageListener;
009import javax.jms.Queue;
010import javax.jms.QueueConnection;
011import javax.jms.QueueConnectionFactory;
012import javax.jms.QueueReceiver;
013import javax.jms.QueueSession;
014import javax.jms.TextMessage;
015import javax.naming.Context;
016import javax.naming.InitialContext;
017
018import org.apache.activemq.ActiveMQConnectionFactory;
019
020/**
021 * MQメッセージ受信用クラス。
022 *
023 * @og.group メッセージ連携
024 *
025 * @og.rev 5.10.15.2 (2019/09/20) 新規作成
026 * 
027 * @version 5
028 * @author oota
029 * @since JDK7
030 */
031public class QueueReceive_MQ implements QueueReceive{
032
033        private QueueConnection connection = null;
034        private QueueSession session = null;
035        private QueueReceiver receiver = null;
036        List<QueueReceiver> listReceiver = null;
037        private boolean batch = false;
038        
039        /**
040         * 接続処理
041         * メッセージキューサーバに接続します。
042         * 
043         *  @param jmsServer jsmサーバ
044         *  @param sqsAccessKey sqs用awsアクセスキー(MQでは利用しません)
045         *  @param sqsSecretKey sqs用awsシークレットキー(MQでは利用しません)
046         */
047        public void connect(final String jmsServer, final String sqsAccessKey, final String sqsSecretKey) {
048                connect(jmsServer);
049        }
050        
051        /**
052         * 接続処理
053         * jmsServerに接続します。
054         * MQの場合は、受信リスナーを設定して、随時メッセージ受信処理を行います。
055         * SQSの場合は最大受信件数の10件の処理を行います。
056         * 
057         * @param jmsServer 接続先情報 MQ:jndi接続先 SQS:URL
058         */
059        private void connect(final String jmsServer) {  
060                try {
061                        if(batch) {
062                                // バッチ用
063                                final String mqUserId = System.getProperty("mqUserId");
064                                final String mqPassword = System.getProperty("mqPassword");
065                                final QueueConnectionFactory factory = new ActiveMQConnectionFactory(jmsServer);                
066                                connection = factory.createQueueConnection(mqUserId,  mqPassword);
067                        }else {
068                                // jndi接続用
069                                final Context ctx = new InitialContext();
070                                final QueueConnectionFactory factory = (QueueConnectionFactory)ctx.lookup("java:comp/env/" + jmsServer);
071                                connection = factory.createQueueConnection();
072                        }
073                        
074                        connection.start();
075                        
076                        // Receiveの作成
077                        session = connection.createQueueSession(false, QueueSession.CLIENT_ACKNOWLEDGE);
078                        
079                        // 初期化
080                        listReceiver = new ArrayList<QueueReceiver>();
081                }catch(Exception e) {
082                        throw new RuntimeException("MQサーバの接続に失敗しました。:" + e.getMessage());
083                }
084        }
085
086        /**
087         * 受信処理
088         * メッセージキューの受信の処理を行います。
089         * 
090         * @param queueName キューの名前
091         * @return キュー情報格納クラス
092         */
093        @Override
094        public QueueInfo receive(final String queueName) {
095                QueueInfo queueInfo = null;
096                
097                try {
098                        final Queue queue = session.createQueue(queueName);
099                        receiver = session.createReceiver(queue);
100                        
101                        final TextMessage msg = (TextMessage)receiver.receive(1000);
102                        
103                        if(msg != null) {
104                                // メッセージ受信の確認応答
105                                msg.acknowledge();
106                                
107                                // メッセージの設定
108                                queueInfo = new QueueInfo();
109                                queueInfo.setMessage(msg.getText());
110                        }
111                }catch(Exception e) {
112                        throw new RuntimeException(e.getMessage());
113                }finally {
114                        try {
115                                receiver.close();
116                        }catch(Exception e) {}
117                }
118                
119                return queueInfo;
120        }
121        
122        /**
123         * リスナーの起動
124         * 指定したキュー名に対して、
125         * MessageListenerのリスナーを設定します。
126         * 
127         * @param queueName キュー名
128         * @param listener MessageListerを実装したクラス
129         */
130        @Override
131        public void setListener(final String queueName, final MessageListener listener) {
132                QueueReceiver receiver = null;
133                try {   
134                        final Queue queue = session.createQueue(queueName);
135                        receiver = session.createReceiver(queue);
136                        receiver.setMessageListener(listener);
137                        
138                        // リスナーの起動
139                        listReceiver.add(receiver);
140                }catch(JMSException e) {
141                        throw new RuntimeException("リスナーの起動に失敗しました。" + e.getMessage());
142                }
143        }
144
145        /**
146         * クローズリスナー
147         * レシーバーをクローズすることで、
148         * リスナーの処理を終了します。
149         */
150        public void closeListener() {
151                for(final QueueReceiver receiver: listReceiver) {
152                        try {
153                                receiver.close();
154                        }catch(Exception e) {
155                                
156                        }
157                }
158                
159                // 初期化
160                listReceiver = null;
161                listReceiver = new ArrayList<QueueReceiver>(); 
162        }
163        
164        /**
165         * クローズ処理
166         * クローズ処理を行います。
167         */
168        @Override
169        public void close() {
170                if(receiver != null) {
171                        try {
172                                receiver.close();
173                        }catch(Exception e) {
174                                
175                        }
176                }
177                if(session != null) {
178                        try {
179                                session.close();
180                        }catch(Exception e) {
181                                
182                        }
183                }
184                if(connection != null) {
185                        try {
186                                connection.close();
187                        }catch(Exception e) {
188                                
189                        }
190                }
191        }
192
193        /**
194         * バッチ処理判定フラグを設定します。
195         * 
196         * @param batchFlg バッチ処理判定フラグ
197         */
198        public void setBatchFlg(final Boolean batchFlg) {
199                batch = batchFlg;
200        }
201        
202        /**
203         * 検証用メソッド
204         * テスト用のメソッドです。
205         * 
206         * @param args 引数
207         */
208        public static void main(final String[] args) {
209                final QueueReceive receive = new QueueReceive_MQ();
210                final String jmsServer = "tcp://localhost:61616";
211                
212                // バッチフラグにtrueを設定
213                // 未設定の場合は、tomcatのjndi接続処理が実行されます。
214                receive.setBatchFlg(true);
215                
216                // 認証情報の設定
217                System.setProperty("mqUserId", "admin");
218                System.setProperty("mqPassword", "admin");
219                
220                // 接続
221                receive.connect(jmsServer, null, null);
222                
223                // 処理対象のキュー名
224                final String queueName = "queue01";
225                
226                
227                // ** 1件受信する場合
228                final QueueInfo queueInfo = receive.receive(queueName);
229                if(queueInfo != null) {
230                        System.out.println("message:" + queueInfo.getMessage());        
231                }else {
232                        System.out.println("キューが登録されていません。");
233                }
234                
235//              // ** リスナーを設定して、受信を検知すると処理を実行します。(MQのみ)
236//              // MessageListerを実装した、QueueReceiveListenerクラスを作成します。
237//              MessageListener listener = new QueueReceiveListener();
238//              receive.setListener(queueName, listener);
239//              // 複数のキューにリスナーを設定することも可能です。
240//              receive.setListener("queue02", listener);
241//              
242//              try {
243//                      // 1分間リスナーを起動しておく場合の、プロセス待機処理
244//                      Thread.sleep(60 * 1000);
245//              }catch(InterruptedException e) {
246//                      throw new RuntimeException(e.getMessage());
247//              }
248
249                // リスナー利用時は、closeListenerを実行して、解放してください。
250                receive.closeListener();
251
252                // 終了処理
253                receive.close();
254        }
255        
256//      /**
257//       * QueueReceiveリスナークラス
258//       * リスナー用のクラスです。
259//       * MQに設定することで、メッセージが受信されると、
260//       * 自動的にonMessageメソッドが実行されます。
261//       *
262//       */
263//      static class QueueReceiveListener implements MessageListener {
264//              /**
265//               * メッセージ受信処理
266//               * MQサーバにメッセージが受信されると、
267//               * メソッドの処理が行われます。
268//               * 
269//               * @param message 受信メッセージ
270//               */
271//              @Override
272//              public void onMessage(final Message message) {
273//
274//                      // メッセージ受信
275//                      TextMessage msg = (TextMessage) message;
276//                      String msgText = "";
277//
278//                      try {
279//                              // キューサーバのメッセージを取得
280//                              msgText = msg.getText();
281//                              // メーッセージの受信応答を返します。
282//                              msg.acknowledge();
283//                              
284//                              System.out.println("message:" + msgText);
285//
286//                      } catch (JMSException e) {
287//                              throw new RuntimeException(e.getMessage());
288//                      }
289//              }
290//      }
291
292}