Tuesday, March 23, 2010

Sync Request w/Async Invalid Channel

The first part of this series of writeups introduced a request-reply client that used the JMS 1.1 QueueRequestor to send the request, and a service that would throw an exception if the message was null or was not a TextMessage. The client will not know that the exception occurred, and since it is using the QueueRequestor, it can block "forever". Even without exceptions, blocking clients consume resources and can compromise the responsiveness of a system; while a trivial remedy to this would be to simply use an asynchronous listener, or to block-without-wait or block-with-timeout, in this case, again due to use of the QueueRequestor, a block-until-reply request is the only option. Before moving on to alternative client approaches, I'll first introduce a strategy that at least partially mitigates the problem with block-until-reply, and that can also be used with other request approaches as a mechanism for communicating exception information across the messaging middleware.

Here's an example class that blocks "forever":

package com.mybiz.jms.activemq.server.requestreply.requestor.sync.broken;

import com.mybiz.jms.activemq.server.requestreply.connection.SyncRequestorConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.replier.RemoteService;
import com.mybiz.jms.activemq.server.requestreply.util.MessageUtil;
import javax.jms.JMSException;
import javax.jms.Message;

public class BlockForeverRequestor {

    public void runBlocking(SyncRequestorConnectionStuff stuff) throws JMSException {

        // create a null text message so as to provoke an exception. While the
        // exception will indeed get thrown, the client using this approach to request
        // something will never know it - it will block forever
        Message oopsieMyBad = stuff.getSession().createTextMessage();

        // this method calls use the QueueRequestor - a block-until-reply approach:
        MessageUtil.processRequestAndReply(stuff.getRequestor(), oopsieMyBad);

        // at this point, this client is going to block forever.
    }

    public static void main(String[] args) throws JMSException {

        BlockForeverRequestor requestor = new BlockForeverRequestor();
        SyncRequestorConnectionStuff serviceQueueStuff =
                new SyncRequestorConnectionStuff(RemoteService.url, RemoteService.serviceQueueName);
        requestor.runBlocking(serviceQueueStuff);
        serviceQueueStuff.getConnection().close();
    }
}

Continuing to use the plain-vanilla RemoteServer (introduced in Part 1) when running this client results in the following output:

Mar 23, 2010 10:26:11 AM org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://localhost:61616
10:26:11.514 Sending request for service 'null'...

And meanwhile, the service has issued this complaint, but the client will not be aware of it:

SEVERE: ID:localhost-52606-1269361388958-0:0:1:1 Exception while processing message: java.lang.IllegalStateException: java.lang.IllegalStateException: NULL message received
java.lang.IllegalStateException: java.lang.IllegalStateException: NULL message received
   .....


One technique to remedy the client's ignorance of the exception is to listen on a separate queue where messages about exceptions, invalid messages, and etc. can be posted. A service that posts such messages might look like this:

package com.mybiz.jms.activemq.server.requestreply.replier;

import com.mybiz.jms.activemq.server.requestreply.connection.ConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.connection.ProducerConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.util.MessageUtil;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

// see Part 1 to examine the RemoteService class
public class InvalidQueueRemoteService extends RemoteService {

    public static final String invalidQueueName = "InvalidQueue";
    private ConnectionStuff invalidStuff;

    public InvalidQueueRemoteService() {}

    public InvalidQueueRemoteService(String theUrl, String serviceQueueName, String invalidQueueName)
            throws JMSException {

        super(theUrl, serviceQueueName);
        // use the same connection for the invalid queue as is used for the normal service replies.
        // Connections are expensive, but you can get multiple sessions out of them.
        invalidStuff = new ProducerConnectionStuff(getServiceQueueConnection(), invalidQueueName);
        System.out.println("   ...service will now send replies to an INVALID queue...");
    }

    private Connection getInvalidQueueConnection() {
        return invalidStuff.getConnection();
    }

    public void onMessage(Message requestMsg) {

        try {
            if (requestMsg instanceof TextMessage) {
                super.onMessage(requestMsg);
            } else {
                MessageUtil.examineReceivedRequest(requestMsg);
                // send a message back to an "invalid" queue. Any blocking requests
                // would still block forever. At least in this case, that requestor gets some indication
                // that something went wrong.
                sendReply(invalidStuff, requestMsg, "Replied to Invalid Queue",
                        invalidStuff.getDestination());
            }
        } catch (Exception e) {
            try {
                sendReply(invalidStuff, requestMsg, "Exception occurred: " + e,
                        invalidStuff.getDestination());
            } catch (JMSException e1) {
                throw new IllegalStateException("Multiple exceptions occurred; first " + e
                        + " -- and then " + e1);
            }
        }
    }

    protected void shutdown() throws Exception {
        super.shutdown();
        getInvalidQueueConnection().close();
    }

    public static void main(String[] args) throws Exception {
        RemoteService service =
                new InvalidQueueRemoteService(url, serviceQueueName, invalidQueueName);
        RemoteService.run(service);
    }
}


A base class that listens (asychronously) on this queue could look like this:

package com.mybiz.jms.activemq.server.requestreply.requestor.sync;

import com.mybiz.jms.activemq.server.requestreply.util.MessageUtil;
import com.mybiz.jms.activemq.server.requestreply.connection.AsyncConsumerConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.connection.ConnectionStuff;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.TextMessage;

public abstract class AbstractInvalidListener implements MessageListener {

    public void onMessage(Message msg) {

        try {
            System.out.println("Received message on INVALID queue!");
            MessageUtil.examineReceivedReply(msg);
            msg.acknowledge();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    protected ConnectionStuff setUpInvalidQueue(ConnectionStuff stuff, String theQueueName) throws JMSException {

        System.out.println("Setting up 'invalid queue' to listen for exceptions, etc...");
        ConnectionStuff invalidStuff = new AsyncConsumerConnectionStuff(stuff.getConnection(), theQueueName, this);
        // need to return a different "stuff" object here that reuses existing connection but encapsulates
        // new session and destination; client should use that stuff object for subsequent invalid-queue
        // messaging
        return invalidStuff;
    }

    public void testInvalidQueue(ConnectionStuff stuff) throws JMSException {

        TextMessage testMsg =
                stuff.getSession().createTextMessage("Testing invalid queue");
        testMsg.setText("Testing Invalid Queue");
        testMsg.setJMSReplyTo(stuff.getDestination());
        MessageUtil.examineRequestBeforeSend(testMsg);
       
        MessageProducer producer = stuff.getSession().createProducer(stuff.getDestination());
        producer.send(testMsg);
        MessageUtil.examineSentRequest(testMsg);
    }
}


A concrete subclass that uses this could be something like this:
package com.mybiz.jms.activemq.server.requestreply.requestor.sync.broken;

import com.mybiz.jms.activemq.server.requestreply.connection.ConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.connection.SyncRequestorConnectionStuff;
import com.mybiz.jms.activemq.server.requestreply.replier.InvalidQueueRemoteService;
import com.mybiz.jms.activemq.server.requestreply.replier.RemoteService;
import com.mybiz.jms.activemq.server.requestreply.requestor.sync.AbstractInvalidListener;
import com.mybiz.jms.activemq.server.requestreply.util.MessageUtil;
import javax.jms.JMSException;
import javax.jms.Message;

public class InvalidListener extends AbstractInvalidListener {

    public void runBlocking(SyncRequestorConnectionStuff stuff) throws JMSException {

        // create a null text message so as to provoke an exception.
        Message mistake = stuff.getSession().createTextMessage();
        MessageUtil.processRequestAndReply(stuff.getRequestor(), mistake);

        // Though this client now listens on the invalid queue, it is STILL going to block 
        // forever since the request still gets no reply - but at least this client gets 
        // notified since it's listening on the invalid queue. At that point, the client might 
        // be able to e.g. kill a separately-launched thread that is blocking, or otherwise 
        // deal with state as needed.

        // Can we provoke a reply to the blocked service request from that point?
        // That would be ideal, since that guarantees that the service reply comes AFTER the 
        // invalid reply. But that would involve knowing the name of the temporary queue that 
        // was set as the reply-to, but that isn't set until after the message is sent - which 
        // is too late - since it's a blocking request. Once the send is done, the requestor 
        // blocks for a reply. We need a better strategy.
    }

    public static void main(String[] args) throws JMSException {

        InvalidListener requestor = new InvalidListener();
        SyncRequestorConnectionStuff serviceQueueStuff =
                new SyncRequestorConnectionStuff(RemoteService.url, 
                RemoteService.serviceQueueName);
        ConnectionStuff invalidStuff =
                requestor.setUpInvalidQueue(serviceQueueStuff, 
                     InvalidQueueRemoteService.invalidQueueName);
        requestor.testInvalidQueue(invalidStuff);
        requestor.runBlocking(serviceQueueStuff);
        serviceQueueStuff.getConnection().close();
    }
}

Here's the console output when starting this new service:

Mar 23, 2010 10:59:57 AM org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://localhost:61616
Service is waiting for a request...
   ...service will now send replies to an INVALID queue...

...and when running the requestor client:

Mar 23, 2010 11:00:28 AM org.apache.activemq.transport.failover.FailoverTransport doReconnect
INFO: Successfully connected to tcp://localhost:61616
Setting up 'invalid queue' to listen for exceptions, etc...
11:00:29.104 Sending request for service 'Testing Invalid Queue'...
Received message on INVALID queue!
11:00:29.242 Request for service 'Testing Invalid Queue' has been sent...
    Message ID: ID:localhost-52894-1269363628763-0:0:2:1:1
    Correlation ID: null
    Reply to:   queue://InvalidQueue
----------------------------------------------
11:00:29.236 Received reply 'Testing Invalid Queue'
    Message ID: ID:localhost-52894-1269363628763-0:0:2:1:1
    Correlation ID: null
    Reply to:   queue://InvalidQueue
----------------------------------------------
11:00:29.243 Sending request for service 'null'...
Received message on INVALID queue!
11:00:29.275 Received reply 'Exception occurred: java.lang.IllegalStateException: java.lang.IllegalStateException: NULL message received'
    Message ID: ID:localhost-52884-1269363597038-0:0:2:2:1
    Correlation ID: ID:localhost-52894-1269363628763-0:0:1:1:1
    Reply to:   null
----------------------------------------------

So, we've succeeded in at least notifying the client that its service request resulted in an exception; this is marginally helpful in the context of using a block-until-reply requestor. Note that the TopicRequestor is another JMS 1.1 concrete helper class, used again to facilitate a simple request-reply arrangement, but the API is the same - the request is blocking. Bottom line, we'd have better resilience - and be less coupled in either event, which is arguably one of the major benefits of a messaging approach - to use asynchronous messaging.

The next article will continue with use of the separate queue for invalid messaging, using asynchronous messaging for both that and for service requests, and evolve to ensure that invalid messages are delivered only to the appropriate requestor (since the invalid queue is a system-wide resource, shared by all requestor clients).

References

JMS Request/Reply, Part 1
JMS Home Page
JMS Downloads
JMS FAQ
JEE 5 JMS Tutorial
JEE 5 JMS Tutorial - Example Code
JEE 5 Online Javadoc
JMS 1.1 Online Javadoc
Apache ActiveMQ
Enterprise Integration Patterns - Messaging

No comments:

Post a Comment