Reformat Async Method Invocation - Issue #224

This commit is contained in:
Ankur Kaushal 2015-11-01 17:33:25 -05:00
parent 95c16200e7
commit 16a8c85af6
6 changed files with 229 additions and 228 deletions

View File

@ -4,23 +4,24 @@ import java.util.concurrent.Callable;
/** /**
* This application demonstrates the async method invocation pattern. Key parts of the pattern are * This application demonstrates the async method invocation pattern. Key parts of the pattern are
* <code>AsyncResult</code> which is an intermediate container for an asynchronously evaluated value, * <code>AsyncResult</code> which is an intermediate container for an asynchronously evaluated
* <code>AsyncCallback</code> which can be provided to be executed on task completion and * value, <code>AsyncCallback</code> which can be provided to be executed on task completion and
* <code>AsyncExecutor</code> that manages the execution of the async tasks. * <code>AsyncExecutor</code> that manages the execution of the async tasks.
* <p> * <p>
* The main method shows example flow of async invocations. The main thread starts multiple tasks with * The main method shows example flow of async invocations. The main thread starts multiple tasks
* variable durations and then continues its own work. When the main thread has done it's job it collects * with variable durations and then continues its own work. When the main thread has done it's job
* the results of the async tasks. Two of the tasks are handled with callbacks, meaning the callbacks are * it collects the results of the async tasks. Two of the tasks are handled with callbacks, meaning
* executed immediately when the tasks complete. * the callbacks are executed immediately when the tasks complete.
* <p> * <p>
* Noteworthy difference of thread usage between the async results and callbacks is that the async results * Noteworthy difference of thread usage between the async results and callbacks is that the async
* are collected in the main thread but the callbacks are executed within the worker threads. This should be * results are collected in the main thread but the callbacks are executed within the worker
* noted when working with thread pools. * threads. This should be noted when working with thread pools.
* <p> * <p>
* Java provides its own implementations of async method invocation pattern. FutureTask, CompletableFuture * Java provides its own implementations of async method invocation pattern. FutureTask,
* and ExecutorService are the real world implementations of this pattern. But due to the nature of parallel * CompletableFuture and ExecutorService are the real world implementations of this pattern. But due
* programming, the implementations are not trivial. This example does not take all possible scenarios into * to the nature of parallel programming, the implementations are not trivial. This example does not
* account but rather provides a simple version that helps to understand the pattern. * take all possible scenarios into account but rather provides a simple version that helps to
* understand the pattern.
* *
* @see AsyncResult * @see AsyncResult
* @see AsyncCallback * @see AsyncCallback
@ -32,66 +33,68 @@ import java.util.concurrent.Callable;
*/ */
public class App { public class App {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// construct a new executor that will run async tasks // construct a new executor that will run async tasks
AsyncExecutor executor = new ThreadAsyncExecutor(); AsyncExecutor executor = new ThreadAsyncExecutor();
// start few async tasks with varying processing times, two last with callback handlers // start few async tasks with varying processing times, two last with callback handlers
AsyncResult<Integer> asyncResult1 = executor.startProcess(lazyval(10, 500)); AsyncResult<Integer> asyncResult1 = executor.startProcess(lazyval(10, 500));
AsyncResult<String> asyncResult2 = executor.startProcess(lazyval("test", 300)); AsyncResult<String> asyncResult2 = executor.startProcess(lazyval("test", 300));
AsyncResult<Long> asyncResult3 = executor.startProcess(lazyval(50L, 700)); AsyncResult<Long> asyncResult3 = executor.startProcess(lazyval(50L, 700));
AsyncResult<Integer> asyncResult4 = executor.startProcess(lazyval(20, 400), callback("Callback result 4")); AsyncResult<Integer> asyncResult4 =
AsyncResult<String> asyncResult5 = executor.startProcess(lazyval("callback", 600), callback("Callback result 5")); executor.startProcess(lazyval(20, 400), callback("Callback result 4"));
AsyncResult<String> asyncResult5 =
executor.startProcess(lazyval("callback", 600), callback("Callback result 5"));
// emulate processing in the current thread while async tasks are running in their own threads // emulate processing in the current thread while async tasks are running in their own threads
Thread.sleep(350); // Oh boy I'm working hard here Thread.sleep(350); // Oh boy I'm working hard here
log("Some hard work done"); log("Some hard work done");
// wait for completion of the tasks // wait for completion of the tasks
Integer result1 = executor.endProcess(asyncResult1); Integer result1 = executor.endProcess(asyncResult1);
String result2 = executor.endProcess(asyncResult2); String result2 = executor.endProcess(asyncResult2);
Long result3 = executor.endProcess(asyncResult3); Long result3 = executor.endProcess(asyncResult3);
asyncResult4.await(); asyncResult4.await();
asyncResult5.await(); asyncResult5.await();
// log the results of the tasks, callbacks log immediately when complete // log the results of the tasks, callbacks log immediately when complete
log("Result 1: " + result1); log("Result 1: " + result1);
log("Result 2: " + result2); log("Result 2: " + result2);
log("Result 3: " + result3); log("Result 3: " + result3);
} }
/** /**
* Creates a callable that lazily evaluates to given value with artificial delay. * Creates a callable that lazily evaluates to given value with artificial delay.
* *
* @param value value to evaluate * @param value value to evaluate
* @param delayMillis artificial delay in milliseconds * @param delayMillis artificial delay in milliseconds
* @return new callable for lazy evaluation * @return new callable for lazy evaluation
*/ */
private static <T> Callable<T> lazyval(T value, long delayMillis) { private static <T> Callable<T> lazyval(T value, long delayMillis) {
return () -> { return () -> {
Thread.sleep(delayMillis); Thread.sleep(delayMillis);
log("Task completed with: " + value); log("Task completed with: " + value);
return value; return value;
}; };
} }
/** /**
* Creates a simple callback that logs the complete status of the async result. * Creates a simple callback that logs the complete status of the async result.
* *
* @param name callback name * @param name callback name
* @return new async callback * @return new async callback
*/ */
private static <T> AsyncCallback<T> callback(String name) { private static <T> AsyncCallback<T> callback(String name) {
return (value, ex) -> { return (value, ex) -> {
if (ex.isPresent()) { if (ex.isPresent()) {
log(name + " failed: " + ex.map(Exception::getMessage).orElse("")); log(name + " failed: " + ex.map(Exception::getMessage).orElse(""));
} else { } else {
log(name + ": " + value); log(name + ": " + value);
} }
}; };
} }
private static void log(String msg) { private static void log(String msg) {
System.out.println(String.format("[%1$-10s] - %2$s", Thread.currentThread().getName(), msg)); System.out.println(String.format("[%1$-10s] - %2$s", Thread.currentThread().getName(), msg));
} }
} }

View File

@ -11,12 +11,11 @@ import java.util.Optional;
*/ */
public interface AsyncCallback<T> { public interface AsyncCallback<T> {
/** /**
* Complete handler which is executed when async task is completed or fails execution. * Complete handler which is executed when async task is completed or fails execution.
* *
* @param value the evaluated value from async task, undefined when execution fails * @param value the evaluated value from async task, undefined when execution fails
* @param ex empty value if execution succeeds, some exception if executions fails * @param ex empty value if execution succeeds, some exception if executions fails
*/ */
void onComplete(T value, Optional<Exception> ex); void onComplete(T value, Optional<Exception> ex);
} }

View File

@ -10,33 +10,32 @@ import java.util.concurrent.ExecutionException;
*/ */
public interface AsyncExecutor { public interface AsyncExecutor {
/** /**
* Starts processing of an async task. Returns immediately with async result. * Starts processing of an async task. Returns immediately with async result.
* *
* @param task task to be executed asynchronously * @param task task to be executed asynchronously
* @return async result for the task * @return async result for the task
*/ */
<T> AsyncResult<T> startProcess(Callable<T> task); <T> AsyncResult<T> startProcess(Callable<T> task);
/** /**
* Starts processing of an async task. Returns immediately with async result. Executes callback * Starts processing of an async task. Returns immediately with async result. Executes callback
* when the task is completed. * when the task is completed.
* *
* @param task task to be executed asynchronously * @param task task to be executed asynchronously
* @param callback callback to be executed on task completion * @param callback callback to be executed on task completion
* @return async result for the task * @return async result for the task
*/ */
<T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback); <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback);
/**
* Ends processing of an async task. Blocks the current thread if necessary and returns the
* evaluated value of the completed task.
*
* @param asyncResult async result of a task
* @return evaluated value of the completed task
* @throws ExecutionException if execution has failed, containing the root cause
* @throws InterruptedException if the execution is interrupted
*/
<T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException;
/**
* Ends processing of an async task. Blocks the current thread if necessary and returns the
* evaluated value of the completed task.
*
* @param asyncResult async result of a task
* @return evaluated value of the completed task
* @throws ExecutionException if execution has failed, containing the root cause
* @throws InterruptedException if the execution is interrupted
*/
<T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException;
} }

View File

@ -10,26 +10,26 @@ import java.util.concurrent.ExecutionException;
*/ */
public interface AsyncResult<T> { public interface AsyncResult<T> {
/** /**
* Status of the async task execution. * Status of the async task execution.
* *
* @return <code>true</code> if execution is completed or failed * @return <code>true</code> if execution is completed or failed
*/ */
boolean isCompleted(); boolean isCompleted();
/** /**
* Gets the value of completed async task. * Gets the value of completed async task.
* *
* @return evaluated value or throws ExecutionException if execution has failed * @return evaluated value or throws ExecutionException if execution has failed
* @throws ExecutionException if execution has failed, containing the root cause * @throws ExecutionException if execution has failed, containing the root cause
* @throws IllegalStateException if execution is not completed * @throws IllegalStateException if execution is not completed
*/ */
T getValue() throws ExecutionException; T getValue() throws ExecutionException;
/** /**
* Blocks the current thread until the async task is completed. * Blocks the current thread until the async task is completed.
* *
* @throws InterruptedException if the execution is interrupted * @throws InterruptedException if the execution is interrupted
*/ */
void await() throws InterruptedException; void await() throws InterruptedException;
} }

View File

@ -12,116 +12,117 @@ import java.util.concurrent.atomic.AtomicInteger;
*/ */
public class ThreadAsyncExecutor implements AsyncExecutor { public class ThreadAsyncExecutor implements AsyncExecutor {
/** Index for thread naming */ /** Index for thread naming */
private final AtomicInteger idx = new AtomicInteger(0); private final AtomicInteger idx = new AtomicInteger(0);
@Override @Override
public <T> AsyncResult<T> startProcess(Callable<T> task) { public <T> AsyncResult<T> startProcess(Callable<T> task) {
return startProcess(task, null); return startProcess(task, null);
} }
@Override @Override
public <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback) { public <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback) {
CompletableResult<T> result = new CompletableResult<>(callback); CompletableResult<T> result = new CompletableResult<>(callback);
new Thread(() -> { new Thread(() -> {
try { try {
result.setValue(task.call()); result.setValue(task.call());
} catch (Exception ex) { } catch (Exception ex) {
result.setException(ex); result.setException(ex);
} }
}, "executor-" + idx.incrementAndGet()).start(); }, "executor-" + idx.incrementAndGet()).start();
return result; return result;
} }
@Override @Override
public <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException { public <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException,
if (asyncResult.isCompleted()) { InterruptedException {
return asyncResult.getValue(); if (asyncResult.isCompleted()) {
} else { return asyncResult.getValue();
asyncResult.await(); } else {
return asyncResult.getValue(); asyncResult.await();
} return asyncResult.getValue();
} }
}
/** /**
* Simple implementation of async result that allows completing it successfully with a value * Simple implementation of async result that allows completing it successfully with a value or
* or exceptionally with an exception. A really simplified version from its real life cousins * exceptionally with an exception. A really simplified version from its real life cousins
* FutureTask and CompletableFuture. * FutureTask and CompletableFuture.
* *
* @see java.util.concurrent.FutureTask * @see java.util.concurrent.FutureTask
* @see java.util.concurrent.CompletableFuture * @see java.util.concurrent.CompletableFuture
*/ */
private static class CompletableResult<T> implements AsyncResult<T> { private static class CompletableResult<T> implements AsyncResult<T> {
static final int RUNNING = 1; static final int RUNNING = 1;
static final int FAILED = 2; static final int FAILED = 2;
static final int COMPLETED = 3; static final int COMPLETED = 3;
final Object lock; final Object lock;
final Optional<AsyncCallback<T>> callback; final Optional<AsyncCallback<T>> callback;
volatile int state = RUNNING; volatile int state = RUNNING;
T value; T value;
Exception exception; Exception exception;
CompletableResult(AsyncCallback<T> callback) { CompletableResult(AsyncCallback<T> callback) {
this.lock = new Object(); this.lock = new Object();
this.callback = Optional.ofNullable(callback); this.callback = Optional.ofNullable(callback);
} }
/** /**
* Sets the value from successful execution and executes callback if available. Notifies * Sets the value from successful execution and executes callback if available. Notifies any
* any thread waiting for completion. * thread waiting for completion.
* *
* @param value value of the evaluated task * @param value value of the evaluated task
*/ */
void setValue(T value) { void setValue(T value) {
this.value = value; this.value = value;
this.state = COMPLETED; this.state = COMPLETED;
this.callback.ifPresent(ac -> ac.onComplete(value, Optional.<Exception>empty())); this.callback.ifPresent(ac -> ac.onComplete(value, Optional.<Exception>empty()));
synchronized (lock) { synchronized (lock) {
lock.notifyAll(); lock.notifyAll();
} }
} }
/** /**
* Sets the exception from failed execution and executes callback if available. Notifies * Sets the exception from failed execution and executes callback if available. Notifies any
* any thread waiting for completion. * thread waiting for completion.
* *
* @param exception exception of the failed task * @param exception exception of the failed task
*/ */
void setException(Exception exception) { void setException(Exception exception) {
this.exception = exception; this.exception = exception;
this.state = FAILED; this.state = FAILED;
this.callback.ifPresent(ac -> ac.onComplete(null, Optional.of(exception))); this.callback.ifPresent(ac -> ac.onComplete(null, Optional.of(exception)));
synchronized (lock) { synchronized (lock) {
lock.notifyAll(); lock.notifyAll();
} }
} }
@Override @Override
public boolean isCompleted() { public boolean isCompleted() {
return (state > RUNNING); return (state > RUNNING);
} }
@Override @Override
public T getValue() throws ExecutionException { public T getValue() throws ExecutionException {
if (state == COMPLETED) { if (state == COMPLETED) {
return value; return value;
} else if (state == FAILED) { } else if (state == FAILED) {
throw new ExecutionException(exception); throw new ExecutionException(exception);
} else { } else {
throw new IllegalStateException("Execution not completed yet"); throw new IllegalStateException("Execution not completed yet");
} }
} }
@Override @Override
public void await() throws InterruptedException { public void await() throws InterruptedException {
synchronized (lock) { synchronized (lock) {
if (!isCompleted()) { if (!isCompleted()) {
lock.wait(); lock.wait();
} }
} }
} }
} }
} }

View File

@ -9,10 +9,9 @@ import org.junit.Test;
*/ */
public class AppTest { public class AppTest {
@Test @Test
public void test() throws Exception { public void test() throws Exception {
String[] args = {}; String[] args = {};
App.main(args); App.main(args);
} }
} }