Merge branch 'trautonen-async-method-invocation'
This commit is contained in:
commit
a8b7dd5a53
13
README.md
13
README.md
@ -71,6 +71,7 @@ Concurrency patterns are those types of design patterns that deal with the multi
|
||||
|
||||
* [Double Checked Locking](#double-checked-locking)
|
||||
* [Thread Pool](#thread-pool)
|
||||
* [Async Method Invocation](#async-method-invocation)
|
||||
|
||||
### Presentation Tier Patterns
|
||||
|
||||
@ -623,6 +624,18 @@ validation and for building to order
|
||||
**Applicability:** Use the Thread Pool pattern when
|
||||
* You have a large number of short-lived tasks to be executed in parallel
|
||||
|
||||
## <a name="async-method-invocation">Async Method Invocation</a> [↑](#list-of-design-patterns)
|
||||
**Intent:** Asynchronous method invocation is pattern where the calling thread is not blocked while waiting results of tasks. The pattern provides parallel processing of multiple independent tasks and retrieving the results via callbacks or waiting until everything is done.
|
||||
|
||||
**Applicability:** Use async method invocation pattern when
|
||||
* You have multiple independent tasks that can run in parallel
|
||||
* You need to improve performance of running a group of sequential tasks
|
||||
* You have limited number of processing capacity or long running tasks and the caller cannot wait the tasks to be ready
|
||||
|
||||
**Real world examples:**
|
||||
* [FutureTask](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/FutureTask.html), [CompletableFuture](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html) and [ExecutorService](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html) (Java)
|
||||
* [Task-based Asynchronous Pattern](https://msdn.microsoft.com/en-us/library/hh873175.aspx) (.NET)
|
||||
|
||||
## <a name="private-class-data">Private Class Data</a> [↑](#list-of-design-patterns)
|
||||
**Intent:** Private Class Data design pattern seeks to reduce exposure of attributes by limiting their visibility. It reduces the number of class attributes by encapsulating them in single Data object.
|
||||
|
||||
|
18
async-method-invocation/pom.xml
Normal file
18
async-method-invocation/pom.xml
Normal file
@ -0,0 +1,18 @@
|
||||
<?xml version="1.0"?>
|
||||
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>com.iluwatar</groupId>
|
||||
<artifactId>java-design-patterns</artifactId>
|
||||
<version>1.4.0</version>
|
||||
</parent>
|
||||
<artifactId>async-method-invocation</artifactId>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
@ -0,0 +1,102 @@
|
||||
package com.iluwatar.async.method.invocation;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* This application demonstrates the async method invocation pattern. Key parts of the pattern are
|
||||
* <code>AsyncResult</code> which is an intermediate container for an asynchronously evaluated value,
|
||||
* <code>AsyncCallback</code> which can be provided to be executed on task completion and
|
||||
* <code>AsyncExecutor</code> that manages the execution of the async tasks.
|
||||
* </p>
|
||||
* <p>
|
||||
* The main method shows example flow of async invocations. The main thread starts multiple tasks with
|
||||
* variable durations and then continues its own work. When the main thread has done it's job it collects
|
||||
* the results of the async tasks. Two of the tasks are handled with callbacks, meaning the callbacks are
|
||||
* executed immediately when the tasks complete.
|
||||
* </p>
|
||||
* <p>
|
||||
* Noteworthy difference of thread usage between the async results and callbacks is that the async results
|
||||
* are collected in the main thread but the callbacks are executed within the worker threads. This should be
|
||||
* noted when working with thread pools.
|
||||
* </p>
|
||||
* <p>
|
||||
* Java provides its own implementations of async method invocation pattern. FutureTask, CompletableFuture
|
||||
* and ExecutorService are the real world implementations of this pattern. But due to the nature of parallel
|
||||
* programming, the implementations are not trivial. This example does not take all possible scenarios into
|
||||
* account but rather provides a simple version that helps to understand the pattern.
|
||||
* </p>
|
||||
*
|
||||
* @see AsyncResult
|
||||
* @see AsyncCallback
|
||||
* @see AsyncExecutor
|
||||
*
|
||||
* @see java.util.concurrent.FutureTask
|
||||
* @see java.util.concurrent.CompletableFuture
|
||||
* @see java.util.concurrent.ExecutorService
|
||||
*/
|
||||
public class App {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
// construct a new executor that will run async tasks
|
||||
AsyncExecutor executor = new ThreadAsyncExecutor();
|
||||
|
||||
// start few async tasks with varying processing times, two last with callback handlers
|
||||
AsyncResult<Integer> asyncResult1 = executor.startProcess(lazyval(10, 500));
|
||||
AsyncResult<String> asyncResult2 = executor.startProcess(lazyval("test", 300));
|
||||
AsyncResult<Long> asyncResult3 = executor.startProcess(lazyval(50L, 700));
|
||||
AsyncResult<Integer> asyncResult4 = executor.startProcess(lazyval(20, 400), callback("Callback result 4"));
|
||||
AsyncResult<String> asyncResult5 = executor.startProcess(lazyval("callback", 600), callback("Callback result 5"));
|
||||
|
||||
// emulate processing in the current thread while async tasks are running in their own threads
|
||||
Thread.sleep(350); // Oh boy I'm working hard here
|
||||
log("Some hard work done");
|
||||
|
||||
// wait for completion of the tasks
|
||||
Integer result1 = executor.endProcess(asyncResult1);
|
||||
String result2 = executor.endProcess(asyncResult2);
|
||||
Long result3 = executor.endProcess(asyncResult3);
|
||||
asyncResult4.await();
|
||||
asyncResult5.await();
|
||||
|
||||
// log the results of the tasks, callbacks log immediately when complete
|
||||
log("Result 1: " + result1);
|
||||
log("Result 2: " + result2);
|
||||
log("Result 3: " + result3);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a callable that lazily evaluates to given value with artificial delay.
|
||||
*
|
||||
* @param value value to evaluate
|
||||
* @param delayMillis artificial delay in milliseconds
|
||||
* @return new callable for lazy evaluation
|
||||
*/
|
||||
private static <T> Callable<T> lazyval(T value, long delayMillis) {
|
||||
return () -> {
|
||||
Thread.sleep(delayMillis);
|
||||
log("Task completed with: " + value);
|
||||
return value;
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a simple callback that logs the complete status of the async result.
|
||||
*
|
||||
* @param name callback name
|
||||
* @return new async callback
|
||||
*/
|
||||
private static <T> AsyncCallback<T> callback(String name) {
|
||||
return (value, ex) -> {
|
||||
if (ex.isPresent()) {
|
||||
log(name + " failed: " + ex.map(Exception::getMessage).orElse(""));
|
||||
} else {
|
||||
log(name + ": " + value);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static void log(String msg) {
|
||||
System.out.println(String.format("[%1$-10s] - %2$s", Thread.currentThread().getName(), msg));
|
||||
}
|
||||
}
|
@ -0,0 +1,15 @@
|
||||
package com.iluwatar.async.method.invocation;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public interface AsyncCallback<T> {
|
||||
|
||||
/**
|
||||
* Complete handler which is executed when async task is completed or fails execution.
|
||||
*
|
||||
* @param value the evaluated value from async task, undefined when execution fails
|
||||
* @param ex empty value if execution succeeds, some exception if executions fails
|
||||
*/
|
||||
void onComplete(T value, Optional<Exception> ex);
|
||||
|
||||
}
|
@ -0,0 +1,37 @@
|
||||
package com.iluwatar.async.method.invocation;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
public interface AsyncExecutor {
|
||||
|
||||
/**
|
||||
* Starts processing of an async task. Returns immediately with async result.
|
||||
*
|
||||
* @param task task to be executed asynchronously
|
||||
* @return async result for the task
|
||||
*/
|
||||
<T> AsyncResult<T> startProcess(Callable<T> task);
|
||||
|
||||
/**
|
||||
* Starts processing of an async task. Returns immediately with async result. Executes callback
|
||||
* when the task is completed.
|
||||
*
|
||||
* @param task task to be executed asynchronously
|
||||
* @param callback callback to be executed on task completion
|
||||
* @return async result for the task
|
||||
*/
|
||||
<T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback);
|
||||
|
||||
/**
|
||||
* Ends processing of an async task. Blocks the current thread if necessary and returns the
|
||||
* evaluated value of the completed task.
|
||||
*
|
||||
* @param asyncResult async result of a task
|
||||
* @return evaluated value of the completed task
|
||||
* @throws ExecutionException if execution has failed, containing the root cause
|
||||
* @throws InterruptedException if the execution is interrupted
|
||||
*/
|
||||
<T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException;
|
||||
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
package com.iluwatar.async.method.invocation;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
public interface AsyncResult<T> {
|
||||
|
||||
/**
|
||||
* Status of the async task execution.
|
||||
*
|
||||
* @return <code>true</code> if execution is completed or failed
|
||||
*/
|
||||
boolean isCompleted();
|
||||
|
||||
/**
|
||||
* Gets the value of completed async task.
|
||||
*
|
||||
* @return evaluated value or throws ExecutionException if execution has failed
|
||||
* @throws ExecutionException if execution has failed, containing the root cause
|
||||
* @throws IllegalStateException if execution is not completed
|
||||
*/
|
||||
T getValue() throws ExecutionException;
|
||||
|
||||
/**
|
||||
* Blocks the current thread until the async task is completed.
|
||||
*
|
||||
* @throws InterruptedException if the execution is interrupted
|
||||
*/
|
||||
void await() throws InterruptedException;
|
||||
}
|
@ -0,0 +1,125 @@
|
||||
package com.iluwatar.async.method.invocation;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Implementation of async executor that creates a new thread for every task.
|
||||
*/
|
||||
public class ThreadAsyncExecutor implements AsyncExecutor {
|
||||
|
||||
/** Index for thread naming */
|
||||
private final AtomicInteger idx = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public <T> AsyncResult<T> startProcess(Callable<T> task) {
|
||||
return startProcess(task, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback) {
|
||||
CompletableResult<T> result = new CompletableResult<>(callback);
|
||||
new Thread(() -> {
|
||||
try {
|
||||
result.setValue(task.call());
|
||||
} catch (Exception ex) {
|
||||
result.setException(ex);
|
||||
}
|
||||
}, "executor-" + idx.incrementAndGet()).start();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException {
|
||||
if (asyncResult.isCompleted()) {
|
||||
return asyncResult.getValue();
|
||||
} else {
|
||||
asyncResult.await();
|
||||
return asyncResult.getValue();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple implementation of async result that allows completing it successfully with a value
|
||||
* or exceptionally with an exception. A really simplified version from its real life cousins
|
||||
* FutureTask and CompletableFuture.
|
||||
*
|
||||
* @see java.util.concurrent.FutureTask
|
||||
* @see java.util.concurrent.CompletableFuture
|
||||
*/
|
||||
private static class CompletableResult<T> implements AsyncResult<T> {
|
||||
|
||||
static final int RUNNING = 1;
|
||||
static final int FAILED = 2;
|
||||
static final int COMPLETED = 3;
|
||||
|
||||
final Object lock;
|
||||
final Optional<AsyncCallback<T>> callback;
|
||||
|
||||
volatile int state = RUNNING;
|
||||
T value;
|
||||
Exception exception;
|
||||
|
||||
CompletableResult(AsyncCallback<T> callback) {
|
||||
this.lock = new Object();
|
||||
this.callback = Optional.ofNullable(callback);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the value from successful execution and executes callback if available. Notifies
|
||||
* any thread waiting for completion.
|
||||
*
|
||||
* @param value value of the evaluated task
|
||||
*/
|
||||
void setValue(T value) {
|
||||
this.value = value;
|
||||
this.state = COMPLETED;
|
||||
this.callback.ifPresent(ac -> ac.onComplete(value, Optional.<Exception>empty()));
|
||||
synchronized (lock) {
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the exception from failed execution and executes callback if available. Notifies
|
||||
* any thread waiting for completion.
|
||||
*
|
||||
* @param exception exception of the failed task
|
||||
*/
|
||||
void setException(Exception exception) {
|
||||
this.exception = exception;
|
||||
this.state = FAILED;
|
||||
this.callback.ifPresent(ac -> ac.onComplete(null, Optional.of(exception)));
|
||||
synchronized (lock) {
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCompleted() {
|
||||
return (state > RUNNING);
|
||||
}
|
||||
|
||||
@Override
|
||||
public T getValue() throws ExecutionException {
|
||||
if (state == COMPLETED) {
|
||||
return value;
|
||||
} else if (state == FAILED) {
|
||||
throw new ExecutionException(exception);
|
||||
} else {
|
||||
throw new IllegalStateException("Execution not completed yet");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void await() throws InterruptedException {
|
||||
synchronized (lock) {
|
||||
if (!isCompleted()) {
|
||||
lock.wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,13 @@
|
||||
package com.iluwatar.async.method.invocation;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class AppTest {
|
||||
|
||||
@Test
|
||||
public void test() throws Exception {
|
||||
String[] args = {};
|
||||
App.main(args);
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user