Thursday, April 1, 2010

Unit Test with a Reusable Spring-JMS Configuration

In my previous post, I introduced a general-purpose, allegedly reusable Spring configuration for an ActiveMQ-based JMS application. In this post, I'll expand on that by writing a unit test around a basic configuration that establishes an evolving prototype.

Writing tests that fail, as it turns out, is pretty simple. Let's pick that low-hanging fruit:

package com.mybiz.jms;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import javax.annotation.Resource;
import javax.jms.DeliveryMode;

@ContextConfiguration(locations =
{
    "classpath:com/mybiz/jms/spring-jms-demo.xml"}
)
public class BasicConfigTest extends AbstractTestNGSpringContextTests
{
    @Resource
    private JmsTemplate myTemplate;

    @BeforeClass
    public void setUp() throws Exception
    {
        System.out.println("Run BasicConfigTest...");
        assert applicationContext != null;
    }

    @Test
    public void testConfig() throws Exception
    {
        System.out.println("testConfig...");
        assert myTemplate.getDeliveryMode() == DeliveryMode.PERSISTENT;
        assert myTemplate.getTimeToLive() == 10000;
        assert myTemplate.isExplicitQosEnabled();
        assert myTemplate.isPubSubDomain();
        assert myTemplate.isSessionTransacted();
        assert myTemplate.isPubSubNoLocal();
    }
}

I don't need to tell you that, without any other work, this test will fail; let's take on something a bit less trivial. The test assumes a Spring configuration which resides in the same logical directory tree as the class. Here's that spring-jms-demo.xml configuration:

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"     
       xmlns="http://www.springframework.org/schema/beans" 
       xsi:schemalocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
    
    <import resource="spring-context.xml"/>
    <import resource="spring-jms.xml"/>

</beans>

The first import establishes context-based annotationscanning for components and property placeholder
resolution, which will combine to help us reuse the generic Spring-JMS configuration (the second import). This spring-context.xml file is the beginning of my application-specific configuration:

<beans xmlns:context="http://www.springframework.org/schema/context"             
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
       xmlns="http://www.springframework.org/schema/beans"  
       xsi:schemalocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.0.xsd">
       
        <context:annotation-config>
            <context:component-scan base-package="com.mybiz.jms"/>
            <context:property-placeholder 
                location="com/mybiz/jms/spring-jms.properties" 
                system-properties-mode="OVERRIDE"/>
        </context:annotation-config>
</beans>

That file uses a properties file (spring-jms.properties) that backfills the generic Spring-JMS configuration with more of our application specifics:

# URL for connection to broker - use testing URL:
brokerURL=vm://localhost?async=false&broker.persistent=false
#
# messages will remain in broker for 10 seconds
timeToLive=10000
#
# 1 - non-persistent / 2 - persistent
deliveryMode=2
#
# Specify concurrency of 40 for starters
sessionCacheSize=40
#
# big perf boost, minimal risk (see comments in spring-jms.xml)
useAsyncSend=true
#
# support for flexibility on per-send basis
explicitQosEnabled=true
#
# concern is around a blocking consumer (see comments in spring-jms.xml)
dispatchAsync=true
#
# txn around sends
sessionTransacted=true
#
# just in case
pubSubNoLocal=true
#
# must specify this if it's a topic; default is false
pubSubDomain=true
#
# txn around receives
acknowledge=transacted
#
# destination for topic
destination=theTopic

Finally, I need two listeners that are referenced by the Spring-JMS configuration:

package com.mybiz.jms;

import org.springframework.stereotype.Service;

@Service
public class MyListener {}  // works for now; but this will need some work


package com.mybiz.jms;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;

@Component
public class MyExceptionListener implements ExceptionListener
{
    private final Logger log = LoggerFactory.getLogger(getClass());
    public void onException(JMSException e)
    {
        log.error("JMS error", e);
    }
}

Now the unit test will run successfully; there's no (meaningful) output to show you, and for that matter the test and listeners are along the lines of "what's the simplest thing that would work" - it's really only done to demonstrate that all the moving parts are in place and that I've used them correctly so far. In my next, I'll start adding on some target functionality and look for ways to test it.

A General Template for Spring-JMS Applications

Here I'll post a simple Spring setup that can be reused for specific JMS-based applications. It assumes the use of ActiveMQ 5.3.0, opting to use Spring's caching connection factory in lieu of ActiveMQ's pooling version. It uses various Spring-JMS components, including one JmsTemplate and one MessageListenerContainer, respectively intended for sending and receiving messages to-and-from a topic. In upcoming posts, I'll use this to prototype a JMS application.

The connection factory and JMS template have most of their specific configuration parameters externalized to facilitate better reuse.

For the ActiveMQ connection factory, the externalized properties include the broker URL, use of asynchronous sends and choice around dispatching asynchronously. The example XML template that follows includes some commentary around the pros/cons of configuring the last two. This factory is used by Spring's caching connection factory, externalizes configuration of session cache size, and assumes the presence of an exception listener.

The JMS template has the following properties that are configurable: delivery mode, time-to-live, whether or not explicit quality of service is enabled, whether or not the session is transacted, whether or not a given connection should listen for messages sent by that same connection, and whether or not the domain is pub-sub or point-to-point.

I would have externalized configuration for the listener container but, as it turns out, apparently the listener-container parent tag does not support this, so I've hardwired this to use the default listener container, listening on a topic within a transaction.

Here's that template:

<?xml version="1.0" encoding="UTF-8"?>
<!--
A reusable template for Spring-JMS applications. The following must be configured externally, presumably
via Spring's property-placeholder mechanism:

brokerURL
useAsyncSend
dispatchAsync
sessionCacheSize
deliveryMode
explicitQosEnabled
sessionTransacted
pubSubNoLocal
pubSubDomain
destination

As well, the following is assumed:

An exception listener that can be dereference by the name 'myExceptionListener' is assumed present 
in classpath.
A message listener that can be dereference by the name 'myListener' is assumed present in classpath.
-->

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.0.xsd
           http://activemq.apache.org/schema/core
           http://activemq.apache.org/schema/core/activemq-core-5.3.0.xsd
           http://www.springframework.org/schema/jms
           http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <!--
    Configure connection factory
    -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <!--
        for testing, we want to use vm://localhost?async=false&broker.persistent=false.
        This does intra-JVM messaging using synchronous sends and doesn't bother with persisting
        messages to cover provider failures. See http://activemq.apache.org/uri-protocols.html for
        protocol options.
         -->
        <property name="brokerURL" value="${testUrl}"/>
        <!--
         See http://activemq.apache.org/async-sends.html - use async sends for better throughput but
         only if you're able to tolerate some message loss. 

         From the FUSE Tuning Guide (http://fusesource.com/wiki/display/ProdInfo/FUSE%20Message%20Broker%20Performance%20Tuning%20Guide):

         If you are using persistent messaging and you don't want to use Async Sends (see below) then you should
         use JMS transactions to batch up many operations inside a single transaction. If you enable Async Sends
         then you will reduce the blocking in senders which increases throughput.

         The only downside of asynchronous sending is if the send fails for whatever reason (security exception
         typically or some transport failure), then you don't get an exception thrown in the sender thread,
         since all the work is done asynchronously, though your ErrorListener will get notified.

         From the activemq.xsd:

         Forces the use of Async Sends which adds a massive performance boost; but means that the send() method will
         return immediately whether the message has been sent or not which could lead to message loss.
         -->
        <property name="useAsyncSend" value="${useAsyncSend}"/>
        <!--
        From the activemq.xsd:

        Enables or disables the default setting of whether or not consumers have their messages dispatched
        synchronously or asynchronously by the broker. For non-durable topics for example we typically dispatch
        synchronously by default to minimize context switches which boost performance. However sometimes its better
        to go slower to ensure that a single blocked consumer socket does not block delivery to other consumers.
        See http://activemq.apache.org/consumer-dispatch-async.html.
        -->
        <property name="dispatchAsync" value="${dispatchAsync}"/>
    </bean>

    <!--
    Use the vanilla connection factory to configure a caching connection factory, using the Spring
    version instead of ActiveMQ. Default session cache size for Spring factory is 1. An exception listener
    that can be dereference by the name 'myExceptionListener' is assumed present in classpath.
    -->
    <bean id="cachingConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory"
          destroy-method="destroy">

        <property name="targetConnectionFactory" ref="connectionFactory"/>
        <property name="sessionCacheSize" value="${sessionCacheSize}"/>
        <property name="exceptionListener" ref="myExceptionListener"/>

    </bean>

    <!--
    The JMS template intended for sending of messages.
    -->
    <bean id="myTemplate" class="org.springframework.jms.core.JmsTemplate">

        <constructor-arg ref="cachingConnectionFactory"/>
        <property name="deliveryMode" value="${deliveryMode}"/>
        <property name="timeToLive" value="${timeToLive}"/>
        <!--
        if explicit QoS is enabled, you can control deliveryMode, priority and TTL on a per-send basis
        -->
        <property name="explicitQosEnabled" value="${explicitQosEnabled}"/>
        <property name="sessionTransacted" value="${sessionTransacted}"/>
        <property name="pubSubNoLocal" value="${pubSubNoLocal}"/>
        <!--
        override JMS template default domain type of PTP if you want pub-sub
        -->
        <property name="pubSubDomain" value="${pubSubDomain}"/>
    </bean>

    <!--
    concurrency should be 1 for topic listeners. This is one attribute that apparently canNOT be
    externalized into the properties file. Use transacted acknowledgment (another attribute that
    cannot be externalized). A message listener that can be dereference by the name 'myListener' 
    is assumed present in classpath.
    -->
    <jms:listener-container container-type="default"
                            destination-type="topic"
                            acknowledge="transacted"
                            connection-factory="cachingConnectionFactory"
                            concurrency="1">
        <jms:listener
                destination="${myTopic}"
                ref="myListener"/>
    </jms:listener-container>

</beans>

That's a starting point; from here, I'll add configuration, listeners and a test case as the next step.

References

Apache ActiveMQ
Spring 3.0.0 Javadoc
ActiveMQ Javadoc
FUSE Tuning Guide
Spring 3.0.0 Reference Documentation
Efficient Lightweight JMS with Spring and ActiveMQ
Creating Robust JMS Applications (Java EE 5 Tutorial)