Thursday, March 25, 2010

Asynchronous JMS Request/Reply

My recent previous posts have described a JMS-based request/reply client that uses blocking request semantics, and has no way of knowing if the service has thrown an exception and as such will never reply; and another that is marginally better, which listens on an invalid channel for messages about exceptions. That slight improvement still leaves a fair amount to be desired.

To clarify the goal of this exercise, I'd like the following:
  1. JMS request-reply arrangement, i.e. issue a request to some service for some information or for a task to be performed, and receive a reply (either with the information or a status/confirmation about the task)
  2. Receive information about an invalid request, or an exception that happened on the service side, in its attempt to fulfill the request
  3. No blocking request is left hanging in the face of a service-side exception
So, to address #3, we already know we could use block-without-wait or block-with-timeout; or we can use an asynchronous listener. Using the different flavors of blocking request means using one's own MessageConsumer instead of the built-in QueueRequestor (which is what I used in the first two parts of this series). From here, however, I'd prefer to just move on to an asynchronous idiom - but before leaving, check this article for some additional insights around synchronous request-reply.

Now if we use our own asynchronous consumer, we no longer have the automatic temporary queue construction done via the QueueRequestor. We'll construct our own invalid queue, but we do not want one for each potential requestor - construction of these is expensive. To be more precise, I'll quote the ActiveMQ recommendation:

The best way to implement request-response over JMS is to create a temporary queue and consumer per client on startup, set JMSReplyTo property on each message to the temporary queue and then use a correlationID on each message to correlate request messages to response messages. This avoids the overhead of creating and closing a consumer for each request (which is expensive). It also means you can share the same producer & consumer across many threads if you want (or pool them maybe).

This example will not do exactly that; instead, for simplicity (and as an excuse to demonstrate message selection), I'll construct one shared invalid queue - but this presents the problem of how to return invalid information only to the requestor that should hear about it (i.e., if there are multiple queue-based clients, then only the first one to receive from a given queue will get that message). To address this problem, we can use message selection - but keep in mind, this relies on all requestors in a given system to be using the same filter. If for example one rogue client did no filtering with a message selector, it would be competing with the intended receiver for a message about exceptions - and if it got there first, the intended receiver would never know about it.

We've already seen a remote service that sends messages about exceptions to an invalid queue; now we want to extend that to additionally set a property in the reply message that facilitates the intended message selection. Here's an example of that, where the service agrees to set a unique ID as a property in the reply message (see the previous posts for details around the RemoteService, InvalidQueueRemoteService, etc.):

package com.mybiz.jms.activemq.server.requestreply.replier;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

public class MessageSelectorRemoteService extends InvalidQueueRemoteService {

    public static final String OID_PROPERTY = "oid";

    public MessageSelectorRemoteService(String theUrl, String serviceQueueName, String invalidQueueName)
            throws JMSException {

        super(theUrl, serviceQueueName, invalidQueueName);
        System.out.println("   ...service will now add OID property to reply...");
    }

    protected TextMessage decorateMessage(Message requestMessage, TextMessage replyMsg)
            throws JMSException {

        // include the OID_PROPERTY property
        String oid = requestMessage.getStringProperty(OID_PROPERTY);
        replyMsg.setStringProperty(OID_PROPERTY, oid);
        return replyMsg;
    }

    public static void main(String[] args) throws Exception {

        RemoteService service = 
                new MessageSelectorRemoteService(url, serviceQueueName, invalidQueueName);
        RemoteService.run(service);

    }
}

One consumer using this service could set up a single MessageListener that receives both nominal and exceptional messages:

package com.mybiz.jms.activemq.server.requestreply.requestor.async;

import com.mybiz.jms.activemq.server.requestreply.connection.AsyncConsumerConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.connection.ConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.connection.ProducerConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.replier.InvalidQueueRemoteService;
import com.mybiz.jms.activemq.server.requestreply.replier.MessageSelectorRemoteService;
import com.mybiz.jms.activemq.server.requestreply.replier.RemoteService;
import com.mybiz.jms.activemq.server.requestreply.util.MessageUtil;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

// you'll see lots of protected access in fields and methods here - that's because I just happen
// to know that I'll be extending this class, and protected qualifiers are "good enough" for
// demo purposes:
public class InvalidListener implements MessageListener {

    // add message selector so queue listeners only gets their own messages,
    // and so that no other clients receive those messages instead. This particular
    // OID can fail if two clients are started at exactly the same millisecond:
    protected static final String OID = MessageUtil.getDateTime();
    protected static final String MSG_SELECTOR =
            MessageSelectorRemoteService.OID_PROPERTY + "='" + OID + "'";

    // keep track of number of message sent and received; if the gap between these grows
    // too much, log a message
    protected static int numSent = 0, numReceived = 0;
    protected static final int maxGap = 6;

    // protect numSent and numReceived during concurrent access
    protected static final Object mutex = new Object();

    // this client uses message selectors so it receives only those messages intended
    // for it on the invalid queue, ftm no other clients will receive those messages. With
    // message selectors, the filtering is done by the provider so it's more efficient than
    // asking each client to inspect each message for those that apply to it alone.

    public void onMessage(Message msg) {

        processReceived(msg);
    }

    protected ConnectionStuff setupInvalidConnection() throws Exception {

        // miracle occurs here: this sets the message listener to this object, so both
        // nominal and invalid messages will now arrive in the onMessage method:
        return new AsyncConsumerConnectionStuff(
                RemoteService.url, InvalidQueueRemoteService.invalidQueueName,
                this, MSG_SELECTOR, true);
    }

    protected static void run(InvalidListener requestor) throws Exception {

        boolean shutdown = false;

        // set up producer that sends requests to the service queue. Messages to the queue should
        // have set the replyTo value to a temporary queue created manually.
        ProducerConnectionStuff requestorConnection =
                new ProducerConnectionStuff(RemoteService.url, RemoteService.serviceQueueName, true);

        // set up consumer that receives asynchronously at the temporary queue and start the connection
        ConnectionStuff consumerConnection =
                new AsyncConsumerConnectionStuff(
                        RemoteService.url, "MyListener", requestor,
                        requestor.MSG_SELECTOR, true);

        // factored out so it can be overridden, which will happen in next example requestor:
        ConnectionStuff invalidConnection = requestor.setupInvalidConnection();

        // send a message - async handler should get the reply
        int msgNum = 0;
        while (!shutdown) {
            TextMessage requestMsg = requestorConnection.getSession().createTextMessage();
            if ((++msgNum % 3) == 0) {
                // every 3rd message is NULL, which we know will provoke an invalid queue message
                requestMsg.setText(null);
            } else {
                requestMsg.setText("Async data request #" + msgNum);
            }

            // remote service agrees to set property in invalid replies matching any given
            // with RemoteService.OID_PROPERTY name
            requestMsg.setStringProperty(MessageSelectorRemoteService.OID_PROPERTY, requestor.OID);
            requestorConnection.send(requestMsg, consumerConnection.getDestination());

            requestor.processSent(requestMsg);
        }

        requestorConnection.getConnection().close();
        consumerConnection.getConnection().close();
        invalidConnection.getConnection().close();
    }

    public static void main(String[] args) throws Exception {

        run(new InvalidListener());
    }

    private void processSent(Message requestMsg) throws Exception {

        synchronized (mutex) {
            numSent++;
        }

        MessageUtil.examineSentRequest(requestMsg);
        Thread.currentThread().sleep(3000);

        // confirms the number of messages received vs sent is within a specified threshold. If
        // the remote service goes down, this message will log continuously; but within a finite
        // amount of time (depending on how long the service was down), the gap will be made up
        // (from empirical observations)
        if (numSent - numReceived > maxGap) {
            System.err.println("WARNING: # sent = " + numSent
                    + ", # received = " + numReceived);
        }
    }

    protected void processReceived(Message msg) {

        synchronized (mutex) {
            ++numReceived;
        }
        // confirms the message received has an OID property value as expected
        MessageUtil.process(msg, MessageSelectorRemoteService.OID_PROPERTY, OID);
    }
}

This requestor also monitors the gap between the number of messages sent vs the number received - if that starts getting too large, it will notice; this could indicate some kind of trouble around receiving the expected replies. As hoped, it now receives messages about exceptions, etc. - here's the startup of the service, and then the requestor:

Mar 25, 2010 1:27:25 PM org.apache.activemq.transport.failover.FailoverTransport doReconnect
Service is waiting for a request...
INFO: Successfully connected to tcp://localhost:61616
   ...service will now send replies to an INVALID queue...
   ...service will now add OID property to reply...

Mar 25, 2010 1:28:42 PM org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://localhost:61616
13:28:42.779 Request for service 'Async data request #1' has been sent...
    Message ID: ID:localhost-34234-1269545322429-0:0:1:1:1
    Correlation ID: null
    Reply to:   queue://MyListener
    OID property:   13:28:42.217
----------------------------------------------
13:28:42.796 Received reply 'Normal Service Reply'
    Message ID: ID:localhost-34222-1269545245163-0:0:1:2:1
    Correlation ID: ID:localhost-34234-1269545322429-0:0:1:1:1
    Reply to:   null
    OID property:   13:28:42.217
----------------------------------------------
13:28:45.792 Request for service 'Async data request #2' has been sent...
    Message ID: ID:localhost-34234-1269545322429-0:0:1:1:2
    Correlation ID: null
    Reply to:   queue://MyListener
    OID property:   13:28:42.217
----------------------------------------------
13:28:45.819 Received reply 'Normal Service Reply'
    Message ID: ID:localhost-34222-1269545245163-0:0:1:3:1
    Correlation ID: ID:localhost-34234-1269545322429-0:0:1:1:2
    Reply to:   null
    OID property:   13:28:42.217
----------------------------------------------
13:28:48.798 Request for service 'null' has been sent...
    Message ID: ID:localhost-34234-1269545322429-0:0:1:1:3
    Correlation ID: null
    Reply to:   queue://MyListener
    OID property:   13:28:42.217
----------------------------------------------
13:28:48.815 Received reply 'Exception occurred: java.lang.IllegalStateException: java.lang.IllegalStateException: NULL message received'
    Message ID: ID:localhost-34222-1269545245163-0:0:2:2:1
    Correlation ID: ID:localhost-34234-1269545322429-0:0:1:1:3
    Reply to:   null
    OID property:   13:28:42.217
----------------------------------------------

As expected from the requestor logic, every 3rd message results in an exception, and the requestor is notified about this. So we've solved the problem around a request blocking, potentially "forever", if the service throws an exception; however, this particular flavor of consumer must examine each message received to determine whether or not it indicates such a problem. If you'd like to avoid putting that burden on every requestor (since most of the time, the message will not be about an exception - that's why they're called "exceptions"), here's an extension to this consumer that provides two separate callbacks, one for nominal messages and another for invalid ones:

package com.mybiz.jms.activemq.server.requestreply.requestor.async;

import com.mybiz.jms.activemq.server.requestreply.connection.AsyncConsumerConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.connection.ConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.replier.InvalidQueueRemoteService;
import com.mybiz.jms.activemq.server.requestreply.replier.RemoteService;
import com.mybiz.jms.activemq.server.requestreply.util.MessageUtil;

import javax.jms.Message;
import javax.jms.MessageListener;

public class DualListener extends InvalidListener implements AbstractInvalidNotifier {

    public void notifyInvalidRequest(Message notification) {

        processReceived(notification, true);
    }

    protected ConnectionStuff setupInvalidConnection() throws Exception {

        MessageListener invalidQueueListener = new MessageListener() {
            public void onMessage(Message msg) {
                notifyInvalidRequest(msg);
            }
        };

        return new AsyncConsumerConnectionStuff(
                        RemoteService.url, InvalidQueueRemoteService.invalidQueueName,
                        invalidQueueListener, MSG_SELECTOR, true);
    }

    public static void main(String[] args) throws Exception {

        run(new DualListener());
    }

    private void processReceived(Message msg, boolean invalid) {

        if (invalid) {
            System.out.println(
                    "---->>> " + MessageUtil.getDateTime() + " Notified about INVALID response <<<----");
        }
        processReceived(msg);
    }
}

Running this modified requestor confirms that invalid messages are received in a separate method, precluding the need to parse replies just to catch the occasional exception:

Mar 25, 2010 1:33:29 PM org.apache.activemq.transport.failover.FailoverTransport doReconnect       
INFO: Successfully connected to tcp://localhost:61616       
13:33:29.939 Request for service 'Async data request #1' has been sent...       
   Message ID: ID:localhost-34245-1269545609591-0:0:1:1:1       
   Correlation ID: null       
   Reply to:   queue://MyListener       
   OID property:   13:33:29.348       
----------------------------------------------       
13:33:29.959 Received reply 'Normal Service Reply'       
   Message ID: ID:localhost-34222-1269545245163-0:0:1:4:1       
   Correlation ID: ID:localhost-34245-1269545609591-0:0:1:1:1       
   Reply to:   null       
   OID property:   13:33:29.348       
----------------------------------------------       
13:33:32.951 Request for service 'Async data request #2' has been sent...       
   Message ID: ID:localhost-34245-1269545609591-0:0:1:1:2       
   Correlation ID: null       
   Reply to:   queue://MyListener       
   OID property:   13:33:29.348       
----------------------------------------------       
13:33:32.954 Received reply 'Normal Service Reply'       
   Message ID: ID:localhost-34222-1269545245163-0:0:1:5:1       
   Correlation ID: ID:localhost-34245-1269545609591-0:0:1:1:2       
   Reply to:   null       
   OID property:   13:33:29.348       
----------------------------------------------       
13:33:35.954 Request for service 'null' has been sent...       
   Message ID: ID:localhost-34245-1269545609591-0:0:1:1:3       
   Correlation ID: null       
   Reply to:   queue://MyListener       
   OID property:   13:33:29.348       
----------------------------------------------       
---->>> 13:33:35.957 Notified about INVALID response <<<----       
13:33:35.957 Received reply 'Exception occurred: java.lang.IllegalStateException: java.lang.IllegalStateException: NULL message received'       
   Message ID: ID:localhost-34222-1269545245163-0:0:2:3:1       
   Correlation ID: ID:localhost-34245-1269545609591-0:0:1:1:3       
   Reply to:   null       
   OID property:   13:33:29.348       
----------------------------------------------

The caveat mentioned above remains, however: since this is a shared queue destination, any consumers that are not using the same message selector as this one can compete to consume the invalid messages, subverting the intended behavior. The "correct" remedy here is to simply follow the ActiveMQ recommendation, noted above.

No comments:

Post a Comment