Thursday, January 27, 2011

Periodic Task and Watchdog - Reusable and Testable Versions

In my previous post, I presented a simple utility for a periodically executing task and a watchdog to make sure it runs forever. Now I'll refactor that prototype code to make things generic and reusable.

We'll end up with an abstract PeriodicTask class, a watchdog, a concrete implementation of the task abstraction and a test. Let's say the concrete task is a poller of some sort, and ignore any details beyond that. We'll also use Spring to instantiate the task and watchdog, start both threads, and use that context for a JUnit test that confirms the task restarts shortly after it stops - say, within 5 seconds - or the test fails. The task can stop for various reasons, e.g. encountering an exception or getting cancelled outright.
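
To make "stops" concrete: with scheduleAtFixedRate, an uncaught exception in the task suppresses all later runs, and the returned future then reports isDone(), just as it does after cancel(). Here's a minimal stand-alone sketch of both cases, assuming Java 8 or later. The class StoppedTaskDemo and its method names are made up for illustration and aren't part of the utility.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the post's code.
class StoppedTaskDemo {

    /** Schedules a task that throws on its third run; returns how many runs happened. */
    static int runsBeforeFailure() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
            if (runs.incrementAndGet() == 3) {
                throw new IllegalStateException("simulated failure");
            }
        }, 0, 20, TimeUnit.MILLISECONDS);
        try {
            future.get(); // a periodic future only completes by failing or being cancelled
        } catch (ExecutionException expected) {
            // the third run threw, so the executor dropped the schedule
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return future.isDone() ? runs.get() : -1;
    }

    /** Cancels a healthy periodic task and reports whether its future is now done. */
    static boolean cancelledIsDone() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> future =
                executor.scheduleAtFixedRate(() -> { }, 0, 20, TimeUnit.MILLISECONDS);
        future.cancel(true);
        executor.shutdownNow();
        return future.isDone() && future.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println("runs before failure: " + runsBeforeFailure());
        System.out.println("cancelled future is done: " + cancelledIsDone());
    }
}
```

Either way, isDone() is the one signal a watchdog needs to check.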

Here's the PeriodicTask abstraction. It prescribes an instance getter of type T, which the concrete class specifies. That getter is needed to construct a new poller if and when the current one stops for whatever reason (here we'll force that with an outright cancellation). The class is now generic enough to be reused:


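import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public abstract class PeriodicTask<T> {

    private ScheduledExecutorService executor;
    private ScheduledFuture<?> future;

    public abstract long getDelay(); // in milliseconds

    public abstract long getPeriod(); // in milliseconds

    public abstract Runnable getTask();

    // returns a fresh, not yet started task with the same configuration
    public abstract T getInstance();

    // called by Spring (init-method) and by the watchdog in the same package
    protected void start() {
        executor = Executors.newSingleThreadScheduledExecutor();
        future = executor.scheduleAtFixedRate(getTask(), getDelay(), getPeriod(), TimeUnit.MILLISECONDS);
    }

    public boolean cancel() {
        return future.cancel(true);
    }

    // true once the task was cancelled or one of its runs threw an exception
    public boolean isDone() {
        return future.isDone();
    }

    public void shutdown() {
        executor.shutdown();
    }
}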
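
A side note on why restarting means scheduling afresh rather than reviving what's there: a cancelled ScheduledFuture can't be resumed, and an executor that has been shut down rejects new work. The sketch below shows the second half of that, assuming Java 8 or later. ShutdownExecutorDemo is a made-up name used only for this illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the post's code.
class ShutdownExecutorDemo {

    /** Returns true if a shut-down executor refuses to schedule a new periodic task. */
    static boolean rejectsAfterShutdown() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.shutdown();
        try {
            executor.scheduleAtFixedRate(() -> { }, 0, 100, TimeUnit.MILLISECONDS);
            return false; // not reached with the default rejection policy
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + rejectsAfterShutdown());
    }
}
```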

This is the poller, a concrete task - it relies on external configuration to set the startup delay and repeat period:


public class Poller extends PeriodicTask<Poller> {

    private final long delay, period; // in milliseconds

    public Poller(long theDelay, long thePeriod) {
        delay = theDelay;
        period = thePeriod;
    }

    @Override
    public long getDelay() {
        return delay;
    }

    @Override
    public long getPeriod() {
        return period;
    }

    @Override
    public Runnable getTask() {
        return new Runnable() {
            public void run() {
                // do something here; an uncaught exception would end the schedule
            }
        };
    }

    @Override
    public Poller getInstance() {
        // same configuration, brand-new task
        return new Poller(getDelay(), getPeriod());
    }
}

The watchdog needs the concrete type information, so it uses generics to facilitate that:


import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Watchdog<T extends PeriodicTask<T>> {

    // volatile: written by the watcher thread, read by clients through getTask()
    private volatile T task;
    private final long delay, period; // in milliseconds

    public Watchdog(T theTask, long theDelay, long thePeriod) {
        delay = theDelay;
        period = thePeriod;
        task = theTask;
    }

    public void start() {
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(new Watcher(), delay, period, TimeUnit.MILLISECONDS);
    }

    /**
     * Provide the task for external clients since any original reference to the initial
     * task becomes obsolete if and when the task gets restarted.
     */
    public T getTask() {
        return task;
    }

    private class Watcher implements Runnable {

        // Runs once per period; the executor supplies the repetition, so there is
        // no loop here (a while (true) would spin and never return to the scheduler).
        public void run() {
            if (task.isDone()) {
                System.out.println("Task stopped - restarting...");
                // cancel the task and shut down its executor
                task.cancel();
                task.shutdown();
                // get a new instance, start it, then publish it
                T replacement = task.getInstance();
                replacement.start();
                task = replacement;
            }
        }
    }
}
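
To watch the restart cycle in isolation, without Spring or the generic classes above, here's a compressed stand-alone sketch, assuming Java 8 or later. Everything in it (RestartSketch, the 10 ms and 20 ms periods, the demo method) is hypothetical and only mirrors the idea: a watcher thread checks isDone() once per period and, when the worker has stopped, schedules a fresh one.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, Spring-free sketch of the restart cycle; all names are illustrative.
class RestartSketch {

    private final ScheduledExecutorService watcher = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger restarts = new AtomicInteger();
    private volatile ScheduledExecutorService worker;
    private volatile ScheduledFuture<?> future;

    private void startWorker() {
        worker = Executors.newSingleThreadScheduledExecutor();
        future = worker.scheduleAtFixedRate(() -> { /* poll here */ }, 0, 10, TimeUnit.MILLISECONDS);
    }

    void start() {
        startWorker();
        // one check per period; the executor provides the repetition
        watcher.scheduleAtFixedRate(() -> {
            if (future.isDone()) {
                worker.shutdown();
                startWorker();
                restarts.incrementAndGet();
            }
        }, 0, 20, TimeUnit.MILLISECONDS);
    }

    void cancelWorker() {
        future.cancel(true);
    }

    int restartCount() {
        return restarts.get();
    }

    void stop() {
        watcher.shutdownNow();
        worker.shutdownNow();
    }

    /** Cancels the worker once and waits up to 2 seconds for the watcher to restart it. */
    static int demo() {
        RestartSketch sketch = new RestartSketch();
        sketch.start();
        try {
            sketch.cancelWorker();
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
            while (sketch.restartCount() == 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            sketch.stop();
        }
        return sketch.restartCount();
    }

    public static void main(String[] args) {
        System.out.println("restarts observed: " + demo());
    }
}
```

The real Watchdog does the same thing, except that the replacement comes from the task's own getInstance(), which is what keeps the watchdog independent of any concrete task type.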

The Spring config file, named e.g. poller-test-context.xml, wires things up and calls each bean's start method to fire off both threads:


<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="poller" class="com.mybiz.Poller" init-method="start">
        <constructor-arg value="0"/>
        <constructor-arg value="300"/>
    </bean>

    <bean id="watchdog" class="com.mybiz.Watchdog" init-method="start">
        <constructor-arg ref="poller"/>
        <constructor-arg value="0"/>
        <constructor-arg value="500"/>
    </bean>
</beans>

The test is auto-wired with the watchdog and uses it to cancel the poller task:


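import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "poller-test-context.xml")
public class PollerTest implements InitializingBean {

    @Autowired
    private Watchdog watchdog;

    @Override
    public void afterPropertiesSet() throws Exception {
        // JUnit assertions fire even when the JVM runs without -ea
        assertNotNull(watchdog);
    }

    @Test
    public void confirmRestart() throws Exception {

        int num = 0;
        while (num < 20) { // keep this thing going for 5 seconds or so
            Thread.sleep(250);
            if (++num % 4 == 0) { // cancel the task every second or so

                // confirm it's running, then cancel it to trigger a watchdog restart
                assertFalse("task should be running", watchdog.getTask().isDone());
                watchdog.getTask().cancel();

                // task must get restarted within 5 seconds:
                // wait while the current task is still the stopped one
                long end = System.currentTimeMillis() + 5000;
                while (watchdog.getTask().isDone() && System.currentTimeMillis() < end) {
                    Thread.sleep(100);
                }
                assertFalse("task did not restart", watchdog.getTask().isDone());
            }
        }
    }
}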
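
The "wait up to 5 seconds for a restart" part of the test is a generic poll-with-timeout, so it can be pulled out into a small helper. Below is one possible sketch, assuming Java 8 or later. The class Await and its until method are names invented for this example, not something from JUnit or Spring.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of JUnit or Spring.
class Await {

    /**
     * Polls the condition every pollMillis until it holds or timeoutMillis has elapsed.
     * Returns true if the condition held in time, false on timeout or interruption.
     */
    static boolean until(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

With that in place, the restart check in the test could shrink to something like assertTrue("task did not restart", Await.until(() -> !watchdog.getTask().isDone(), 5000, 100)).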

Finally, the output:


88 [main] INFO org.springframework.test.context.TestContextManager ....
352 [main] INFO org.springframework.beans.factory.xml.XmlBeanDefinitionReader ....
661 [main] INFO org.springframework.context.support.GenericApplicationContext ....
813 [main] INFO org.springframework.beans.factory.support.DefaultListableBeanFactory ....
Task stopped - restarting...
Task stopped - restarting...
Task stopped - restarting...
Task stopped - restarting...
Task stopped - restarting...
5989 [Thread-1] INFO org.springframework.context.support.GenericApplicationContext ....
5990 [Thread-1] INFO org.springframework.beans.factory.support.DefaultListableBeanFactory ....
Process finished with exit code 0
