Thursday, March 25, 2010

Asynchronous JMS Request/Reply

My recent previous posts have described a JMS-based request/reply client that uses blocking request semantics, and has no way of knowing if the service has thrown an exception and as such will never reply; and another that is marginally better, which listens on an invalid channel for messages about exceptions. That slight improvement still leaves a fair amount to be desired.

To clarify the goal of this exercise, I'd like the following:
  1. JMS request-reply arrangement, i.e. issue a request to some service for some information or for a task to be performed, and receive a reply (either with the information or a status/confirmation about the task)
  2. Receive information about an invalid request, or an exception that happened on the service side, in its attempt to fulfill the request
  3. No blocking request is left hanging in the face of a service-side exception
So, to address #3, we already know we could use block-without-wait or block-with-timeout; or we can use an asynchronous listener. Using the different flavors of blocking request means using one's own MessageConsumer instead of the built-in QueueRequestor (which is what I used in the first two parts of this series). From here, however, I'd prefer to just move on to an asynchronous idiom - but before leaving, check this article for some additional insights around synchronous request-reply.

Now if we use our own asynchronous consumer, we no longer have the automatic temporary queue construction done via the QueueRequestor. We'll construct our own invalid queue, but we do not want one for each potential requestor - construction of these is expensive. To be more precise, I'll quote the ActiveMQ recommendation:

The best way to implement request-response over JMS is to create a temporary queue and consumer per client on startup, set JMSReplyTo property on each message to the temporary queue and then use a correlationID on each message to correlate request messages to response messages. This avoids the overhead of creating and closing a consumer for each request (which is expensive). It also means you can share the same producer & consumer across many threads if you want (or pool them maybe).

This example will not do exactly that; instead, for simplicity (and as an excuse to demonstrate message selection), I'll construct one shared invalid queue - but this presents the problem of how to return invalid information only to the requestor that should hear about it (i.e., if there are multiple queue-based clients, then only the first one to receive from a given queue will get that message). To address this problem, we can use message selection - but keep in mind, this relies on all requestors in a given system to be using the same filter. If for example one rogue client did no filtering with a message selector, it would be competing with the intended receiver for a message about exceptions - and if it got there first, the intended receiver would never know about it.

We've already seen a remote service that sends messages about exceptions to an invalid queue; now we want to extend that to additionally set a property in the reply message that facilitates the intended message selection. Here's an example of that, where the service agrees to set a unique ID as a property in the reply message (see the previous posts for details around the RemoteService, InvalidQueueRemoteService, etc.):

package com.mybiz.jms.activemq.server.requestreply.replier;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

public class MessageSelectorRemoteService extends InvalidQueueRemoteService {

    public static final String OID_PROPERTY = "oid";

    public MessageSelectorRemoteService(String theUrl, String serviceQueueName, String invalidQueueName)
            throws JMSException {

        super(theUrl, serviceQueueName, invalidQueueName);
        System.out.println("   ...service will now add OID property to reply...");
    }

    protected TextMessage decorateMessage(Message requestMessage, TextMessage replyMsg)
            throws JMSException {

        // include the OID_PROPERTY property
        String oid = requestMessage.getStringProperty(OID_PROPERTY);
        replyMsg.setStringProperty(OID_PROPERTY, oid);
        return replyMsg;
    }

    public static void main(String[] args) throws Exception {

        RemoteService service = 
                new MessageSelectorRemoteService(url, serviceQueueName, invalidQueueName);
        RemoteService.run(service);

    }
}

One consumer using this service could set up a single MessageListener that receives both nominal and exceptional messages:

package com.mybiz.jms.activemq.server.requestreply.requestor.async;

import com.mybiz.jms.activemq.server.requestreply.connection.AsyncConsumerConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.connection.ConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.connection.ProducerConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.replier.InvalidQueueRemoteService;
import com.mybiz.jms.activemq.server.requestreply.replier.MessageSelectorRemoteService;
import com.mybiz.jms.activemq.server.requestreply.replier.RemoteService;
import com.mybiz.jms.activemq.server.requestreply.util.MessageUtil;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

// you'll see lots of protected access in fields and methods here - that's because I just happen
// to know that I'll be extending this class, and protected qualifiers are "good enough" for
// demo purposes:
public class InvalidListener implements MessageListener {

    // add message selector so queue listeners only gets their own messages,
    // and so that no other clients receive those messages instead. This particular
    // OID can fail if two clients are started at exactly the same millisecond:
    protected static final String OID = MessageUtil.getDateTime();
    protected static final String MSG_SELECTOR =
            MessageSelectorRemoteService.OID_PROPERTY + "='" + OID + "'";

    // keep track of number of message sent and received; if the gap between these grows
    // too much, log a message
    protected static int numSent = 0, numReceived = 0;
    protected static final int maxGap = 6;

    // protect numSent and numReceived during concurrent access
    protected static final Object mutex = new Object();

    // this client uses message selectors so it receives only those messages intended
    // for it on the invalid queue, ftm no other clients will receive those messages. With
    // message selectors, the filtering is done by the provider so it's more efficient than
    // asking each client to inspect each message for those that apply to it alone.

    public void onMessage(Message msg) {

        processReceived(msg);
    }

    protected ConnectionStuff setupInvalidConnection() throws Exception {

        // miracle occurs here: this sets the message listener to this object, so both
        // nominal and invalid messages will now arrive in the onMessage method:
        return new AsyncConsumerConnectionStuff(
                RemoteService.url, InvalidQueueRemoteService.invalidQueueName,
                this, MSG_SELECTOR, true);
    }

    protected static void run(InvalidListener requestor) throws Exception {

        boolean shutdown = false;

        // set up producer that sends requests to the service queue. Messages to the queue should
        // have set the replyTo value to a temporary queue created manually.
        ProducerConnectionStuff requestorConnection =
                new ProducerConnectionStuff(RemoteService.url, RemoteService.serviceQueueName, true);

        // set up consumer that receives asynchronously at the temporary queue and start the connection
        ConnectionStuff consumerConnection =
                new AsyncConsumerConnectionStuff(
                        RemoteService.url, "MyListener", requestor,
                        requestor.MSG_SELECTOR, true);

        // factored out so it can be overridden, which will happen in next example requestor:
        ConnectionStuff invalidConnection = requestor.setupInvalidConnection();

        // send a message - async handler should get the reply
        int msgNum = 0;
        while (!shutdown) {
            TextMessage requestMsg = requestorConnection.getSession().createTextMessage();
            if ((++msgNum % 3) == 0) {
                // every 3rd message is NULL, which we know will provoke an invalid queue message
                requestMsg.setText(null);
            } else {
                requestMsg.setText("Async data request #" + msgNum);
            }

            // remote service agrees to set property in invalid replies matching any given
            // with RemoteService.OID_PROPERTY name
            requestMsg.setStringProperty(MessageSelectorRemoteService.OID_PROPERTY, requestor.OID);
            requestorConnection.send(requestMsg, consumerConnection.getDestination());

            requestor.processSent(requestMsg);
        }

        requestorConnection.getConnection().close();
        consumerConnection.getConnection().close();
        invalidConnection.getConnection().close();
    }

    public static void main(String[] args) throws Exception {

        run(new InvalidListener());
    }

    private void processSent(Message requestMsg) throws Exception {

        synchronized (mutex) {
            numSent++;
        }

        MessageUtil.examineSentRequest(requestMsg);
        Thread.currentThread().sleep(3000);

        // confirms the number of messages received vs sent is within a specified threshold. If
        // the remote service goes down, this message will log continuously; but within a finite
        // amount of time (depending on how long the service was down), the gap will be made up
        // (from empirical observations)
        if (numSent - numReceived > maxGap) {
            System.err.println("WARNING: # sent = " + numSent
                    + ", # received = " + numReceived);
        }
    }

    protected void processReceived(Message msg) {

        synchronized (mutex) {
            ++numReceived;
        }
        // confirms the message received has an OID property value as expected
        MessageUtil.process(msg, MessageSelectorRemoteService.OID_PROPERTY, OID);
    }
}

This requestor also monitors the gap between the number of messages sent vs the number received - if that starts getting too large, it will notice; this could indicate some kind of trouble around receiving the expected replies. As hoped, it now receives messages about exceptions, etc. - here's the startup of the service, and then the requestor:

Mar 25, 2010 1:27:25 PM org.apache.activemq.transport.failover.FailoverTransport doReconnect
Service is waiting for a request...
INFO: Successfully connected to tcp://localhost:61616
   ...service will now send replies to an INVALID queue...
   ...service will now add OID property to reply...

Mar 25, 2010 1:28:42 PM org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://localhost:61616
13:28:42.779 Request for service 'Async data request #1' has been sent...
    Message ID: ID:localhost-34234-1269545322429-0:0:1:1:1
    Correlation ID: null
    Reply to:   queue://MyListener
    OID property:   13:28:42.217
----------------------------------------------
13:28:42.796 Received reply 'Normal Service Reply'
    Message ID: ID:localhost-34222-1269545245163-0:0:1:2:1
    Correlation ID: ID:localhost-34234-1269545322429-0:0:1:1:1
    Reply to:   null
    OID property:   13:28:42.217
----------------------------------------------
13:28:45.792 Request for service 'Async data request #2' has been sent...
    Message ID: ID:localhost-34234-1269545322429-0:0:1:1:2
    Correlation ID: null
    Reply to:   queue://MyListener
    OID property:   13:28:42.217
----------------------------------------------
13:28:45.819 Received reply 'Normal Service Reply'
    Message ID: ID:localhost-34222-1269545245163-0:0:1:3:1
    Correlation ID: ID:localhost-34234-1269545322429-0:0:1:1:2
    Reply to:   null
    OID property:   13:28:42.217
----------------------------------------------
13:28:48.798 Request for service 'null' has been sent...
    Message ID: ID:localhost-34234-1269545322429-0:0:1:1:3
    Correlation ID: null
    Reply to:   queue://MyListener
    OID property:   13:28:42.217
----------------------------------------------
13:28:48.815 Received reply 'Exception occurred: java.lang.IllegalStateException: java.lang.IllegalStateException: NULL message received'
    Message ID: ID:localhost-34222-1269545245163-0:0:2:2:1
    Correlation ID: ID:localhost-34234-1269545322429-0:0:1:1:3
    Reply to:   null
    OID property:   13:28:42.217
----------------------------------------------

As expected from the requestor logic, every 3rd message results in an exception, and the requestor is notified about this. So we've solved the problem around a request blocking, potentially "forever", if the service throws an exception; however, this particular flavor of consumer must examine each message received to determine whether or not it indicates such a problem. If you'd like to avoid putting that burden on every requestor (since most of the time, the message will not be about an exception - that's why they're called "exceptions"), here's an extension to this consumer that provides two separate callbacks, one for nominal messages and another for invalid ones:

package com.mybiz.jms.activemq.server.requestreply.requestor.async;

import com.mybiz.jms.activemq.server.requestreply.connection.AsyncConsumerConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.connection.ConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.replier.InvalidQueueRemoteService;
import com.mybiz.jms.activemq.server.requestreply.replier.RemoteService;
import com.mybiz.jms.activemq.server.requestreply.util.MessageUtil;

import javax.jms.Message;
import javax.jms.MessageListener;

public class DualListener extends InvalidListener implements AbstractInvalidNotifier {

    public void notifyInvalidRequest(Message notification) {

        processReceived(notification, true);
    }

    protected ConnectionStuff setupInvalidConnection() throws Exception {

        MessageListener invalidQueueListener = new MessageListener() {
            public void onMessage(Message msg) {
                notifyInvalidRequest(msg);
            }
        };

        return new AsyncConsumerConnectionStuff(
                        RemoteService.url, InvalidQueueRemoteService.invalidQueueName,
                        invalidQueueListener, MSG_SELECTOR, true);
    }

    public static void main(String[] args) throws Exception {

        run(new DualListener());
    }

    private void processReceived(Message msg, boolean invalid) {

        if (invalid) {
            System.out.println(
                    "---->>> " + MessageUtil.getDateTime() + " Notified about INVALID response <<<----");
        }
        processReceived(msg);
    }
}

Running this modified requestor confirms that invalid messages are received in a separate method, precluding the need to parse replies just to catch the occasional exception:

Mar 25, 2010 1:33:29 PM org.apache.activemq.transport.failover.FailoverTransport doReconnect       
INFO: Successfully connected to tcp://localhost:61616       
13:33:29.939 Request for service 'Async data request #1' has been sent...       
   Message ID: ID:localhost-34245-1269545609591-0:0:1:1:1       
   Correlation ID: null       
   Reply to:   queue://MyListener       
   OID property:   13:33:29.348       
----------------------------------------------       
13:33:29.959 Received reply 'Normal Service Reply'       
   Message ID: ID:localhost-34222-1269545245163-0:0:1:4:1       
   Correlation ID: ID:localhost-34245-1269545609591-0:0:1:1:1       
   Reply to:   null       
   OID property:   13:33:29.348       
----------------------------------------------       
13:33:32.951 Request for service 'Async data request #2' has been sent...       
   Message ID: ID:localhost-34245-1269545609591-0:0:1:1:2       
   Correlation ID: null       
   Reply to:   queue://MyListener       
   OID property:   13:33:29.348       
----------------------------------------------       
13:33:32.954 Received reply 'Normal Service Reply'       
   Message ID: ID:localhost-34222-1269545245163-0:0:1:5:1       
   Correlation ID: ID:localhost-34245-1269545609591-0:0:1:1:2       
   Reply to:   null       
   OID property:   13:33:29.348       
----------------------------------------------       
13:33:35.954 Request for service 'null' has been sent...       
   Message ID: ID:localhost-34245-1269545609591-0:0:1:1:3       
   Correlation ID: null       
   Reply to:   queue://MyListener       
   OID property:   13:33:29.348       
----------------------------------------------       
---->>> 13:33:35.957 Notified about INVALID response <<<----       
13:33:35.957 Received reply 'Exception occurred: java.lang.IllegalStateException: java.lang.IllegalStateException: NULL message received'       
   Message ID: ID:localhost-34222-1269545245163-0:0:2:3:1       
   Correlation ID: ID:localhost-34245-1269545609591-0:0:1:1:3       
   Reply to:   null       
   OID property:   13:33:29.348       
----------------------------------------------

The caveat mentioned above remains, however: since this is a shared queue destination, any consumers that are not using the same message selector as this one can compete to consume the invalid messages, subverting the intended behavior. The "correct" remedy here is to simply follow the ActiveMQ recommendation, noted above.

Tuesday, March 23, 2010

Sync Request w/Async Invalid Channel

The first part of this series of writeups introduced a request-reply client that used the JMS 1.1 QueueRequestor to send the request, and a service that would throw an exception if the message was null or was not a TextMessage. The client will not know that the exception occurred, and since it is using the QueueRequestor, it can block "forever". Even without exceptions, blocking clients consume resources and can compromise the responsiveness of a system; while a trivial remedy to this would be to simply use an asynchronous listener, or to block-without-wait or block-with-timeout, in this case, again due to use of the QueueRequestor, a block-until-reply request is the only option. Before moving on to alternative client approaches, I'll first introduce a strategy that at least partially mitigates the problem with block-until-reply, and that can also be used with other request approaches as a mechanism for communicating exception information across the messaging middleware.

Here's an example class that blocks "forever":

package com.mybiz.jms.activemq.server.requestreply.requestor.sync.broken;

import com.mybiz.jms.activemq.server.requestreply.connection.SyncRequestorConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.replier.RemoteService;
import com.mybiz.jms.activemq.server.requestreply.util.MessageUtil;
import javax.jms.JMSException;
import javax.jms.Message;

public class BlockForeverRequestor {

    public void runBlocking(SyncRequestorConnectionStuff stuff) throws JMSException {

        // create a null text message so as to provoke an exception. While the
        // exception will indeed get thrown, the client using this approach to request
        // something will never know it - it will block forever
        Message oopsieMyBad = stuff.getSession().createTextMessage();

        // this method calls use the QueueRequestor - a block-until-reply approach:
        MessageUtil.processRequestAndReply(stuff.getRequestor(), oopsieMyBad);

        // at this point, this client is going to block forever.
    }

    public static void main(String[] args) throws JMSException {

        BlockForeverRequestor requestor = new BlockForeverRequestor();
        SyncRequestorConnectionStuff serviceQueueStuff =
                new SyncRequestorConnectionStuff(RemoteService.url, RemoteService.serviceQueueName);
        requestor.runBlocking(serviceQueueStuff);
        serviceQueueStuff.getConnection().close();
    }
}

Continuing to use the plain-vanilla RemoteServer (introduced in Part 1) when running this client results in the following output:

Mar 23, 2010 10:26:11 AM org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://localhost:61616
10:26:11.514 Sending request for service 'null'...

And meanwhile, the service has issued this complaint, but the client will not be aware of it:

SEVERE: ID:localhost-52606-1269361388958-0:0:1:1 Exception while processing message: java.lang.IllegalStateException: java.lang.IllegalStateException: NULL message received
java.lang.IllegalStateException: java.lang.IllegalStateException: NULL message received
   .....


One technique to remedy the client's ignorance of the exception is to listen on a separate queue where messages about exceptions, invalid messages, and etc. can be posted. A service that posts such messages might look like this:

package com.mybiz.jms.activemq.server.requestreply.replier;

import com.mybiz.jms.activemq.server.requestreply.connection.ConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.connection.ProducerConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.util.MessageUtil;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

// see Part 1 to examine the RemoteService class
public class InvalidQueueRemoteService extends RemoteService {

    public static final String invalidQueueName = "InvalidQueue";
    private ConnectionStuff invalidStuff;

    public InvalidQueueRemoteService() {}

    public InvalidQueueRemoteService(String theUrl, String serviceQueueName, String invalidQueueName)
            throws JMSException {

        super(theUrl, serviceQueueName);
        // use the same connection for the invalid queue as is used for the normal service replies.
        // Connections are expensive, but you can get multiple sessions out of them.
        invalidStuff = new ProducerConnectionStuff(getServiceQueueConnection(), invalidQueueName);
        System.out.println("   ...service will now send replies to an INVALID queue...");
    }

    private Connection getInvalidQueueConnection() {
        return invalidStuff.getConnection();
    }

    public void onMessage(Message requestMsg) {

        try {
            if (requestMsg instanceof TextMessage) {
                super.onMessage(requestMsg);
            } else {
                MessageUtil.examineReceivedRequest(requestMsg);
                // send a message back to an "invalid" queue. Any blocking requests
                // would still block forever. At least in this case, that requestor gets some indication
                // that something went wrong.
                sendReply(invalidStuff, requestMsg, "Replied to Invalid Queue",
                        invalidStuff.getDestination());
            }
        } catch (Exception e) {
            try {
                sendReply(invalidStuff, requestMsg, "Exception occurred: " + e,
                        invalidStuff.getDestination());
            } catch (JMSException e1) {
                throw new IllegalStateException("Multiple exceptions occurred; first " + e
                        + " -- and then " + e1);
            }
        }
    }

    protected void shutdown() throws Exception {
        super.shutdown();
        getInvalidQueueConnection().close();
    }

    public static void main(String[] args) throws Exception {
        RemoteService service =
                new InvalidQueueRemoteService(url, serviceQueueName, invalidQueueName);
        RemoteService.run(service);
    }
}


A base class that listens (asychronously) on this queue could look like this:

package com.mybiz.jms.activemq.server.requestreply.requestor.sync;

import com.mybiz.jms.activemq.server.requestreply.util.MessageUtil;
import com.mybiz.jms.activemq.server.requestreply.connection.AsyncConsumerConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.connection.ConnectionStuff;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;

public abstract class AbstractInvalidListener implements MessageListener {

    public void onMessage(Message msg) {

        try {
            System.out.println("Received message on INVALID queue!");
            MessageUtil.examineReceivedReply(msg);
            msg.acknowledge();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    protected ConnectionStuff setUpInvalidQueue(ConnectionStuff stuff, String theQueueName) throws JMSException {

        System.out.println("Setting up 'invalid queue' to listen for exceptions, etc...");
        ConnectionStuff invalidStuff = new AsyncConsumerConnectionStuff(stuff.getConnection(), theQueueName, this);
        // need to return a different "stuff" object here that reuses existing connection but encapsulates
        // new session and destination; client should use that stuff object for subsequent invalid-queue
        // messaging
        return invalidStuff;
    }

    public void testInvalidQueue(ConnectionStuff stuff) throws JMSException {

        TextMessage testMsg =
                stuff.getSession().createTextMessage("Testing invalid queue");
        testMsg.setText("Testing Invalid Queue");
        testMsg.setJMSReplyTo(stuff.getDestination());
        MessageUtil.examineRequestBeforeSend(testMsg);
       
        MessageProducer producer = stuff.getSession().createProducer(stuff.getDestination());
        producer.send(testMsg);
        MessageUtil.examineSentRequest(testMsg);
    }
}


A concrete subclass that uses this could be something like this:
package com.mybiz.jms.activemq.server.requestreply.requestor.sync.broken;

import com.mybiz.jms.activemq.server.requestreply.connection.ConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.connection.SyncRequestorConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.replier.InvalidQueueRemoteService;
import com.mybiz.jms.activemq.server.requestreply.replier.RemoteService;
import com.mybiz.jms.activemq.server.requestreply.requestor.sync.AbstractInvalidListener;
import com.mybiz.jms.activemq.server.requestreply.util.MessageUtil;
import javax.jms.JMSException;
import javax.jms.Message;

public class InvalidListener extends AbstractInvalidListener {

    public void runBlocking(SyncRequestorConnectionStuff stuff) throws JMSException {

        // create a null text message so as to provoke an exception.
        Message mistake = stuff.getSession().createTextMessage();
        MessageUtil.processRequestAndReply(stuff.getRequestor(), mistake);

        // Though this client now listens on the invalid queue, it is STILL going to block 
        // forever since the request still gets no reply - but at least this client gets 
        // notified since it's listening on the invalid queue. At that point, the client might 
        // be able to e.g. kill a separately-launched thread that is blocking, or otherwise 
        // deal with state as needed.

        // Can we provoke a reply to the blocked service request from that point?
        // That would be ideal, since that guarantees that the service reply comes AFTER the 
        // invalid reply. But that would involve knowing the name of the temporary queue that 
        // was set as the reply-to, but that isn't set until after the message is sent - which 
        // is too late - since it's a blocking request. Once the send is done, the requestor 
        // blocks for a reply. We need a better strategy.
    }

    public static void main(String[] args) throws JMSException {

        InvalidListener requestor = new InvalidListener();
        SyncRequestorConnectionStuff serviceQueueStuff =
                new SyncRequestorConnectionStuff(RemoteService.url, 
                RemoteService.serviceQueueName);
        ConnectionStuff invalidStuff =
                requestor.setUpInvalidQueue(serviceQueueStuff, 
                     InvalidQueueRemoteService.invalidQueueName);
        requestor.testInvalidQueue(invalidStuff);
        requestor.runBlocking(serviceQueueStuff);
        serviceQueueStuff.getConnection().close();
    }
}

Here's the console output when starting this new service:

Mar 23, 2010 10:59:57 AM org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://localhost:61616
Service is waiting for a request...
   ...service will now send replies to an INVALID queue...

...and when running the requestor client:

Mar 23, 2010 11:00:28 AM org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://localhost:61616
Setting up 'invalid queue' to listen for exceptions, etc...
11:00:29.104 Sending request for service 'Testing Invalid Queue'...
Received message on INVALID queue!
11:00:29.242 Request for service 'Testing Invalid Queue' has been sent...
    Message ID: ID:localhost-52894-1269363628763-0:0:2:1:1
    Correlation ID: null
    Reply to:   queue://InvalidQueue
----------------------------------------------
11:00:29.236 Received reply 'Testing Invalid Queue'
    Message ID: ID:localhost-52894-1269363628763-0:0:2:1:1
    Correlation ID: null
    Reply to:   queue://InvalidQueue
----------------------------------------------
11:00:29.243 Sending request for service 'null'...
Received message on INVALID queue!
11:00:29.275 Received reply 'Exception occurred: java.lang.IllegalStateException: java.lang.IllegalStateException: NULL message received'
    Message ID: ID:localhost-52884-1269363597038-0:0:2:2:1
    Correlation ID: ID:localhost-52894-1269363628763-0:0:1:1:1
    Reply to:   null
----------------------------------------------

So, we've succeeded in at least notifying the client that its service request resulted in an exception; this is marginally helpful in the context of using a block-until-reply requestor. Note that the TopicRequestor is another JMS 1.1 concrete helper class, used again to facilitate a simple request-reply arrangement, but the API is the same - the request is blocking. Bottom line, we'd have better resilience - and be less coupled in either event, which is arguably one of the major benefits of a messaging approach - to use asynchronous messaging.

The next article will continue with use of the separate queue for invalid messaging, using asynchronous messaging for both that and for service requests, and evolve to ensure that invalid messages are delivered only to the appropriate requestor (since the invalid queue is a system-wide resource, shared by all requestor clients).

References

JMS Request/Reply, Part 1
JMS Home Page
JMS Downloads
JMS FAQ
JEE 5 JMS Tutorial
JEE 5 JMS Tutorial - Example Code
JEE 5 Online Javadoc
JMS 1.1 Online Javadoc
Apache ActiveMQ
Enterprise Integration Patterns - Messaging

Friday, March 19, 2010

JMS Synchronous Request/Reply

So far I've introduced some basics around JMS 1.1, including the Common Interface and Security/Concurrency. Continuing through the JMS 1.1 spec, my next drill-down will be around the Request/Reply idiom. Just so you know, these articles track my own learning curve around JMS, so you are invited to correct me and comment as needed.

Section 2.10 of the spec describes Request/Reply as an arrangement where a client sends a message to a remote service of some kind, expecting a reply. The request message header specifies a destination to which the service can (optionally!) respond with some information, or a confirmation that a requested action has been performed, or...etc. A well-behaved service not only replies, but the reply includes the ID of the request message (typically) so the requestor can correlate the reply with the previous request. JMS additionally provides basic helper implementations (QueueRequestor and TopicRequestor) that encapsulate some of the details involved here, including creating the temporary queue or temporary topic to which the reply is sent. In this series of posts, I'll trace my own prototyping, starting with the QueueRequestor (which is a synchronous messaging style) and evolving with some asynchronous alternatives, adding functionality in stages.

I'll use increasingly sophisticated levels of remote service, with various flavors of requestors associated with each. The remote services include one that simply listens on a request queue; another that does that plus notifies requestors about exceptions or invalid messages; and a third that does that plus uses message filtering on an asynchronous shared queue so that only the original requestor receives the reply. This post will describe the basic remote service and a basic requestor.

The remote service listens asynchronously on a system-wide queue and replies to requests at the reply-to destination specified in the JMSReplyTo header field. It is not terribly robust - it assumes the requestor will be sending non-null text messages; if a given message is either null or non-text, an exception is thrown. The service constructs a reply with the correlation ID (JMSCorrelationID header field) set to the request message ID (JMSMessageID header field), sends the reply and acknowledges the request.

This class provides a hook for subclasses to further decorate the message (e.g. setting properties, etc.), because we just happen to know we'll be using that in a later prototype extending this remote service.

The requestor connects to the service queue, sends a request using the QueueRequestor (which blocks on a dedicated temporary channel), and verifies that the reply has a correlation ID that matches the request message ID.

As an aside, the QueueRequestor and TopicRequestor classes are, interestingly, the only concrete classes in the JMS 1.1 distro - in other words, JMS providers need not implement anything here. Though, it wouldn't be surprising if provider-specific request/reply classes are available; it's depends on your needs as to whether proprietary mechanisms are appropriate, as always.

Here's the code for the service:

package com.mybiz.jms.activemq.server.requestreply.replier;

import com.mybiz.jms.activemq.server.requestreply.connection.AsyncConsumerConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.connection.ConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.util.MessageUtil;
import org.apache.activemq.ActiveMQConnection;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;

public class RemoteService implements MessageListener {

    // using an ActiveMQ provider
    public static final String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    public static final String serviceQueueName = "ServiceQueue";

    // ConnectionStuff is just a convenience composite object that 
    // encapsulates a destination, the session for that destination and 
    // the connection for that session.
    private ConnectionStuff serviceStuff;

    public RemoteService() {}

    public RemoteService(String theUrl, String serviceQueueName)
            throws JMSException {

        // establish the connection, session and destination, and start the connection
        serviceStuff = new AsyncConsumerConnectionStuff(theUrl, serviceQueueName, this);
        System.out.println("Service is waiting for a request...");
    }

    private boolean shutdown = false;
    protected void shutdown() throws Exception {

        // all done - release connection. No need to release any other objects,
        // that's done automatically when releasing the connection.
        getServiceQueueConnection().close();
    }

    protected Connection getServiceQueueConnection()
    {
        return serviceStuff.getConnection();
    }

    // assumes non-null TextMessage; sends a reply to the specified reply-to. 
    // Throws an exception if any problem occur.
    public void onMessage(Message requestMsg) {

        try {

            TextMessage requestTextMsg = (TextMessage) requestMsg;
            if (requestTextMsg.getText() == null) {
                   throw new IllegalStateException("NULL message received");
               }

            // MessageUtil is a convenience class that prints out 
            // information about messages, and provides various other conveniences
            MessageUtil.examineReceivedRequest(requestTextMsg);
            sendReply(serviceStuff, requestMsg, "Normal Service Reply",
                    requestMsg.getJMSReplyTo());

        } catch (Exception e) {

            throw new IllegalStateException(e);
        }
    }

    // Send the given reply to the given destination and acknowledge the given request message.
    protected void sendReply(ConnectionStuff stuff,
                             Message requestMessage, String reply,
                             Destination destination) throws JMSException {

        // construct and send the reply, with correlation ID set to message ID
        TextMessage replyMsg = stuff.getSession().createTextMessage();
        replyMsg.setJMSCorrelationID(requestMessage.getJMSMessageID());
        replyMsg.setText(reply);

        // allow subclasses to do extra stuff; default impl does nothing
        replyMsg = decorateMessage(requestMessage, replyMsg);

        MessageProducer producer = stuff.getSession().createProducer(destination);
        producer.send(replyMsg);
        MessageUtil.examineSentReply(replyMsg);

        requestMessage.acknowledge();
    }

    /**
     * Provides support for subclasses to further fill in the reply, e.g. 
     * by setting various properties. We just happen to know that this will 
     * be needed for the full demo - see MessageSelectorRemoteService.
     */
    protected TextMessage decorateMessage(Message requestMessage, 
            TextMessage replyMsg)
            throws JMSException {

        return replyMsg;
    }

    public static void main(String[] args) throws Exception {

        RemoteService service = new RemoteService(url, serviceQueueName);
        run(service);
    }

    /**
     * Runs the given remote service; subclasses should call this to run 
     * themselves - so that in case this demo is extended (which it will be), 
     * they won't need to duplicate the same execution code.
     */
    public static void run(RemoteService service) throws Exception {

        while (!service.shutdown) {
            // this service could support receiving a message that it reacts to by
            // setting shutdown to true; or you could set it manually in 
            // the debugger; or you could just not worry about it. This is 
            // just a demo.
            Thread.currentThread().sleep(1000);
        }
        service.shutdown();
        System.out.println("Done - exiting");
    }
}

And, here's the requestor code. The comments explain how to demo things (assuming the ActiveMQ provider is already running), and describes what we might think about to improve the functionality:

package com.mybiz.jms.activemq.server.requestreply.requestor.sync;

import com.mybiz.jms.activemq.server.requestreply.connection.SyncRequestorConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.replier.RemoteService;
import com.mybiz.jms.activemq.server.requestreply.util.MessageUtil;

import javax.jms.JMSException;
import javax.jms.TextMessage;

public class VanillaSyncRequestor {

//    Demo:
//
//    Start the remote service, then run the requestor - console printouts 
//    reflect request and reply on both sides of messaging provider.
//
//    Now kill the remote service, then run the requestor - request is made, 
//    but no reply just yet. Start up the service "after a while", and the 
//    reply is received.
//
//    What does it need:
//
//    This is a simple request-reply demonstration. It will not know if 
//    the remote service throws an exception (which can happen, as noted 
//    above, if the request is either null or is not a text message). Though 
//    it has a dedicated temporary channel, it can possibly block for indefinite
//    periods of time - e.g. if the service goes down, it will block until that 
//    service is back online; worse, if the service throws an exception, it can 
//    block "forever". Blocking until the service comes back online can be 
//    considered a good thing, i.e. the request will "eventually" be handled -
//    and, it can be considered a bad thing, e.g. if many clients are blocking 
//    on a wait for an offline service, this consumes system resources.

    /**
     * Send a text message using the given connection stuff, wait synchronously 
     * for a reply.
     */
    public void sendRequest(SyncRequestorConnectionStuff stuff) throws JMSException {

        TextMessage requestMessage = stuff.getSession().createTextMessage(
                "Do this as soon as you're online!");

        // this convenience method sends the request, at which point the 
        // JMSMessageID is set in the header of that message; it acknowledges 
        // the reply and then confirms that the JMSCorrelationID in the 
        // reply matches the JMSMessageID, throwing an exception if that is 
        // not the case.
        MessageUtil.processRequestAndReply(stuff.getRequestor(), requestMessage);
    }

    public static void main(String[] args) throws JMSException {

        VanillaSyncRequestor requestor = new VanillaSyncRequestor();

        // get the connection, session and QueueRequestor - on return, 
        // the connection will have been started
        SyncRequestorConnectionStuff serviceQueueStuff =
                new SyncRequestorConnectionStuff(
                RemoteService.url, RemoteService.serviceQueueName);

        requestor.sendRequest(serviceQueueStuff);

        // requestor is done - release connection. No need to release any 
        // other objects, that's done automatically when releasing the connection.
        serviceQueueStuff.getConnection().close();
    }
}

I'm omitting the code for MessageUtil and ConnectionStuff - these can be regarded as black-boxes that "just work" as noted, for purposes of this article. But I will offer the printouts that occur when the suggested demo steps are taken:

1 - Start up the ActiveMQ provider (you'll of course need to download and install this first):

$ cd $ACTIVEMQ_HOME/apache-activemq-5.3.0/bin 
$ ./activemq
.........
(lots of noisy startup messages, omitted here...)
Loading message broker from: xbean:activemq.xml 
..........
INFO | Listening for connections at: tcp://localhost:61616 
.......... (etcetera etcetera)

2 - Start the service (I've done this in my IDE; just run the RemoteService class):

Mar 19, 2010 12:15:47 PM org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://localhost:61616
Service is waiting for a request...

3 - Run VanillaSyncRequestor; observe that it sends a request and immediately gets a reply:

INFO: Successfully connected to tcp://localhost:61616
12:16:06.937 Sending request for service 'Do this as soon as you're online!'...
12:16:08.750 Request for service 'Do this as soon as you're online!' has been sent...
    Message ID: ID:080301h0114dl-3388-1269022561656-0:0:1:1:1
    Correlation ID: null
    Reply to:   temp-queue://ID:080301h0114dl-3388-1269022561656-0:0:1
----------------------------------------------
12:16:08.750 Received reply 'Normal Service Reply'
    Message ID: ID:080301h0114dl-3375-1269022545562-0:0:1:1:1
    Correlation ID: ID:080301h0114dl-3388-1269022561656-0:0:1:1:1
    Reply to:   null
----------------------------------------------

4 - Stop the service; run the requestor again - note that it sends the request but then appears to stop - in fact, it's blocking, waiting for the service to come back online:

INFO: Successfully connected to tcp://localhost:61616
12:30:07.875 Sending request for service 'Do this as soon as you're online!'...

5 - Start up the service - the printout there looks the same as above, since it grabs the message waiting in the queue and replies. At that point, thankfully, the requestor now completes, albeit "some time later", as can be seen by the timestamps:

12:31:57.937 Request for service 'Do this as soon as you're online!' has been sent...
    Message ID: ID:080301h0114dl-3906-1269023399953-0:0:1:1:1
    Correlation ID: null
    Reply to:   temp-queue://ID:080301h0114dl-3906-1269023399953-0:0:1
----------------------------------------------
12:31:57.937 Received reply 'Normal Service Reply'
    Message ID: ID:080301h0114dl-3964-1269023512671-0:0:1:1:1
    Correlation ID: ID:080301h0114dl-3906-1269023399953-0:0:1:1:1
    Reply to:   null
----------------------------------------------

As the comments in VanillaSyncRequestor indicate, there is room for improvement. Next, I'll demonstrate a client that blocks "forever", and take some steps to at least partially mitigate that. Then we'll move on to use asynchronous messaging in a request/reply approach.

References

JMS Home Page
JMS Downloads
JMS FAQ
JEE 5 JMS Tutorial
JEE 5 JMS Tutorial - Example Code
JEE 5 Online Javadoc
JMS 1.1 Online Javadoc
Apache ActiveMQ

Monday, March 15, 2010

Case Study: Add Functionality Without Changes to Code

What I'll describe here is an exercise I've completed that adds functionality to an existing codebase without changing that codebase. The product is not dynamically "pluggable", service-oriented, etc., so I don't have those levers to address the problem; instead, I've used Aspect Oriented Programming (AOP), JSF phase listeners, and the Observer design pattern to add to the codebase in a modular way. This gives me an easy way to remove that code if that's called for - and, for this exercise, that's the primary goal.


Our team's product cycle is transitioning with one release in test cycle, moving soon to a Beta audience; and with some new functionality targeted for the next release. These releases are being managed in separate SCM trees; however, due to various infrastructure limitations, we're being asked to wait on checking in changes to the test-cycle branch. But I don't want this to stop me from moving forward on that release.

There are various well-known problems, of course, if one makes "too many" changes to a  codebase before checking things in:
  1. There could be a conflict-resolution exercise between my changes and my teammates' changes to the same files.
  2. If a bug is found in Beta, that code will need a bugfix and a patch - but if I've intermingled new stuff in the code needing bugfixes, I have a potentially error-prone, tedious and time-consuming version control exercise.
There are SCM strategies to address #2; but for the sake of argument, let's just say that my constraint is to develop the new functionality with minimal - if not zero - impact to existing code, to avoid both potential problems from above.

For that matter, I wanted to challenge myself to see just how much I could do with this type of constraint - so I gave myself some "rules":
  1. keep the code changes as modular as possible so they can easily be added, for that matter easily removed if called for;
  2. ideally, no changes at all are made to existing code;
  3. if for some reason, I must change existing code, it must be done in such a way that new classes, declarations, etc. can be introduced, but existing code must not make reference to the new artifacts
So, e.g., I can use aspects, includable XML files, listeners of various flavors, and the like. Looking more closely at my "rules", it appears that #2 and #3 actually are the "how-to" supporting the "what" of #1 - i.e., the true goal here is to make it easy to add or remove the functionality if called for.

Here's the new functionality to be added:
  1. At various points during the customer's use of the product, a certain process should be launched to accomplish a particular goal. I'm intentionally being vague since the details don't really matter here.
  2. For each version of the product, that process should be executed just once without any confirmation from the user that it should proceed; but once it's been executed that first time (for a particular product release), the user should be issued some kind of popup confirmation dialog (this is a webapp UI) that offers a choice: allow the process to proceed with due caution, or just bail out.
  3. Every time a new product release is installed, the warning mechanism resets, i.e. the very next time the user launches the process, no such confirmation dialog appears.
The relevant pieces of the technology stack that I'll be "modifying" includes:
  1. A JSF-based web tier, using in particular the IceFaces component set;
  2. Spring 3.0
Breaking things down, it seems I'll need to do things like this:
  1. Persist some information around the product version, to be generated at build time and deployed with product installation;
  2. intercept the rendering of the UI button that launches the process so that the confirmation dialog can be associated with it, such that when the button is clicked, the confirmation will appear (giving the user a choice of proceeding or not) - but the confirmation appears only if the process has already executed for this product release;
  3. persist information after each process execution indicating this execution has occurred for a given product installations; and
  4. interpret the lack of that persisted information to indicate the process has not executed for this installation. This will be the initial condition, and the condition after each new product release is installed.

What I did to address #1 is trivial, and frankly out of scope for the purposes of this post. Likewise, #4 is reasonably straightforward; no additional details are needed around this.

To address #2 - intercept the rendering of the user action - my initial instinct tells me this sounds like an aspect...well, to be honest, I was looking for some excuses to use aspects - not only because I think this is a good approach to adding functionality in a modular way, but because I want to get better at AOP. A solution for #3 might also use an aspect, so that after the process has been run, the process-execution-history is updated.

As it turned out, the most interesting problem was intercepting the rendering of the UI button to attach a confirmation dialog dynamically. Note that sometimes the confirmation should be there - if the process has already been run - and sometimes, the process should just get launched without any warning to the user.

Let's first establish the confirmation dialog to be used. I'm using IceFaces, which provides a panelConfirmation tag that I use like this:

    <!--
    Warn user if process has already executed for this release
    -->
    <ice:panelConfirmation 
       id="warnProcessExecuted" 
       message="#{msgs['confirm.rerun']}"
       acceptLabel="Yes" cancelLabel="No"/>

The straightforward way to use this is to simply add it as an attribute to the existing IceFaces button that launches the process:

<ice:commandButton id="launchProcess"
     value="Launch Process"
     immediate="true"
     confirmationPanel="warnProcessExecuted"
     actionListener="#{eventsManager.startProcess}"/>

But this violates my self-imposed constraint described by rule #2 above: "ideally, no changes at all are made to existing code".  So my initial thought is to do this:
  1. intercept the handling of the action after user clicks on the button;
  2. find the JSF component associated with the button; and
  3. dynamically add the confirmationPanel attribute to that button component.
Doing things this way also would factor out the conditional part: if the process has not executed yet, there's no need to add the confirmation. But it turns out that trying to add the attribute during the action handling is too late - what I want is to intercept the rendering of the button. This sounds like I could use a JSF phase listener that listens for the beginning of the RENDER_RESPONSE phase to do this. This worked out just fine, but the thing about it that bothered me was that this happens several times for each action, and the method that I use to attach the attribute gets called each time. In this case, no harm is done; but I'd like to find a more general solution so my code execution is more deterministic and less at the mercy of the JSF lifecycle - bottom line, so that the method of interest is called only once at the beginning of the RENDER_RESPONSE phase. Here's how I did it:

First I create a reusable hierarchy of JSF phase listeners. The superclass provides the PhaseListener implementation, and additionally provides a registration mechanism - with which it remembers any interested listeners for a given phase. Each registered listener implements a PhaseObserver interface, and is notified at the beginning and end of each JSF phase:

public class PhaseMonitor implements PhaseListener {

    private static Map<JsfPhaseId, Set<PhaseObserver>> observers =
        new HashMap<JsfPhaseId, Set<PhaseObserver>>();

    // notify all registered observers for this phase that the phase has begun
    public void beforePhase(PhaseEvent event) {

        JsfPhaseId phaseId = JsfPhaseId.getJsfPhaseId(event.getPhaseId());
        Set<PhaseObserver> these = observers.get(phaseId);
        if (these != null) {
            for (PhaseObserver observer : these) {
                observer.notifyBeforePhase(phaseId);
            }
        }
    }

    // notify all registered observers for this phase that the phase has ended
    public void afterPhase(PhaseEvent event) {

        JsfPhaseId phaseId = JsfPhaseId.getJsfPhaseId(event.getPhaseId());
        Set<PhaseObserver> these = observers.get(phaseId);
        if (these != null) {
            for (PhaseObserver observer : these) {
                observer.notifyAfterPhase(phaseId);
            }
        }
    }

    // observer pattern: interested observers implement the PhaseObserver interface and register their interest
    public static void register(JsfPhaseId phase, PhaseObserver observer) {

        Set<PhaseObserver> these = observers.get(phase);
        if (these == null) {
            these = new HashSet<PhaseObserver>();
        }
        these.add(observer);
        observers.put(phase, these);
    }

    protected PhaseId getMyPhaseId() { // subclasses will override this for each JSF phase
        return null;
    }

    public PhaseId getPhaseId() {
        return getMyPhaseId();
    }
}


The JsfPhaseId is a first-class enum, provided not only so the observers can reference a simple enum constant (which the JSF PhaseID does not provide - it is not an enum (!!)), but so that client code need not be tightly coupled to JSF. Granted, clients will use the JsfPhaseId, which uses JSF, so the deployment will be coupled to JSF - but at least I've encapsulated my usage of JSF by providing this facade:

public enum JsfPhaseId
{
    APPLY_REQUEST_VALUES, INVOKE_APPLICATION, PROCESS_VALIDATIONS,
    RENDER_RESPONSE, UPDATE_MODEL_VALUES, RESTORE_VIEW, UNKNOWN;

    public static JsfPhaseId getJsfPhaseId(PhaseId phaseId) {

        if (phaseId.equals(PhaseId.APPLY_REQUEST_VALUES)) {
            return JsfPhaseId.APPLY_REQUEST_VALUES;
        } else if (phaseId.equals(PhaseId.INVOKE_APPLICATION))  {
            return JsfPhaseId.INVOKE_APPLICATION;
        } else if (phaseId.equals(PhaseId.PROCESS_VALIDATIONS)) {
            return JsfPhaseId.PROCESS_VALIDATIONS;
        } else if (phaseId.equals(PhaseId.RENDER_RESPONSE)) {
            return JsfPhaseId.RENDER_RESPONSE;
        } else if (phaseId.equals(PhaseId.UPDATE_MODEL_VALUES)) {
            return JsfPhaseId.UPDATE_MODEL_VALUES;
        } else if (phaseId.equals(PhaseId.RESTORE_VIEW)) {
            return JsfPhaseId.RESTORE_VIEW;
        }
        return JsfPhaseId.UNKNOWN;
    }
}

Each phase-specific subclass of PhaseMonitor does this:

public class RenderResponsePhaseMonitor extends PhaseMonitor {

    private PhaseId phaseId = PhaseId.RENDER_RESPONSE;
    protected PhaseId getMyPhaseId() {
        return phaseId;
    }
}

This is repeated for the other JSF phases. Now I need to add these phase listeners to the code, but in a way that minimizes impact that existing code. So, I do not want to modify my existing JSF configuration file by declaring these listeners; instead, I add a new config file using the web-tier deployment descriptor (web.xml):

<context-param>
        <param-name>javax.faces.CONFIG_FILES</param-name>
        <param-value>/WEB-INF/faces-config-application.xml, /WEB-INF/faces-config-listeners.xml</param-value>
</context-param>

That config file looks like this:

<faces-config xmlns="http://java.sun.com/JSF/Configuration">
    <!-- listen for phase events to facilitate phase listening, notification -->
    <lifecycle>
        <phase-listener>com.mybiz.web.jsf.lifecycle.ApplyRequestValuesPhaseMonitor</phase-listener>
        <phase-listener>com.mybiz.web.jsf.lifecycle.InvokeApplicationPhaseMonitor</phase-listener>
        <phase-listener>com.mybiz.web.jsf.lifecycle.ProcessValidationsPhaseMonitor</phase-listener>
        <phase-listener>com.mybiz.web.jsf.lifecycle.RenderResponsePhaseMonitor</phase-listener>
        <phase-listener>com.mybiz.web.jsf.lifecycle.RestoreViewPhaseMonitor</phase-listener>
        <phase-listener>com.mybiz.web.jsf.lifecycle.UpdateModelValuesPhaseMonitor</phase-listener>
    </lifecycle>
</faces-config>

The interested observers implement this interface:

public interface PhaseObserver {

    public void notifyBeforePhase(JsfPhaseId phaseId);
    public void notifyAfterPhase(JsfPhaseId phaseId);
}

With these levers in place, I can now implement an observer that will be notified once and only once at the beginning of the RENDER_RESPONSE phase, at which time it will determine if a confirmation for the user is needed; if so, it will find the JSF component of interest and attach the confirmation panel, and, if the user elects to proceed, updating some persistent history "somewhere" with information about this execution; else it sets the attribute value for the confirmation panel to an empty string so that no such dialog pops up. Since I don't want to minimize my changes to existing code, I add this functionality with an aspect:

@Aspect
public class ExecutionAspects implements PhaseObserver {

    public ExecutionAspects() {
        PhaseMonitor.register(JsfPhaseId.RENDER_RESPONSE, this);
    }

    private ExecutionHelper executionHelper;  // injected via Spring
    public void setExecutionHelper(ExecutionHelper theHelper) {
        executionHelper = theHelper;
    }

    /**
     * Establish a pointcut that describes a method which we know will be called when the button is rendered
     */
    @Pointcut("execution(* com.mybiz.view.ViewHelper.isLaunchProcessButtonShowing(..))")
    public void isLaunchProcessButtonShowing() {
    }

    // here's how we enforce the "once and only once" constraint:
    private boolean needConfirmationForThisRequest = true;
    public void notifyBeforePhase(JsfPhaseId phaseid) {
        needConfirmationForThisRequest = true;
    }
    public void notifyAfterPhase(JsfPhaseId phaseid)
    {
        needConfirmationForThisRequest = false;
    }

    /**
     * Intercept rendering of button to detect whether or not the process has already been run for this release
     */
    @Around("isLaunchProcessButtonShowing()")
    private boolean interceptRender(ProceedingJoinPoint pjp) throws Throwable {
        if (needConfirmationForThisRequest)
        {
            // get UI component, add confirmation panel if needed
            HtmlCommandButton button =
                    (HtmlCommandButton) FacesUtils.findComponent(FacesUtils.getFacesContext().getViewRoot(),
                            "launchProcess");  // search from root of JSF component tree for the launch button ID
            if (executionHelper.getProcessHasExecutedForThisProduct()) {
                // attach the confirmation
                button.setPanelConfirmation("warnProcessExecuted");
            } else {
                // set the confirmation attribute to an empty string so no warning will appear
                button.setPanelConfirmation("");
            }
            needConfirmationForThisRequest = false;
        }
        // return value as normal
        return (Boolean)pjp.proceed();
    }

    /**
     * Establish a pointcut that intercepts the user action of launching the process
     */
    @Pointcut("execution(* com.mybiz.view.EventsHelper.startProcess(..))")
    public void startProcess()
    {
    }

    /**
     * Intercept launching of process to facilitate updating the execution history after it's done
     */
    @Around("startProcess()")
    private void interceptProcess(ProceedingJoinPoint pjp) throws Throwable
    {
        pjp.proceed();
        // update history so user can be warned that it's already been run (next time it's requested)
        executionHelper.updateConfMergeHistory();
    }
}

Now the aspect needs to be added to runtime execution; I already have a Spring context file for the existing product functionality, but again I don't want to change that file. Instead, I modify the web-tier deployment descriptor to add a new one:

<context-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>/WEB-INF/beans-all.xml, /WEB-INF/beans-execution.xml</param-value>
</context-param>

That Spring context file manages the ExecutionAspects class, supplying the ExecutionHelper dependency:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/aop
       http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
       ">

    <!--
    Configuration file used to maintain execution information - i.e. as a control for when the process
    is run with or without warning, we need to maintain information about what release is this
    product, and compare that to what release was the last execution run against.
    -->
    <bean id="executionHelper" scope="session" class="com.mybiz.aspects.ExecutionHelper">
    </bean>

    <aop:aspectj-autoproxy proxy-target-class="false"/>

    <bean id="execution-aspect" class="com.mybiz.aspects.ExecutionAspects">
        <property name="executionHelper" ref="executionHelper"/>
    </bean>

</beans>

The ExecutionHelper is of minimal interest here; it simply does CRUD on "some persistence mechanism" to manage information about what is the current product release and what particular release has the process last been executed against. It provides a convenience method (getProcessHasExecutedForThisProduct(), as seen above) encapsulating all of that information with a boolean indicating exactly what it's name suggests.


So let's revisit my initial goals and see how well I've done:
  1. keep the code changes as modular as possible so they can easily be removed if called for;
  2. ideally, no changes at all are made to existing code;
  3. if for some reason, I must change existing code, it must be done in such a way that new classes, declarations, etc. can be introduced, but existing code must not make reference to the new artifacts (this facilitates rule #1)
#1: are my changes modular? In the sense that they can be removed very easily if needed, yes - here's what I'd need to do to remove this functionality: change this -->

<param-value>/WEB-INF/beans-all.xml, /WEB-INF/beans-execution.xml</param-value>

to this -->

<param-value>/WEB-INF/beans-all.xml</param-value>nclude src="/WEB-INF/includes/panel-confirmation.jspx"/>

Without the single additional declaration of Spring context that attaches the new aspect to the runtime, the aspect will not get instantiated, let alone executed. All of the existing code - well, most of it anyway - remains untouched. The new code can be left as an addition to the codebase, ready for activation whenever product management calls for that.

You'll notice I said most of the existing code is unchanged. Beyond the addition of the 2nd Spring context, as noted here, and the additional JSF Config file that declares the hierarchy of phase listeners, I did need to make one change to the JSF file that declares the command button; I needed to include the new JSF file that declares the confirmation within the same form. Apparently this is a constraint of the component set, and it is not surprising. So I ended up doing this:


<ice:commandButton id="launchProcess"
    value="Launch Process"
     immediate="true"
    actionListener="#{eventsManager.startProcess}"/>
             
 <!-- confirmation panel must be in same form as the button that references it. The
  "launchProcess" button, above, is managed dynamically to point to this confirmation
  in an aspect. For better modularity, include the panelConfirmation snippet:
  -->
  <ui:include src="/WEB-INF/includes/panel-confirmation.jspx"/>

So I didn't quite succeed with rule #2 -- I did have to make some changes to existing code. But for all intents and purposes, it's not an issue -- since the change to the code is a declaration that is not referenced by existing code. I'd argue likewise for the addition of the JSF Config file with the hierarchy of phase listeners and built-in observer mechanism - this is, in my opinion, a worthwhile addition to any JSF application, one that I'll likely continue doing from this point forward. I just happened to leverage it with an aspect that registers as an interested observer of the RENDER_RESPONSE phase. That is the essence of modularity.

Again, not quite successful with rule #3 - since the changes to the web.xml do make reference to the new JSF Config and Spring context files. But again, in spirit that goal was only in place to facilitate success on rule #1 - and bottom line, I can add or remove this new functionality without breaking a sweat.

Friday, March 12, 2010

JMS Security, Concurrency and Triggers

This is the 2nd in a series of posts around JMS 1.1, focusing on things that can guide design decisions when putting a JMS application together. In this writeup, I'll discuss security, concurrency and the use of triggers.


Security

JMS 1.1, section 2.7: JMS does not provide features for controlling or configuring message integrity or message privacy. It is expected that many JMS providers will provide such features. It is also expected that configuration of these services will be handled by provider specific administration tools. Clients will get the proper security configuration as part of the administered objects they use.

Decision: does the application need authentication, confidentiality, etc.?

Idioms: Any client authentication is handled by the Connection object. A JMSSecurityException is thrown when authentication credentials are rejected by the provider, or whenever any security restriction prevents a method from completing.

Caveat: Any security measures applied will not be portable across JMS providers. Remember that authentication and confidentiality exercises are relatively heavyweight; combined with the heavy lifting of setting up network connectivity, this motivates minimizing the number of connections in use (and, for that matter, of using connection caches).


Concurrency

Of the six top-level JMS objects, only Destination, ConnectionFactory and Connection are intended for multi-threaded access.  The Session, MessageProducer and MessageConsumer are intended for single-thread use only. See the JMS 1.1 spec, section 2.8 for an in-depth rationale around this restriction - bottom line, this makes it easier for the typical JMS client, with concurrency possible if needed by using multiple sessions.

JMS providers must prevent concurrent access to a given client's state that could result in messages being lost or processed redundantly - whether that's done by an exception being thrown, or blocking the offending client, or otherwise.

Idioms: Note that this doesn't mean multiple threads can't use a given Session object - it just means the developer must ensure the access is not concurrent. Using one thread only is the simplest way to ensure this; otherwise, explicit synchronization is called for. The standard approach to setting up asynchronous delivery is using a single thread for setup while the Connection is stopped, then use that same thread to start the Connection. Concurrent access to a session's producers and consumers is also not allowed. If a client uses one thread to produce messages and other threads to consume them, use of a separate session for the producing thread is called for. Once a connection has been started, do not use any Session method except close; using a separate thread of control to close a Session is allowed. This restriction applies in particular to setting up multiple message listeners - all of these must be established before the connection is started.

Caveat: While a client may have multiple sessions, JMS does not define the behavior around concurrent QueueReceivers for the same Queue; relying on a given provider's support for this is not portable. Note that the Session serializes execution of asynchronous deliveries, using a single thread to run all MessageListeners.  One consequence of this is that a session with asyncronous listeners cannot be used to also receive messages synchronously.

Recommendation: Consider the effect of the session's serial execution on your target throughput. For higher throughput via concurrency, use multiple sessions. However, note that it is not considered reliable to use a single consumer with application-level multi-threading logic to concurrently process messages from a topic (due to lack of adequate transaction facility in JMS). JMS does, however, provide a special facility for creating MessageConsumers that can consume messages concurrently, via application server support; see section 8 in the spec. The application developer presents a single-threaded program if using this facility.


Triggers

A trigger is e.g. a threshold of waiting messages, a length of time that has gone by, a time of day, etc., which is used to wake up a client so it will process any waiting messages. Keep in mind that any such mechanism, if available in the provider you use, is not specified by the JMS spec, and as such is not portable.

Caveat: any trigger mechanisms will not be portable across JMS providers.


In the next several posts in this series, I'll discuss things like request/reply, message ID, timestamp, message expiration and message priority.

JMS Common vs Domain-Specific Interfaces

Reading through the JMS 1.1. spec, I'm motivated to in fact re-read it, since certain concepts are referenced before being introduced. This isn't unusual for a spec, and that's not a critique; but bottom line, I decided to go through the spec a second time, this time collating related information around given concepts in one place, and ordering things in a way that doesn't assume prior knowledge.

This is the first in a series of posts in which I'll focus on concepts and facilities from the spec that call for some kind of design decision (as opposed to more general behaviorial issues, etc.) - since I'm in the process of making those kinds of decisions with my current development efforts.

I'll start with some introductory basics around use of JMS; listing decisions to be made with guidance around each, plus recommended practices, programming idioms and caveats. First I address only the use of "JMS Common" vs "Domain-Specific" interfaces, since that writeup is long enough for its own post.



JMS supports both queue-based (aka Point-to-Point, or PTP) and topic-based (aka Publish/Subscribe, or Pub/Sub) models (aka "domains"). As of JMS 1.1, so-called "common interfaces" are available that encapsulate this distinction (aka "unification of messaging domains") - while the legacy domain-specific APIs are preserved for backwards compatibillity.

As per JMS 1.1, section 2.5: The JMS common interfaces provide a domain-independent view of the PTP and Pub/Sub messaging domains. JMS client programmers are encouraged to use these interfaces to create their client programs.

Here's a table illustrating the difference:


JMS Common Interfaces
PTP-specific
Pub/Sub-specific
ConnectionFactory
QueueConnectionFactory
TopicConnectionFactory
Connection
QueueConnection
TopicConnection
Destination
Queue Topic
Session
QueueSession TopicSession
MessageProducer
QueueSender TopicPublisher
MessageConsumer
QueueReceiver, QueueBrowser
TopicSubscriber

And here are some definitions:

ConnectionFactory - an administered object used by a client to create a Connection
Connection - an active connection to a JMS provider
Destination - an administered object that encapsulates the identity of a message destination
Session - a single-threaded context for sending and receiving messages
MessageProducer - an object created by a Session that is used for sending messages to a destination
MessageConsumer - an object created by a Session that is used for receiving messages sent to a destination

Decision: which one to use?

Recommendation: use the JMS common interface, in particular if you wish to enclose send/receive of messages from both domains (i.e. queue and topic) within a single transaction. From section 11.4.1: (use of JMS Common API) simplifies the client programming model, so that the client programmer can use a simplified set of APIs to create an application...using (JMS Common) methods, a JMS client can create a transacted Session, and then receive messages from a Queue and send messages to a Topic within the same transaction. There are additional benefits to providers in terms of opportunities for certain optimizations in their implementations.

Caveat: Be aware that in future JMS releases, the domain-specific APIs may be deprecated. Keep in mind that PTP and Pub/Sub messaging system behaviors will of course be different, even though you're using the same API, since the semantics of each domain are different. An unpleasant side-effect of this is the fact that, since the common interface defines e.g. some queue-specific methods - and since the topic-specific classes inherit from that interface - there are some methods available that just aren't appropriate. If the application calls any of these methods, an IllegalStateException is thrown. Why this isn't an OperationUnsupportedException is another question.

Here is the list of those methods:

Interface Method
QueueConnection createDurableConnectionConsumer
QueueSession createDurableSubscriber
createTemporaryTopic
createTopic
unsubscribe
TopicSession createQueueBrowser
createQueue
createTemporaryQueue

Note that there are also JMS Common Interfaces available for JTS services, as described in section 8.6 of the spec. However, be aware that JMS providers are not required to support JTS, so use of this is not portable across providers.



In the next post, I'll touch on more basics, to include security and concurrency.

Thursday, March 4, 2010

TestNG Diagnostics

Just a quick note about getting diagnostics from unit tests - if using TestNG, you can configure the dump of detailed test information into the command shell from which you run your tests. Use the verbose property, which you can access via a TestNG object or as part of the testng ant task. This will provide useful insights around failures (stack trace, etc.). For example:

<property environment="env"/>
<testng classpathref="my.classpath"
            outputDir="${my.outputdir}"
            haltOnfailure="true"
            verbose="${env.verbose}">

Here I've used an environment variable to facilitate dynamic levels of verbosity. As per the TestNG Javadoc:

verbose - the verbosity level (0 to 10 where 10 is most detailed) Actually, this is a lie: you can specify -1 and this will put TestNG in debug mode (no longer slicing off stack traces and all).

Given this, I can run my tests like so:

% export verbose=5
% cd /home/mystuff/java/projects/quantum-gravity-dongle/tests
% ant runtests

...and I'll get the diagnostics I need to quickly locate test problems.