diff --git a/pom.xml b/pom.xml index 6f3e0d698..45d574e73 100644 --- a/pom.xml +++ b/pom.xml @@ -128,6 +128,7 @@ hexagonal abstract-document aggregator-microservices + promise page-object diff --git a/promise/pom.xml b/promise/pom.xml index ca12515ee..f5727b951 100644 --- a/promise/pom.xml +++ b/promise/pom.xml @@ -43,5 +43,10 @@ mockito-core test + + com.iluwatar + async-method-invocation + 1.13.0-SNAPSHOT + diff --git a/promise/src/main/java/com/iluwatar/promise/App.java b/promise/src/main/java/com/iluwatar/promise/App.java index dc22c307a..5817e68da 100644 --- a/promise/src/main/java/com/iluwatar/promise/App.java +++ b/promise/src/main/java/com/iluwatar/promise/App.java @@ -1,24 +1,38 @@ package com.iluwatar.promise; +import com.iluwatar.async.method.invocation.ThreadAsyncExecutor; + +/** + * + * Application that uses promise pattern. + */ public class App { /** * Program entry point * @param args arguments + * @throws InterruptedException if main thread is interruped. */ - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { ThreadAsyncExecutor executor = new ThreadAsyncExecutor(); - executor.execute(() -> { + + Promise consumedPromise = new Promise<>(); + consumedPromise.fulfillInAsync(() -> { Thread.sleep(1000); return 10; - }).then(value -> {System.out.println("Consumed the value: " + value);}) - .then(nullVal -> {System.out.println("Post consuming value");}); + }, executor).then(value -> { + System.out.println("Consumed int value: " + value); + }); - - executor.execute(() -> { + Promise transformedPromise = new Promise<>(); + transformedPromise.fulfillInAsync(() -> { Thread.sleep(1000); return "10"; - }).then(value -> {return 10 + Integer.parseInt(value);}) - .then(intValue -> {System.out.println("Consumed int value: " + intValue);}); + }, executor).then(value -> { return Integer.parseInt(value); }).then(value -> { + System.out.println(value); + }); + + consumedPromise.await(); + transformedPromise.await(); } } diff --git a/promise/src/main/java/com/iluwatar/promise/ListenableAsyncResult.java b/promise/src/main/java/com/iluwatar/promise/ListenableAsyncResult.java deleted file mode 100644 index a68154e17..000000000 --- a/promise/src/main/java/com/iluwatar/promise/ListenableAsyncResult.java +++ /dev/null @@ -1,12 +0,0 @@ -package com.iluwatar.promise; - -import java.util.function.Consumer; -import java.util.function.Function; - -import com.iluwatar.async.method.invocation.AsyncResult; - -public interface ListenableAsyncResult extends AsyncResult { - ListenableAsyncResult then(Consumer action); - ListenableAsyncResult then(Function func); - ListenableAsyncResult error(Consumer action); -} diff --git a/promise/src/main/java/com/iluwatar/promise/Promise.java b/promise/src/main/java/com/iluwatar/promise/Promise.java new file mode 100644 index 000000000..0bc4accbb --- /dev/null +++ b/promise/src/main/java/com/iluwatar/promise/Promise.java @@ -0,0 +1,143 @@ +package com.iluwatar.promise; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; +import java.util.function.Function; + +import com.iluwatar.async.method.invocation.AsyncExecutor; +import com.iluwatar.async.method.invocation.internal.CompletableResult; + +/** + * Implements the promise pattern. + * @param type of result. + */ +public class Promise extends CompletableResult { + + private Runnable fulfillmentAction; + + /** + * Creates a promise that will be fulfilled in future. + */ + public Promise() { + super(null); + } + + /** + * Fulfills the promise with the provided value. + * @param value the fulfilled value that can be accessed using {@link #getValue()}. + */ + @Override + public void setValue(T value) { + super.setValue(value); + postComplete(); + } + + /** + * Fulfills the promise with exception due to error in execution. + * @param exception the exception will be wrapped in {@link ExecutionException} + * when accessing the value using {@link #getValue()}. + */ + @Override + public void setException(Exception exception) { + super.setException(exception); + postComplete(); + } + + void postComplete() { + if (fulfillmentAction == null) { + return; + } + fulfillmentAction.run(); + } + + /** + * Executes the task using the executor in other thread and fulfills the promise returned + * once the task completes either successfully or with an exception. + * + * @param task the task that will provide the value to fulfill the promise. + * @param executor the executor in which the task should be run. + * @return a promise that represents the result of running the task provided. + */ + public Promise fulfillInAsync(final Callable task, AsyncExecutor executor) { + executor.startProcess(new Callable() { + + @Override + public Void call() throws Exception { + setValue(task.call()); + return null; + } + }); + return this; + } + + /** + * Returns a new promise that, when this promise is fulfilled normally, is fulfilled with + * result of this promise as argument to the action provided. + * @param action action to be executed. + * @return a new promise. + */ + public Promise then(Consumer action) { + Promise dest = new Promise<>(); + fulfillmentAction = new ConsumeAction(this, dest, action); + return dest; + } + + /** + * Returns a new promise that, when this promise is fulfilled normally, is fulfilled with + * result of this promise as argument to the function provided. + * @param func function to be executed. + * @return a new promise. + */ + public Promise then(Function func) { + Promise dest = new Promise<>(); + fulfillmentAction = new FunctionAction(this, dest, func); + return dest; + } + + private class ConsumeAction implements Runnable { + + private Promise current; + private Promise dest; + private Consumer action; + + public ConsumeAction(Promise current, Promise dest, Consumer action) { + this.current = current; + this.dest = dest; + this.action = action; + } + + @Override + public void run() { + try { + action.accept(current.getValue()); + dest.setValue(null); + } catch (Throwable e) { + dest.setException((Exception) e.getCause()); + } + } + } + + private class FunctionAction implements Runnable { + + private Promise current; + private Promise dest; + private Function func; + + public FunctionAction(Promise current, Promise dest, Function func) { + this.current = current; + this.dest = dest; + this.func = func; + } + + @Override + public void run() { + try { + V result = func.apply(current.getValue()); + dest.setValue(result); + } catch (Throwable e) { + dest.setException((Exception) e.getCause()); + } + } + } +} diff --git a/promise/src/main/java/com/iluwatar/promise/PromiseAsyncExecutor.java b/promise/src/main/java/com/iluwatar/promise/PromiseAsyncExecutor.java deleted file mode 100644 index eb43b0546..000000000 --- a/promise/src/main/java/com/iluwatar/promise/PromiseAsyncExecutor.java +++ /dev/null @@ -1,7 +0,0 @@ -package com.iluwatar.promise; - -import java.util.concurrent.Callable; - -public interface PromiseAsyncExecutor { - ListenableAsyncResult execute(Callable task); -} diff --git a/promise/src/main/java/com/iluwatar/promise/ThreadAsyncExecutor.java b/promise/src/main/java/com/iluwatar/promise/ThreadAsyncExecutor.java deleted file mode 100644 index aa057d676..000000000 --- a/promise/src/main/java/com/iluwatar/promise/ThreadAsyncExecutor.java +++ /dev/null @@ -1,173 +0,0 @@ -package com.iluwatar.promise; - -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; -import java.util.function.Function; - -public class ThreadAsyncExecutor implements PromiseAsyncExecutor { - - /** Index for thread naming */ - private final AtomicInteger idx = new AtomicInteger(0); - - @Override - public ListenableAsyncResult execute(Callable task) { - Promise promise = new Promise<>(); - new Thread(() -> { - try { - promise.setValue(task.call()); - promise.postComplete(); - } catch (Exception ex) { - promise.setException(ex); - } - } , "executor-" + idx.incrementAndGet()).start(); - return promise; - } - - // TODO there is scope of extending the completable future from async method invocation project. Do that. - private class Promise implements ListenableAsyncResult { - - static final int RUNNING = 1; - static final int FAILED = 2; - static final int COMPLETED = 3; - - final Object lock; - volatile int state = RUNNING; - T value; - Exception exception; - Runnable fulfilmentAction; - - public Promise() { - this.lock = new Object(); - } - - void postComplete() { - fulfilmentAction.run(); - } - - /** - * Sets the value from successful execution and executes callback if available. Notifies any thread waiting for - * completion. - * - * @param value - * value of the evaluated task - */ - public void setValue(T value) { - this.value = value; - this.state = COMPLETED; - synchronized (lock) { - lock.notifyAll(); - } - } - - /** - * Sets the exception from failed execution and executes callback if available. Notifies any thread waiting for - * completion. - * - * @param exception - * exception of the failed task - */ - public void setException(Exception exception) { - this.exception = exception; - this.state = FAILED; - synchronized (lock) { - lock.notifyAll(); - } - } - - @Override - public boolean isCompleted() { - return state > RUNNING; - } - - @Override - public T getValue() throws ExecutionException { - if (state == COMPLETED) { - return value; - } else if (state == FAILED) { - throw new ExecutionException(exception); - } else { - throw new IllegalStateException("Execution not completed yet"); - } - } - - @Override - public void await() throws InterruptedException { - synchronized (lock) { - if (!isCompleted()) { - lock.wait(); - } - } - } - - @Override - public ListenableAsyncResult then(Consumer action) { - Promise dest = new Promise<>(); - fulfilmentAction = new ConsumeAction(this, dest, action); - return dest; - } - - @Override - public ListenableAsyncResult then(Function func) { - Promise dest = new Promise<>(); - fulfilmentAction = new FunctionAction(this, dest, func); - return dest; - } - - @Override - public ListenableAsyncResult error(Consumer action) { - return null; - } - - private class ConsumeAction implements Runnable { - - private Promise current; - private Promise dest; - private Consumer action; - - public ConsumeAction(Promise current, Promise dest, Consumer action) { - this.current = current; - this.dest = dest; - this.action = action; - } - - @Override - public void run() { - try { - action.accept(current.getValue()); - dest.setValue(null); - } catch (ExecutionException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - dest.postComplete(); - } - } - - private class FunctionAction implements Runnable { - - private Promise current; - private Promise dest; - private Function func; - - public FunctionAction(Promise current, Promise dest, Function func) { - this.current = current; - this.dest = dest; - this.func = func; - } - - @Override - public void run() { - try { - V result = func.apply(current.getValue()); - dest.setValue(result); - } catch (ExecutionException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - dest.postComplete(); - } - } - } - } \ No newline at end of file diff --git a/promise/src/test/java/com/iluwatar/promise/AppTest.java b/promise/src/test/java/com/iluwatar/promise/AppTest.java new file mode 100644 index 000000000..b59187cb1 --- /dev/null +++ b/promise/src/test/java/com/iluwatar/promise/AppTest.java @@ -0,0 +1,15 @@ +package com.iluwatar.promise; + +import org.junit.Test; + +/** + * + * Application test. + */ +public class AppTest { + + @Test + public void testApp() throws InterruptedException { + App.main(null); + } +} diff --git a/promise/src/test/java/com/iluwatar/promise/PromiseTest.java b/promise/src/test/java/com/iluwatar/promise/PromiseTest.java new file mode 100644 index 000000000..9c28be1b3 --- /dev/null +++ b/promise/src/test/java/com/iluwatar/promise/PromiseTest.java @@ -0,0 +1,128 @@ +package com.iluwatar.promise; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; +import java.util.function.Function; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import com.iluwatar.async.method.invocation.ThreadAsyncExecutor; + +/** + * Tests Promise class. + */ +public class PromiseTest { + + private ThreadAsyncExecutor executor; + private Promise promise; + @Rule public ExpectedException exception = ExpectedException.none(); + + @Before + public void setUp() { + executor = new ThreadAsyncExecutor(); + promise = new Promise<>(); + } + + @Test + public void promiseIsFulfilledWithTheResultantValueOfExecutingTheTask() + throws InterruptedException, ExecutionException { + promise.fulfillInAsync(new NumberCrunchingTask(), executor); + + // await fulfillment + promise.await(); + + assertEquals(NumberCrunchingTask.CRUNCHED_NUMBER, promise.getValue()); + } + + @Test + public void dependentPromiseIsFulfilledAfterTheConsumerConsumesTheResultOfThisPromise() + throws InterruptedException, ExecutionException { + Promise dependentPromise = promise + .fulfillInAsync(new NumberCrunchingTask(), executor) + .then(value -> { + assertEquals(NumberCrunchingTask.CRUNCHED_NUMBER, value); + }); + + + // await fulfillment + dependentPromise.await(); + } + + @Test + public void dependentPromiseIsFulfilledWithAnExceptionIfConsumerThrowsAnException() + throws InterruptedException, ExecutionException { + Promise dependentPromise = promise + .fulfillInAsync(new NumberCrunchingTask(), executor) + .then(new Consumer() { + + @Override + public void accept(Integer t) { + throw new RuntimeException("Barf!"); + } + }); + + + // await fulfillment + dependentPromise.await(); + + exception.expect(ExecutionException.class); + + dependentPromise.getValue(); + } + + @Test + public void dependentPromiseIsFulfilledAfterTheFunctionTransformsTheResultOfThisPromise() + throws InterruptedException, ExecutionException { + Promise dependentPromise = promise + .fulfillInAsync(new NumberCrunchingTask(), executor) + .then(value -> { + assertEquals(NumberCrunchingTask.CRUNCHED_NUMBER, value); + return String.valueOf(value); + }); + + + // await fulfillment + dependentPromise.await(); + + assertEquals(String.valueOf(NumberCrunchingTask.CRUNCHED_NUMBER), dependentPromise.getValue()); + } + + @Test + public void dependentPromiseIsFulfilledWithAnExceptionIfTheFunctionThrowsException() + throws InterruptedException, ExecutionException { + Promise dependentPromise = promise + .fulfillInAsync(new NumberCrunchingTask(), executor) + .then(new Function() { + + @Override + public String apply(Integer t) { + throw new RuntimeException("Barf!"); + } + }); + + // await fulfillment + dependentPromise.await(); + + exception.expect(ExecutionException.class); + + dependentPromise.getValue(); + } + + private static class NumberCrunchingTask implements Callable { + + private static final Integer CRUNCHED_NUMBER = Integer.MAX_VALUE; + + @Override + public Integer call() throws Exception { + // Do number crunching + Thread.sleep(1000); + return CRUNCHED_NUMBER; + } + } +}