In my previous post, I presented a simple utility for a periodically executing task and a watchdog to make sure it runs forever. Now I'll refactor that prototype code to make things generic and reusable.
We'll end up with an abstract PeriodicTask class, a watchdog, a concrete implementation of the task abstraction and a test. Let's say the concrete task is a poller of some sort, and ignore any details beyond that. We'll also use Spring to instantiate the task and watchdog, start both threads, and use that context for a JUnit test that confirms the task will restart shortly after it stops - say, within 5 seconds - or the test fails. It can stop for various reasons, e.g. encountering an exception, getting cancelled outright, etc.
Here's the PeriodicTask abstraction - it prescribes an instance getter of type T, which the concrete class will specify. That instance getter is needed to construct a new poller if-when the current poller stops for whatever reason (outright cancellation is how we'll do it here). This class has been refactored to be reusable:
public abstract class PeriodicTask<T> {
private ScheduledExecutorService executor;
private ScheduledFuture future;
public abstract long getDelay(); // in milliseconds
public abstract long getPeriod(); // in milliseconds
public abstract Runnable getTask();
public abstract T getInstance();
protected void start() {
executor = Executors.newSingleThreadScheduledExecutor();
future = executor.scheduleAtFixedRate(getTask(), getDelay(), getPeriod(), TimeUnit.MILLISECONDS);
}
public boolean cancel() {
return future.cancel(true);
}
public boolean isDone() {
return future.isDone();
}
public void shutdown() {
executor.shutdown();
}
}
This is the poller, a concrete task - it relies on external configuration to set the startup delay and repeat period:
public class Poller extends PeriodicTask {
private long delay, period;
public Poller(long theDelay, long thePeriod) {
delay = theDelay;
period = thePeriod;
}
@Override
public long getDelay() {
return delay;
}
@Override
public long getPeriod() {
return period;
}
int total;
@Override
public Runnable getTask() {
return new Runnable() {
public void run() {
// do something here
}
};
}
@Override
public Poller getInstance() {
return new Poller(getDelay(), getPeriod());
}
}
The watchdog needs the concrete type information, so it uses generics to facilitate that:
public class Watchdog<T extends PeriodicTask> {
private T task;
private final long delay, period; // in milliseconds
public Watchdog(T theTask, long theDelay, long thePeriod) {
delay = theDelay;
period = thePeriod;
task = theTask;
}
public void start() {
Executors.newSingleThreadScheduledExecutor()
.scheduleAtFixedRate(new Watcher(), delay, period, TimeUnit.MILLISECONDS);
}
/**
* Provide the task for external clients since any original reference to the initial
* task becomes obsolete if-when the task gets restarted.
*/
public T getTask() {
return task;
}
private class Watcher implements Runnable {
public void run() {
while (true) {
if (task.isDone()) {
System.out.println("Task stopped - restarting...");
// cancel the task and shutdown the executor
task.cancel();
task.shutdown();
// get new instance, restart it
task = (T)task.getInstance();
task.start();
}
}
}
}
}
The spring config file, named e.g. poller-test-context.xml, wires things up and calls the start method to fire off both threads:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="poller" class="com.mybiz.Poller" init-method="start">
<constructor-arg value="0"/>
<constructor-arg value="300"/>
</bean>
<bean id="watchdog" class="com.mybiz.Watchdog" init-method="start">
<constructor-arg ref="poller"/>
<constructor-arg value="0"/>
<constructor-arg value="500"/>
</bean>
</beans>
The test is auto-wired with the watchdog and uses it to cancel the poller task:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "poller-test-context.xml")
public class PollerTest implements InitializingBean {
@Autowired
private Watchdog watchdog;
@Override
public void afterPropertiesSet() throws Exception {
assert watchdog != null;
}
@Test
public void confirmRestart() throws Exception {
int num = 0; // cancel the task every second or so
while (num < 20) { // keep this thing going for 5 seconds or so
Thread.sleep(250);
if (++num % 4 == 0) {
// confirm it's running, then cancel it to stimulate a watchdog restart
assert !watchdog.getTask().isDone();
watchdog.getTask().cancel();
// task must get retarted within 5 seconds
long now = System.currentTimeMillis();
long end = now + 5000;
while (now < end) {
if (!watchdog.getTask().isDone()) {
Thread.sleep(100);
now = System.currentTimeMillis();
} else {
break;
}
}
if (!watchdog.getTask().isDone()) {
assert false : "task did not restart";
}
}
}
}
}
Finally, the output:
88 [main] INFO org.springframework.test.context.TestContextManager ....
352 [main] INFO org.springframework.beans.factory.xml.XmlBeanDefinitionReader ....
661 [main] INFO org.springframework.context.support.GenericApplicationContext ....
813 [main] INFO org.springframework.beans.factory.support.DefaultListableBeanFactory ....
Task stopped - restarting...
Task stopped - restarting...
Task stopped - restarting...
Task stopped - restarting...
Task stopped - restarting...
5989 [Thread-1] INFO org.springframework.context.support.GenericApplicationContext ....
5990 [Thread-1] INFO org.springframework.beans.factory.support.DefaultListableBeanFactory ....
Process finished with exit code 0
No comments:
Post a Comment