Friday, March 19, 2010

JMS Synchronous Request/Reply

So far I've introduced some basics around JMS 1.1, including the Common Interface and Security/Concurrency. Continuing through the JMS 1.1 spec, my next drill-down will be around the Request/Reply idiom. Just so you know, these articles track my own learning curve around JMS, so you are invited to correct me and comment as needed.

Section 2.10 of the spec describes Request/Reply as an arrangement where a client sends a message to a remote service of some kind, expecting a reply. The request message header specifies a destination to which the service can (optionally!) respond with some information, or a confirmation that a requested action has been performed, or...etc. A well-behaved service not only replies, but the reply includes the ID of the request message (typically) so the requestor can correlate the reply with the previous request. JMS additionally provides basic helper implementations (QueueRequestor and TopicRequestor) that encapsulate some of the details involved here, including creating the temporary queue or temporary topic to which the reply is sent. In this series of posts, I'll trace my own prototyping, starting with the QueueRequestor (which is a synchronous messaging style) and evolving with some asynchronous alternatives, adding functionality in stages.

I'll use increasingly sophisticated levels of remote service, with various flavors of requestors associated with each. The remote services include one that simply listens on a request queue; another that does that plus notifies requestors about exceptions or invalid messages; and a third that does that plus uses message filtering on an asynchronous shared queue so that only the original requestor receives the reply. This post will describe the basic remote service and a basic requestor.

The remote service listens asynchronously on a system-wide queue and replies to requests at the reply-to destination specified in the JMSReplyTo header field. It is not terribly robust - it assumes the requestor will be sending non-null text messages; if a given message is either null or non-text, an exception is thrown. The service constructs a reply with the correlation ID (JMSCorrelationID header field) set to the request message ID (JMSMessageID header field), sends the reply and acknowledges the request.

This class provides a hook for subclasses to further decorate the message (e.g. setting properties, etc.), because we just happen to know we'll be using that in a later prototype extending this remote service.

The requestor connects to the service queue, sends a request using the QueueRequestor (which blocks on a dedicated temporary channel), and verifies that the reply has a correlation ID that matches the request message ID.

As an aside, the QueueRequestor and TopicRequestor classes are, interestingly, the only concrete classes in the JMS 1.1 distro - in other words, JMS providers need not implement anything here. Though, it wouldn't be surprising if provider-specific request/reply classes are available; it's depends on your needs as to whether proprietary mechanisms are appropriate, as always.

Here's the code for the service:

package com.mybiz.jms.activemq.server.requestreply.replier;

import com.mybiz.jms.activemq.server.requestreply.connection.AsyncConsumerConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.connection.ConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.util.MessageUtil;
import org.apache.activemq.ActiveMQConnection;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;

public class RemoteService implements MessageListener {

    // using an ActiveMQ provider
    public static final String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    public static final String serviceQueueName = "ServiceQueue";

    // ConnectionStuff is just a convenience composite object that 
    // encapsulates a destination, the session for that destination and 
    // the connection for that session.
    private ConnectionStuff serviceStuff;

    public RemoteService() {}

    public RemoteService(String theUrl, String serviceQueueName)
            throws JMSException {

        // establish the connection, session and destination, and start the connection
        serviceStuff = new AsyncConsumerConnectionStuff(theUrl, serviceQueueName, this);
        System.out.println("Service is waiting for a request...");
    }

    private boolean shutdown = false;
    protected void shutdown() throws Exception {

        // all done - release connection. No need to release any other objects,
        // that's done automatically when releasing the connection.
        getServiceQueueConnection().close();
    }

    protected Connection getServiceQueueConnection()
    {
        return serviceStuff.getConnection();
    }

    // assumes non-null TextMessage; sends a reply to the specified reply-to. 
    // Throws an exception if any problem occur.
    public void onMessage(Message requestMsg) {

        try {

            TextMessage requestTextMsg = (TextMessage) requestMsg;
            if (requestTextMsg.getText() == null) {
                   throw new IllegalStateException("NULL message received");
               }

            // MessageUtil is a convenience class that prints out 
            // information about messages, and provides various other conveniences
            MessageUtil.examineReceivedRequest(requestTextMsg);
            sendReply(serviceStuff, requestMsg, "Normal Service Reply",
                    requestMsg.getJMSReplyTo());

        } catch (Exception e) {

            throw new IllegalStateException(e);
        }
    }

    // Send the given reply to the given destination and acknowledge the given request message.
    protected void sendReply(ConnectionStuff stuff,
                             Message requestMessage, String reply,
                             Destination destination) throws JMSException {

        // construct and send the reply, with correlation ID set to message ID
        TextMessage replyMsg = stuff.getSession().createTextMessage();
        replyMsg.setJMSCorrelationID(requestMessage.getJMSMessageID());
        replyMsg.setText(reply);

        // allow subclasses to do extra stuff; default impl does nothing
        replyMsg = decorateMessage(requestMessage, replyMsg);

        MessageProducer producer = stuff.getSession().createProducer(destination);
        producer.send(replyMsg);
        MessageUtil.examineSentReply(replyMsg);

        requestMessage.acknowledge();
    }

    /**
     * Provides support for subclasses to further fill in the reply, e.g. 
     * by setting various properties. We just happen to know that this will 
     * be needed for the full demo - see MessageSelectorRemoteService.
     */
    protected TextMessage decorateMessage(Message requestMessage, 
            TextMessage replyMsg)
            throws JMSException {

        return replyMsg;
    }

    public static void main(String[] args) throws Exception {

        RemoteService service = new RemoteService(url, serviceQueueName);
        run(service);
    }

    /**
     * Runs the given remote service; subclasses should call this to run 
     * themselves - so that in case this demo is extended (which it will be), 
     * they won't need to duplicate the same execution code.
     */
    public static void run(RemoteService service) throws Exception {

        while (!service.shutdown) {
            // this service could support receiving a message that it reacts to by
            // setting shutdown to true; or you could set it manually in 
            // the debugger; or you could just not worry about it. This is 
            // just a demo.
            Thread.currentThread().sleep(1000);
        }
        service.shutdown();
        System.out.println("Done - exiting");
    }
}

And, here's the requestor code. The comments explain how to demo things (assuming the ActiveMQ provider is already running), and describes what we might think about to improve the functionality:

package com.mybiz.jms.activemq.server.requestreply.requestor.sync;

import com.mybiz.jms.activemq.server.requestreply.connection.SyncRequestorConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.replier.RemoteService;
import com.mybiz.jms.activemq.server.requestreply.util.MessageUtil;

import javax.jms.JMSException;
import javax.jms.TextMessage;

public class VanillaSyncRequestor {

//    Demo:
//
//    Start the remote service, then run the requestor - console printouts 
//    reflect request and reply on both sides of messaging provider.
//
//    Now kill the remote service, then run the requestor - request is made, 
//    but no reply just yet. Start up the service "after a while", and the 
//    reply is received.
//
//    What does it need:
//
//    This is a simple request-reply demonstration. It will not know if 
//    the remote service throws an exception (which can happen, as noted 
//    above, if the request is either null or is not a text message). Though 
//    it has a dedicated temporary channel, it can possibly block for indefinite
//    periods of time - e.g. if the service goes down, it will block until that 
//    service is back online; worse, if the service throws an exception, it can 
//    block "forever". Blocking until the service comes back online can be 
//    considered a good thing, i.e. the request will "eventually" be handled -
//    and, it can be considered a bad thing, e.g. if many clients are blocking 
//    on a wait for an offline service, this consumes system resources.

    /**
     * Send a text message using the given connection stuff, wait synchronously 
     * for a reply.
     */
    public void sendRequest(SyncRequestorConnectionStuff stuff) throws JMSException {

        TextMessage requestMessage = stuff.getSession().createTextMessage(
                "Do this as soon as you're online!");

        // this convenience method sends the request, at which point the 
        // JMSMessageID is set in the header of that message; it acknowledges 
        // the reply and then confirms that the JMSCorrelationID in the 
        // reply matches the JMSMessageID, throwing an exception if that is 
        // not the case.
        MessageUtil.processRequestAndReply(stuff.getRequestor(), requestMessage);
    }

    public static void main(String[] args) throws JMSException {

        VanillaSyncRequestor requestor = new VanillaSyncRequestor();

        // get the connection, session and QueueRequestor - on return, 
        // the connection will have been started
        SyncRequestorConnectionStuff serviceQueueStuff =
                new SyncRequestorConnectionStuff(
                RemoteService.url, RemoteService.serviceQueueName);

        requestor.sendRequest(serviceQueueStuff);

        // requestor is done - release connection. No need to release any 
        // other objects, that's done automatically when releasing the connection.
        serviceQueueStuff.getConnection().close();
    }
}

I'm omitting the code for MessageUtil and ConnectionStuff - these can be regarded as black-boxes that "just work" as noted, for purposes of this article. But I will offer the printouts that occur when the suggested demo steps are taken:

1 - Start up the ActiveMQ provider (you'll of course need to download and install this first):

$ cd $ACTIVEMQ_HOME/apache-activemq-5.3.0/bin 
$ ./activemq
.........
(lots of noisy startup messages, omitted here...)
Loading message broker from: xbean:activemq.xml 
..........
INFO | Listening for connections at: tcp://localhost:61616 
.......... (etcetera etcetera)

2 - Start the service (I've done this in my IDE; just run the RemoteService class):

Mar 19, 2010 12:15:47 PM org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://localhost:61616
Service is waiting for a request...

3 - Run VanillaSyncRequestor; observe that it sends a request and immediately gets a reply:

INFO: Successfully connected to tcp://localhost:61616
12:16:06.937 Sending request for service 'Do this as soon as you're online!'...
12:16:08.750 Request for service 'Do this as soon as you're online!' has been sent...
    Message ID: ID:080301h0114dl-3388-1269022561656-0:0:1:1:1
    Correlation ID: null
    Reply to:   temp-queue://ID:080301h0114dl-3388-1269022561656-0:0:1
----------------------------------------------
12:16:08.750 Received reply 'Normal Service Reply'
    Message ID: ID:080301h0114dl-3375-1269022545562-0:0:1:1:1
    Correlation ID: ID:080301h0114dl-3388-1269022561656-0:0:1:1:1
    Reply to:   null
----------------------------------------------

4 - Stop the service; run the requestor again - note that it sends the request but then appears to stop - in fact, it's blocking, waiting for the service to come back online:

INFO: Successfully connected to tcp://localhost:61616
12:30:07.875 Sending request for service 'Do this as soon as you're online!'...

5 - Start up the service - the printout there looks the same as above, since it grabs the message waiting in the queue and replies. At that point, thankfully, the requestor now completes, albeit "some time later", as can be seen by the timestamps:

12:31:57.937 Request for service 'Do this as soon as you're online!' has been sent...
    Message ID: ID:080301h0114dl-3906-1269023399953-0:0:1:1:1
    Correlation ID: null
    Reply to:   temp-queue://ID:080301h0114dl-3906-1269023399953-0:0:1
----------------------------------------------
12:31:57.937 Received reply 'Normal Service Reply'
    Message ID: ID:080301h0114dl-3964-1269023512671-0:0:1:1:1
    Correlation ID: ID:080301h0114dl-3906-1269023399953-0:0:1:1:1
    Reply to:   null
----------------------------------------------

As the comments in VanillaSyncRequestor indicate, there is room for improvement. Next, I'll demonstrate a client that blocks "forever", and take some steps to at least partially mitigate that. Then we'll move on to use asynchronous messaging in a request/reply approach.

References

JMS Home Page
JMS Downloads
JMS FAQ
JEE 5 JMS Tutorial
JEE 5 JMS Tutorial - Example Code
JEE 5 Online Javadoc
JMS 1.1 Online Javadoc
Apache ActiveMQ

1 comment: