HHeLiBeXの日記 正道編

日々の記憶の記録とメモ‥

NetBeansを使ってJMSで遊んでみる

今からでも遅くないらしいので、JMS(Java Message Service)で遊んでみた。

今回は、以下を動かしてみた。

  • この記事にサンプルが載っている「Point-to-Pointメッセージドメイン」を使ったメッセージング
  • 「Publish/Subscribeメッセージドメイン」を使ったメッセージング

まず、サンプルコードからmainメソッドを抜き出し、次のようなMainクラスを作成する。
■Sender側:

package jmssample01;

public class Main {

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        if (args.length != 1) {
            printUsageAndExit();
            return;
        }
        if (args[0].equalsIgnoreCase("PTP")) {
            new PtpSender().sendGreetingMessage();
        } else if (args[0].equalsIgnoreCase("Pub/Sub")) {
            new PubSubSender().sendGreetingMessage();
        } else {
            printUsageAndExit();
        }
    }

    private static void printUsageAndExit() {
        System.err.println("Usage: " + Main.class.getName() + " PTP");
        System.err.println("        or");
        System.err.println("       " + Main.class.getName() + " Pub/Sub");
        System.exit(1);
    }

}

■Receiver側:

package jmssample01;

public class Main {

    /**
     * @param args the command line arguments
     */
    public static void main(String[] args) {
        if (args.length != 1) {
            printUsageAndExit();
            return;
        }
        if (args[0].equalsIgnoreCase("PTP")) {
            new PtpReceiver().receiveGreetingMessage();
        } else if (args[0].equalsIgnoreCase("Pub/Sub")) {
            new PubSubReceiver().receiveGreetingMessage();
        } else {
            printUsageAndExit();
        }
    }

    private static void printUsageAndExit() {
        System.err.println("Usage: " + Main.class.getName() + " PTP");
        System.err.println("        or");
        System.err.println("       " + Main.class.getName() + " Pub/Sub");
        System.exit(1);
    }

}

もちろん、サンプルコードからmainメソッドを削除しておく。
次に、Pub/Sub用のJMS接続ファクトリとJMS送信先リソースを作成する。

  • JMS接続ファクトリ:リソースタイプに"javax.jms.TopicConnectionFactory"を指定する。
  • JMS送信先リソース:リソースタイプに"javax.jms.Topic"を指定する。

後は、Pub/Subを使ったメッセージングを行うためのクラスを作成する。
■Sender側(PubSubSenderクラス):

package jmssample01;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class PubSubSender {

    public void sendGreetingMessage() {
        TopicConnectionFactory myConnectionFactory = null;
        Topic myTopic = null;
        Connection myConnection = null;
        Session mySession = null;
        MessageProducer messageProducer = null;
        TextMessage textMessage = null;
        try {
            // コネクションファクトリとデスティネーションの取得(アノテーションは使用せず)
            Context c = new InitialContext();
            myConnectionFactory = (TopicConnectionFactory)c.lookup("jms/JMSSample01TopicConnectionFactory");
            myTopic = (Topic)c.lookup("jms/JMSSample01Topic");

            myConnection = myConnectionFactory.createTopicConnection();
            mySession = myConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // MessageProducerとTextMessageはSessionオブジェクトから生成される
            messageProducer = mySession.createProducer(myTopic);
            textMessage = mySession.createTextMessage();
            textMessage.setText("芋焼酎酒店さん起きてますか?");
            // 送信側がキューへメッセージを送信
            messageProducer.send(textMessage);

            textMessage.setText("新しい銘柄が入荷しました。カタログへの追加お願いします。");
            messageProducer.send(textMessage);

            textMessage.setText("では、また!");
            messageProducer.send(textMessage);

        } catch (JMSException e) {
            e.printStackTrace();
        } catch (NamingException ne) {
            ne.printStackTrace();
        } finally {
            try {
                if (myConnection != null) {
                    myConnection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
                myConnection = null;
            }
            try {
                if (mySession != null) {
                    mySession.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
                mySession = null;
            }
            try {
                if (messageProducer != null) {
                    messageProducer.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
                messageProducer = null;
            }
        }
    }

}

■Receiver側(PubSubReceiverクラス):

package jmssample01;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class PubSubReceiver {

    private static final String GOOD_BYE_MESSAGE = "では、また!";

    public void receiveGreetingMessage() {
        TopicConnectionFactory myConnectionFactory = null;
        Topic myTopic = null;
        Connection myConnection = null;
        Session mySession = null;
        MessageConsumer messageConsumer = null;
        TextMessage textMessage = null;
        try {
            // コネクションファクトリとデスティネーションの取得(アノテーションは使用せず)
            Context c = new InitialContext();
            myConnectionFactory = (TopicConnectionFactory)c.lookup("jms/JMSSample01TopicConnectionFactory");
            myTopic = (Topic)c.lookup("jms/JMSSample01Topic");

            myConnection = myConnectionFactory.createTopicConnection();
            mySession = myConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // messageConsumerとTextMessageはSessionオブジェクトから生成される
            messageConsumer = mySession.createConsumer(myTopic);
            myConnection.start();
            boolean isGoodByeMessage = false;
            while(!isGoodByeMessage) {
                textMessage= (TextMessage)messageConsumer.receive();
                if (textMessage != null) {
                    System.out.println("メッセージを受け取りました:" + textMessage.getText());
                }
                if (textMessage.getText() != null
                    && textMessage.getText().equals(GOOD_BYE_MESSAGE)) {
                    isGoodByeMessage = true;
                }
            }

        } catch (JMSException e) {
            e.printStackTrace();
        } catch (NamingException ne) {
            ne.printStackTrace();
        } finally {
            try {
                if (myConnection != null) {
                    myConnection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
                myConnection = null;
            }
            try {
                if (mySession != null) {
                    mySession.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
                mySession = null;
            }
            try {
                if (messageConsumer != null) {
                    messageConsumer.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
                messageConsumer = null;
            }
        }
    }

}

とは言っても、実はサンプルコードとほとんど変わらない。変わっているのは次の点。

  • "javax.jms.QueueConnectionFactory"を"javax.jms.TopicConnectionFactory"に置き換える。
  • "javax.jms.Queue"を"javax.jms.Topic"に置き換える。
  • JMS接続ファクトリとJMS送信先リソースのJNDI名を変更する。
  • JMS接続ファクトリからコネクションを生成するためのメソッド名が"createQueueConnection"から"createTopicConnection"に変わる。
  • (変数名"myQueue"を"myTopic"に変更)


さて、これを普通に動かしておしまい、では面白くないので、Senderが1つに対してReceiverを2つ起動してみる。
Point-to-Pointの場合:

  1. Receiver1を起動する。
  2. Receiver2を起動する。
  3. Senderを起動する。
  4. もう一度Senderを起動する。
  5. 各Receiverの出力が次のようになった(タイミングなどによって結果は異なる)

Receiver1:

メッセージを受け取りました:新しい銘柄が入荷しました。カタログへの追加お願いします。
メッセージを受け取りました:では、また!

Receiver2:

メッセージを受け取りました:芋焼酎酒店さん起きてますか?
メッセージを受け取りました:芋焼酎酒店さん起きてますか?
メッセージを受け取りました:新しい銘柄が入荷しました。カタログへの追加お願いします。
メッセージを受け取りました:では、また!

2つのReceiverがメッセージを食い合っているのがわかる。
Pub/Subの場合:

  1. Receiver1を起動する。
  2. Receiver2を起動する。
  3. Senderを起動する。
  4. 両Receiver共に出力が次のようになった。
メッセージを受け取りました:芋焼酎酒店さん起きてますか?
メッセージを受け取りました:新しい銘柄が入荷しました。カタログへの追加お願いします。
メッセージを受け取りました:では、また!

受信待ちのReceiver全員に同じメッセージが送られていることがわかる。