Work on #403, removed dependency on async method invocation module, added more tests

This commit is contained in:
Narendra Pathai 2016-07-22 16:47:52 +05:30
parent 4bd1f14cfb
commit 2b945ca27f
5 changed files with 275 additions and 76 deletions

View File

@ -43,10 +43,5 @@
<artifactId>mockito-core</artifactId> <artifactId>mockito-core</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.iluwatar</groupId>
<artifactId>async-method-invocation</artifactId>
<version>1.13.0-SNAPSHOT</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -1,6 +1,9 @@
package com.iluwatar.promise; package com.iluwatar.promise;
import com.iluwatar.async.method.invocation.ThreadAsyncExecutor; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** /**
* *
@ -12,10 +15,19 @@ public class App {
* Program entry point * Program entry point
* @param args arguments * @param args arguments
* @throws InterruptedException if main thread is interruped. * @throws InterruptedException if main thread is interruped.
* @throws ExecutionException
*/ */
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadAsyncExecutor executor = new ThreadAsyncExecutor(); ExecutorService executor = Executors.newSingleThreadExecutor();
try {
promiseUsage(executor);
} finally {
executor.shutdownNow();
}
}
private static void promiseUsage(Executor executor)
throws InterruptedException, ExecutionException {
Promise<Integer> consumedPromise = new Promise<>(); Promise<Integer> consumedPromise = new Promise<>();
consumedPromise.fulfillInAsync(() -> { consumedPromise.fulfillInAsync(() -> {
Thread.sleep(1000); Thread.sleep(1000);
@ -29,10 +41,10 @@ public class App {
Thread.sleep(1000); Thread.sleep(1000);
return "10"; return "10";
}, executor).then(value -> { return Integer.parseInt(value); }).then(value -> { }, executor).then(value -> { return Integer.parseInt(value); }).then(value -> {
System.out.println(value); System.out.println("Consumed transformed int value: " + value);
}); });
consumedPromise.await(); consumedPromise.get();
transformedPromise.await(); transformedPromise.get();
} }
} }

View File

@ -2,17 +2,18 @@ package com.iluwatar.promise;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import com.iluwatar.async.method.invocation.AsyncExecutor;
import com.iluwatar.async.method.invocation.internal.CompletableResult;
/** /**
* Implements the promise pattern. * Implements the promise pattern.
* @param <T> type of result. * @param <T> type of result.
*/ */
public class Promise<T> extends CompletableResult<T> { public class Promise<T> extends PromiseSupport<T> {
private Runnable fulfillmentAction; private Runnable fulfillmentAction;
@ -20,31 +21,30 @@ public class Promise<T> extends CompletableResult<T> {
* Creates a promise that will be fulfilled in future. * Creates a promise that will be fulfilled in future.
*/ */
public Promise() { public Promise() {
super(null);
} }
/** /**
* Fulfills the promise with the provided value. * Fulfills the promise with the provided value.
* @param value the fulfilled value that can be accessed using {@link #getValue()}. * @param value the fulfilled value that can be accessed using {@link #get()}.
*/ */
@Override @Override
public void setValue(T value) { public void fulfill(T value) {
super.setValue(value); super.fulfill(value);
postComplete(); postFulfillment();
} }
/** /**
* Fulfills the promise with exception due to error in execution. * Fulfills the promise with exception due to error in execution.
* @param exception the exception will be wrapped in {@link ExecutionException} * @param exception the exception will be wrapped in {@link ExecutionException}
* when accessing the value using {@link #getValue()}. * when accessing the value using {@link #get()}.
*/ */
@Override @Override
public void setException(Exception exception) { public void fulfillExceptionally(Exception exception) {
super.setException(exception); super.fulfillExceptionally(exception);
postComplete(); postFulfillment();
} }
void postComplete() { void postFulfillment() {
if (fulfillmentAction == null) { if (fulfillmentAction == null) {
return; return;
} }
@ -59,13 +59,12 @@ public class Promise<T> extends CompletableResult<T> {
* @param executor the executor in which the task should be run. * @param executor the executor in which the task should be run.
* @return a promise that represents the result of running the task provided. * @return a promise that represents the result of running the task provided.
*/ */
public Promise<T> fulfillInAsync(final Callable<T> task, AsyncExecutor executor) { public Promise<T> fulfillInAsync(final Callable<T> task, Executor executor) {
executor.startProcess(new Callable<Void>() { executor.execute(() -> {
try {
@Override fulfill(task.call());
public Void call() throws Exception { } catch (Exception e) {
setValue(task.call()); fulfillExceptionally(e);
return null;
} }
}); });
return this; return this;
@ -91,18 +90,22 @@ public class Promise<T> extends CompletableResult<T> {
*/ */
public <V> Promise<V> then(Function<? super T, V> func) { public <V> Promise<V> then(Function<? super T, V> func) {
Promise<V> dest = new Promise<>(); Promise<V> dest = new Promise<>();
fulfillmentAction = new FunctionAction<V>(this, dest, func); fulfillmentAction = new TransformAction<V>(this, dest, func);
return dest; return dest;
} }
/**
* A consume action provides the action, the value from source promise and fulfills the
* destination promise.
*/
private class ConsumeAction implements Runnable { private class ConsumeAction implements Runnable {
private Promise<T> current; private Promise<T> src;
private Promise<Void> dest; private Promise<Void> dest;
private Consumer<? super T> action; private Consumer<? super T> action;
public ConsumeAction(Promise<T> current, Promise<Void> dest, Consumer<? super T> action) { ConsumeAction(Promise<T> src, Promise<Void> dest, Consumer<? super T> action) {
this.current = current; this.src = src;
this.dest = dest; this.dest = dest;
this.action = action; this.action = action;
} }
@ -110,22 +113,26 @@ public class Promise<T> extends CompletableResult<T> {
@Override @Override
public void run() { public void run() {
try { try {
action.accept(current.getValue()); action.accept(src.get());
dest.setValue(null); dest.fulfill(null);
} catch (Throwable e) { } catch (Throwable e) {
dest.setException((Exception) e.getCause()); dest.fulfillExceptionally((Exception) e.getCause());
} }
} }
} }
private class FunctionAction<V> implements Runnable { /**
* A function action provides transformation function, value from source promise and fulfills the
* destination promise with the transformed value.
*/
private class TransformAction<V> implements Runnable {
private Promise<T> current; private Promise<T> src;
private Promise<V> dest; private Promise<V> dest;
private Function<? super T, V> func; private Function<? super T, V> func;
public FunctionAction(Promise<T> current, Promise<V> dest, Function<? super T, V> func) { TransformAction(Promise<T> src, Promise<V> dest, Function<? super T, V> func) {
this.current = current; this.src = src;
this.dest = dest; this.dest = dest;
this.func = func; this.func = func;
} }
@ -133,11 +140,103 @@ public class Promise<T> extends CompletableResult<T> {
@Override @Override
public void run() { public void run() {
try { try {
V result = func.apply(current.getValue()); V result = func.apply(src.get());
dest.setValue(result); dest.fulfill(result);
} catch (Throwable e) { } catch (Throwable e) {
dest.setException((Exception) e.getCause()); dest.fulfillExceptionally((Exception) e.getCause());
} }
} }
} }
} }
/**
* A really simplified implementation of future that allows completing it successfully with a value
* or exceptionally with an exception.
*/
class PromiseSupport<T> implements Future<T> {
static final int RUNNING = 1;
static final int FAILED = 2;
static final int COMPLETED = 3;
final Object lock;
volatile int state = RUNNING;
T value;
Exception exception;
PromiseSupport() {
this.lock = new Object();
}
void fulfill(T value) {
this.value = value;
this.state = COMPLETED;
synchronized (lock) {
lock.notifyAll();
}
}
void fulfillExceptionally(Exception exception) {
this.exception = exception;
this.state = FAILED;
synchronized (lock) {
lock.notifyAll();
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return state > RUNNING;
}
@Override
public T get() throws InterruptedException, ExecutionException {
if (state == COMPLETED) {
return value;
} else if (state == FAILED) {
throw new ExecutionException(exception);
} else {
synchronized (lock) {
lock.wait();
if (state == COMPLETED) {
return value;
} else {
throw new ExecutionException(exception);
}
}
}
}
@Override
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (state == COMPLETED) {
return value;
} else if (state == FAILED) {
throw new ExecutionException(exception);
} else {
synchronized (lock) {
lock.wait(unit.toMillis(timeout));
if (state == COMPLETED) {
return value;
} else if (state == FAILED) {
throw new ExecutionException(exception);
} else {
throw new TimeoutException();
}
}
}
}
}

View File

@ -1,5 +1,7 @@
package com.iluwatar.promise; package com.iluwatar.promise;
import java.util.concurrent.ExecutionException;
import org.junit.Test; import org.junit.Test;
/** /**
@ -9,7 +11,7 @@ import org.junit.Test;
public class AppTest { public class AppTest {
@Test @Test
public void testApp() throws InterruptedException { public void testApp() throws InterruptedException, ExecutionException {
App.main(null); App.main(null);
} }
} }

View File

@ -1,9 +1,16 @@
package com.iluwatar.promise; package com.iluwatar.promise;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
@ -12,20 +19,18 @@ import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import com.iluwatar.async.method.invocation.ThreadAsyncExecutor;
/** /**
* Tests Promise class. * Tests Promise class.
*/ */
public class PromiseTest { public class PromiseTest {
private ThreadAsyncExecutor executor; private Executor executor;
private Promise<Integer> promise; private Promise<Integer> promise;
@Rule public ExpectedException exception = ExpectedException.none(); @Rule public ExpectedException exception = ExpectedException.none();
@Before @Before
public void setUp() { public void setUp() {
executor = new ThreadAsyncExecutor(); executor = Executors.newSingleThreadExecutor();
promise = new Promise<>(); promise = new Promise<>();
} }
@ -34,10 +39,70 @@ public class PromiseTest {
throws InterruptedException, ExecutionException { throws InterruptedException, ExecutionException {
promise.fulfillInAsync(new NumberCrunchingTask(), executor); promise.fulfillInAsync(new NumberCrunchingTask(), executor);
// await fulfillment assertEquals(NumberCrunchingTask.CRUNCHED_NUMBER, promise.get());
promise.await(); assertTrue(promise.isDone());
assertFalse(promise.isCancelled());
}
@Test
public void promiseIsFulfilledWithAnExceptionIfTaskThrowsAnException()
throws InterruptedException, ExecutionException, TimeoutException {
testWaitingForeverForPromiseToBeFulfilled();
testWaitingSomeTimeForPromiseToBeFulfilled();
}
assertEquals(NumberCrunchingTask.CRUNCHED_NUMBER, promise.getValue()); private void testWaitingForeverForPromiseToBeFulfilled() throws InterruptedException, TimeoutException {
Promise<Integer> promise = new Promise<>();
promise.fulfillInAsync(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
throw new RuntimeException("Barf!");
}}, executor);
try {
promise.get();
fail("Fetching promise should result in exception if the task threw an exception");
} catch (ExecutionException ex) {
assertTrue(promise.isDone());
assertFalse(promise.isCancelled());
}
try {
promise.get(1000, TimeUnit.SECONDS);
fail("Fetching promise should result in exception if the task threw an exception");
} catch (ExecutionException ex) {
assertTrue(promise.isDone());
assertFalse(promise.isCancelled());
}
}
private void testWaitingSomeTimeForPromiseToBeFulfilled()
throws InterruptedException, TimeoutException {
Promise<Integer> promise = new Promise<>();
promise.fulfillInAsync(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
throw new RuntimeException("Barf!");
}}, executor);
try {
promise.get(1000, TimeUnit.SECONDS);
fail("Fetching promise should result in exception if the task threw an exception");
} catch (ExecutionException ex) {
assertTrue(promise.isDone());
assertFalse(promise.isCancelled());
}
try {
promise.get();
fail("Fetching promise should result in exception if the task threw an exception");
} catch (ExecutionException ex) {
assertTrue(promise.isDone());
assertFalse(promise.isCancelled());
}
} }
@Test @Test
@ -50,13 +115,14 @@ public class PromiseTest {
}); });
// await fulfillment dependentPromise.get();
dependentPromise.await(); assertTrue(dependentPromise.isDone());
assertFalse(dependentPromise.isCancelled());
} }
@Test @Test
public void dependentPromiseIsFulfilledWithAnExceptionIfConsumerThrowsAnException() public void dependentPromiseIsFulfilledWithAnExceptionIfConsumerThrowsAnException()
throws InterruptedException, ExecutionException { throws InterruptedException, ExecutionException, TimeoutException {
Promise<Void> dependentPromise = promise Promise<Void> dependentPromise = promise
.fulfillInAsync(new NumberCrunchingTask(), executor) .fulfillInAsync(new NumberCrunchingTask(), executor)
.then(new Consumer<Integer>() { .then(new Consumer<Integer>() {
@ -67,13 +133,21 @@ public class PromiseTest {
} }
}); });
try {
// await fulfillment dependentPromise.get();
dependentPromise.await(); fail("Fetching dependent promise should result in exception if the action threw an exception");
} catch (ExecutionException ex) {
exception.expect(ExecutionException.class); assertTrue(promise.isDone());
assertFalse(promise.isCancelled());
dependentPromise.getValue(); }
try {
dependentPromise.get(1000, TimeUnit.SECONDS);
fail("Fetching dependent promise should result in exception if the action threw an exception");
} catch (ExecutionException ex) {
assertTrue(promise.isDone());
assertFalse(promise.isCancelled());
}
} }
@Test @Test
@ -87,15 +161,14 @@ public class PromiseTest {
}); });
// await fulfillment assertEquals(String.valueOf(NumberCrunchingTask.CRUNCHED_NUMBER), dependentPromise.get());
dependentPromise.await(); assertTrue(dependentPromise.isDone());
assertFalse(dependentPromise.isCancelled());
assertEquals(String.valueOf(NumberCrunchingTask.CRUNCHED_NUMBER), dependentPromise.getValue());
} }
@Test @Test
public void dependentPromiseIsFulfilledWithAnExceptionIfTheFunctionThrowsException() public void dependentPromiseIsFulfilledWithAnExceptionIfTheFunctionThrowsException()
throws InterruptedException, ExecutionException { throws InterruptedException, ExecutionException, TimeoutException {
Promise<String> dependentPromise = promise Promise<String> dependentPromise = promise
.fulfillInAsync(new NumberCrunchingTask(), executor) .fulfillInAsync(new NumberCrunchingTask(), executor)
.then(new Function<Integer, String>() { .then(new Function<Integer, String>() {
@ -106,12 +179,30 @@ public class PromiseTest {
} }
}); });
// await fulfillment try {
dependentPromise.await(); dependentPromise.get();
fail("Fetching dependent promise should result in exception if the function threw an exception");
exception.expect(ExecutionException.class); } catch (ExecutionException ex) {
assertTrue(promise.isDone());
dependentPromise.getValue(); assertFalse(promise.isCancelled());
}
try {
dependentPromise.get(1000, TimeUnit.SECONDS);
fail("Fetching dependent promise should result in exception if the function threw an exception");
} catch (ExecutionException ex) {
assertTrue(promise.isDone());
assertFalse(promise.isCancelled());
}
}
@Test
public void fetchingAnAlreadyFulfilledPromiseReturnsTheFulfilledValueImmediately()
throws InterruptedException, ExecutionException, TimeoutException {
Promise<Integer> promise = new Promise<>();
promise.fulfill(NumberCrunchingTask.CRUNCHED_NUMBER);
promise.get(1000, TimeUnit.SECONDS);
} }
private static class NumberCrunchingTask implements Callable<Integer> { private static class NumberCrunchingTask implements Callable<Integer> {