diff --git a/promise/pom.xml b/promise/pom.xml index f5727b951..ca12515ee 100644 --- a/promise/pom.xml +++ b/promise/pom.xml @@ -43,10 +43,5 @@ mockito-core test - - com.iluwatar - async-method-invocation - 1.13.0-SNAPSHOT - diff --git a/promise/src/main/java/com/iluwatar/promise/App.java b/promise/src/main/java/com/iluwatar/promise/App.java index 5817e68da..f9e089f3d 100644 --- a/promise/src/main/java/com/iluwatar/promise/App.java +++ b/promise/src/main/java/com/iluwatar/promise/App.java @@ -1,6 +1,9 @@ package com.iluwatar.promise; -import com.iluwatar.async.method.invocation.ThreadAsyncExecutor; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * @@ -12,10 +15,19 @@ public class App { * Program entry point * @param args arguments * @throws InterruptedException if main thread is interruped. + * @throws ExecutionException */ - public static void main(String[] args) throws InterruptedException { - ThreadAsyncExecutor executor = new ThreadAsyncExecutor(); - + public static void main(String[] args) throws InterruptedException, ExecutionException { + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + promiseUsage(executor); + } finally { + executor.shutdownNow(); + } + } + + private static void promiseUsage(Executor executor) + throws InterruptedException, ExecutionException { Promise consumedPromise = new Promise<>(); consumedPromise.fulfillInAsync(() -> { Thread.sleep(1000); @@ -29,10 +41,10 @@ public class App { Thread.sleep(1000); return "10"; }, executor).then(value -> { return Integer.parseInt(value); }).then(value -> { - System.out.println(value); + System.out.println("Consumed transformed int value: " + value); }); - consumedPromise.await(); - transformedPromise.await(); + consumedPromise.get(); + transformedPromise.get(); } } diff --git a/promise/src/main/java/com/iluwatar/promise/Promise.java b/promise/src/main/java/com/iluwatar/promise/Promise.java index 0bc4accbb..991c2a05c 100644 --- a/promise/src/main/java/com/iluwatar/promise/Promise.java +++ b/promise/src/main/java/com/iluwatar/promise/Promise.java @@ -2,17 +2,18 @@ package com.iluwatar.promise; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.Function; -import com.iluwatar.async.method.invocation.AsyncExecutor; -import com.iluwatar.async.method.invocation.internal.CompletableResult; - /** * Implements the promise pattern. * @param type of result. */ -public class Promise extends CompletableResult { +public class Promise extends PromiseSupport { private Runnable fulfillmentAction; @@ -20,31 +21,30 @@ public class Promise extends CompletableResult { * Creates a promise that will be fulfilled in future. */ public Promise() { - super(null); } /** * Fulfills the promise with the provided value. - * @param value the fulfilled value that can be accessed using {@link #getValue()}. + * @param value the fulfilled value that can be accessed using {@link #get()}. */ @Override - public void setValue(T value) { - super.setValue(value); - postComplete(); + public void fulfill(T value) { + super.fulfill(value); + postFulfillment(); } /** * Fulfills the promise with exception due to error in execution. * @param exception the exception will be wrapped in {@link ExecutionException} - * when accessing the value using {@link #getValue()}. + * when accessing the value using {@link #get()}. */ @Override - public void setException(Exception exception) { - super.setException(exception); - postComplete(); + public void fulfillExceptionally(Exception exception) { + super.fulfillExceptionally(exception); + postFulfillment(); } - void postComplete() { + void postFulfillment() { if (fulfillmentAction == null) { return; } @@ -59,13 +59,12 @@ public class Promise extends CompletableResult { * @param executor the executor in which the task should be run. * @return a promise that represents the result of running the task provided. */ - public Promise fulfillInAsync(final Callable task, AsyncExecutor executor) { - executor.startProcess(new Callable() { - - @Override - public Void call() throws Exception { - setValue(task.call()); - return null; + public Promise fulfillInAsync(final Callable task, Executor executor) { + executor.execute(() -> { + try { + fulfill(task.call()); + } catch (Exception e) { + fulfillExceptionally(e); } }); return this; @@ -91,18 +90,22 @@ public class Promise extends CompletableResult { */ public Promise then(Function func) { Promise dest = new Promise<>(); - fulfillmentAction = new FunctionAction(this, dest, func); + fulfillmentAction = new TransformAction(this, dest, func); return dest; } + /** + * A consume action provides the action, the value from source promise and fulfills the + * destination promise. + */ private class ConsumeAction implements Runnable { - private Promise current; + private Promise src; private Promise dest; private Consumer action; - public ConsumeAction(Promise current, Promise dest, Consumer action) { - this.current = current; + ConsumeAction(Promise src, Promise dest, Consumer action) { + this.src = src; this.dest = dest; this.action = action; } @@ -110,22 +113,26 @@ public class Promise extends CompletableResult { @Override public void run() { try { - action.accept(current.getValue()); - dest.setValue(null); + action.accept(src.get()); + dest.fulfill(null); } catch (Throwable e) { - dest.setException((Exception) e.getCause()); + dest.fulfillExceptionally((Exception) e.getCause()); } } } - private class FunctionAction implements Runnable { + /** + * A function action provides transformation function, value from source promise and fulfills the + * destination promise with the transformed value. + */ + private class TransformAction implements Runnable { - private Promise current; + private Promise src; private Promise dest; private Function func; - public FunctionAction(Promise current, Promise dest, Function func) { - this.current = current; + TransformAction(Promise src, Promise dest, Function func) { + this.src = src; this.dest = dest; this.func = func; } @@ -133,11 +140,103 @@ public class Promise extends CompletableResult { @Override public void run() { try { - V result = func.apply(current.getValue()); - dest.setValue(result); + V result = func.apply(src.get()); + dest.fulfill(result); } catch (Throwable e) { - dest.setException((Exception) e.getCause()); + dest.fulfillExceptionally((Exception) e.getCause()); } } } } + + +/** + * A really simplified implementation of future that allows completing it successfully with a value + * or exceptionally with an exception. + */ +class PromiseSupport implements Future { + + static final int RUNNING = 1; + static final int FAILED = 2; + static final int COMPLETED = 3; + + final Object lock; + + volatile int state = RUNNING; + T value; + Exception exception; + + PromiseSupport() { + this.lock = new Object(); + } + + void fulfill(T value) { + this.value = value; + this.state = COMPLETED; + synchronized (lock) { + lock.notifyAll(); + } + } + + void fulfillExceptionally(Exception exception) { + this.exception = exception; + this.state = FAILED; + synchronized (lock) { + lock.notifyAll(); + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return state > RUNNING; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + if (state == COMPLETED) { + return value; + } else if (state == FAILED) { + throw new ExecutionException(exception); + } else { + synchronized (lock) { + lock.wait(); + if (state == COMPLETED) { + return value; + } else { + throw new ExecutionException(exception); + } + } + } + } + + @Override + public T get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + if (state == COMPLETED) { + return value; + } else if (state == FAILED) { + throw new ExecutionException(exception); + } else { + synchronized (lock) { + lock.wait(unit.toMillis(timeout)); + if (state == COMPLETED) { + return value; + } else if (state == FAILED) { + throw new ExecutionException(exception); + } else { + throw new TimeoutException(); + } + } + } + } +} \ No newline at end of file diff --git a/promise/src/test/java/com/iluwatar/promise/AppTest.java b/promise/src/test/java/com/iluwatar/promise/AppTest.java index b59187cb1..b2628127c 100644 --- a/promise/src/test/java/com/iluwatar/promise/AppTest.java +++ b/promise/src/test/java/com/iluwatar/promise/AppTest.java @@ -1,5 +1,7 @@ package com.iluwatar.promise; +import java.util.concurrent.ExecutionException; + import org.junit.Test; /** @@ -9,7 +11,7 @@ import org.junit.Test; public class AppTest { @Test - public void testApp() throws InterruptedException { + public void testApp() throws InterruptedException, ExecutionException { App.main(null); } } diff --git a/promise/src/test/java/com/iluwatar/promise/PromiseTest.java b/promise/src/test/java/com/iluwatar/promise/PromiseTest.java index 9c28be1b3..c64b82d06 100644 --- a/promise/src/test/java/com/iluwatar/promise/PromiseTest.java +++ b/promise/src/test/java/com/iluwatar/promise/PromiseTest.java @@ -1,9 +1,16 @@ package com.iluwatar.promise; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.Function; @@ -12,20 +19,18 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import com.iluwatar.async.method.invocation.ThreadAsyncExecutor; - /** * Tests Promise class. */ public class PromiseTest { - private ThreadAsyncExecutor executor; + private Executor executor; private Promise promise; @Rule public ExpectedException exception = ExpectedException.none(); @Before public void setUp() { - executor = new ThreadAsyncExecutor(); + executor = Executors.newSingleThreadExecutor(); promise = new Promise<>(); } @@ -34,10 +39,70 @@ public class PromiseTest { throws InterruptedException, ExecutionException { promise.fulfillInAsync(new NumberCrunchingTask(), executor); - // await fulfillment - promise.await(); + assertEquals(NumberCrunchingTask.CRUNCHED_NUMBER, promise.get()); + assertTrue(promise.isDone()); + assertFalse(promise.isCancelled()); + } + + @Test + public void promiseIsFulfilledWithAnExceptionIfTaskThrowsAnException() + throws InterruptedException, ExecutionException, TimeoutException { + testWaitingForeverForPromiseToBeFulfilled(); + testWaitingSomeTimeForPromiseToBeFulfilled(); + } - assertEquals(NumberCrunchingTask.CRUNCHED_NUMBER, promise.getValue()); + private void testWaitingForeverForPromiseToBeFulfilled() throws InterruptedException, TimeoutException { + Promise promise = new Promise<>(); + promise.fulfillInAsync(new Callable() { + + @Override + public Integer call() throws Exception { + throw new RuntimeException("Barf!"); + }}, executor); + + try { + promise.get(); + fail("Fetching promise should result in exception if the task threw an exception"); + } catch (ExecutionException ex) { + assertTrue(promise.isDone()); + assertFalse(promise.isCancelled()); + } + + try { + promise.get(1000, TimeUnit.SECONDS); + fail("Fetching promise should result in exception if the task threw an exception"); + } catch (ExecutionException ex) { + assertTrue(promise.isDone()); + assertFalse(promise.isCancelled()); + } + } + + private void testWaitingSomeTimeForPromiseToBeFulfilled() + throws InterruptedException, TimeoutException { + Promise promise = new Promise<>(); + promise.fulfillInAsync(new Callable() { + + @Override + public Integer call() throws Exception { + throw new RuntimeException("Barf!"); + }}, executor); + + try { + promise.get(1000, TimeUnit.SECONDS); + fail("Fetching promise should result in exception if the task threw an exception"); + } catch (ExecutionException ex) { + assertTrue(promise.isDone()); + assertFalse(promise.isCancelled()); + } + + try { + promise.get(); + fail("Fetching promise should result in exception if the task threw an exception"); + } catch (ExecutionException ex) { + assertTrue(promise.isDone()); + assertFalse(promise.isCancelled()); + } + } @Test @@ -50,13 +115,14 @@ public class PromiseTest { }); - // await fulfillment - dependentPromise.await(); + dependentPromise.get(); + assertTrue(dependentPromise.isDone()); + assertFalse(dependentPromise.isCancelled()); } @Test public void dependentPromiseIsFulfilledWithAnExceptionIfConsumerThrowsAnException() - throws InterruptedException, ExecutionException { + throws InterruptedException, ExecutionException, TimeoutException { Promise dependentPromise = promise .fulfillInAsync(new NumberCrunchingTask(), executor) .then(new Consumer() { @@ -67,13 +133,21 @@ public class PromiseTest { } }); - - // await fulfillment - dependentPromise.await(); - - exception.expect(ExecutionException.class); - - dependentPromise.getValue(); + try { + dependentPromise.get(); + fail("Fetching dependent promise should result in exception if the action threw an exception"); + } catch (ExecutionException ex) { + assertTrue(promise.isDone()); + assertFalse(promise.isCancelled()); + } + + try { + dependentPromise.get(1000, TimeUnit.SECONDS); + fail("Fetching dependent promise should result in exception if the action threw an exception"); + } catch (ExecutionException ex) { + assertTrue(promise.isDone()); + assertFalse(promise.isCancelled()); + } } @Test @@ -87,15 +161,14 @@ public class PromiseTest { }); - // await fulfillment - dependentPromise.await(); - - assertEquals(String.valueOf(NumberCrunchingTask.CRUNCHED_NUMBER), dependentPromise.getValue()); + assertEquals(String.valueOf(NumberCrunchingTask.CRUNCHED_NUMBER), dependentPromise.get()); + assertTrue(dependentPromise.isDone()); + assertFalse(dependentPromise.isCancelled()); } @Test public void dependentPromiseIsFulfilledWithAnExceptionIfTheFunctionThrowsException() - throws InterruptedException, ExecutionException { + throws InterruptedException, ExecutionException, TimeoutException { Promise dependentPromise = promise .fulfillInAsync(new NumberCrunchingTask(), executor) .then(new Function() { @@ -106,12 +179,30 @@ public class PromiseTest { } }); - // await fulfillment - dependentPromise.await(); - - exception.expect(ExecutionException.class); - - dependentPromise.getValue(); + try { + dependentPromise.get(); + fail("Fetching dependent promise should result in exception if the function threw an exception"); + } catch (ExecutionException ex) { + assertTrue(promise.isDone()); + assertFalse(promise.isCancelled()); + } + + try { + dependentPromise.get(1000, TimeUnit.SECONDS); + fail("Fetching dependent promise should result in exception if the function threw an exception"); + } catch (ExecutionException ex) { + assertTrue(promise.isDone()); + assertFalse(promise.isCancelled()); + } + } + + @Test + public void fetchingAnAlreadyFulfilledPromiseReturnsTheFulfilledValueImmediately() + throws InterruptedException, ExecutionException, TimeoutException { + Promise promise = new Promise<>(); + promise.fulfill(NumberCrunchingTask.CRUNCHED_NUMBER); + + promise.get(1000, TimeUnit.SECONDS); } private static class NumberCrunchingTask implements Callable {