Work on #403, added application class and test cases.

This commit is contained in:
Narendra Pathai 2016-07-21 19:13:42 +05:30
parent ea7503414e
commit 102341443d
9 changed files with 314 additions and 200 deletions

View File

@ -128,6 +128,7 @@
<module>hexagonal</module> <module>hexagonal</module>
<module>abstract-document</module> <module>abstract-document</module>
<module>aggregator-microservices</module> <module>aggregator-microservices</module>
<module>promise</module>
<module>page-object</module> <module>page-object</module>
</modules> </modules>

View File

@ -43,5 +43,10 @@
<artifactId>mockito-core</artifactId> <artifactId>mockito-core</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.iluwatar</groupId>
<artifactId>async-method-invocation</artifactId>
<version>1.13.0-SNAPSHOT</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -1,24 +1,38 @@
package com.iluwatar.promise; package com.iluwatar.promise;
import com.iluwatar.async.method.invocation.ThreadAsyncExecutor;
/**
*
* Application that uses promise pattern.
*/
public class App { public class App {
/** /**
* Program entry point * Program entry point
* @param args arguments * @param args arguments
* @throws InterruptedException if main thread is interruped.
*/ */
public static void main(String[] args) { public static void main(String[] args) throws InterruptedException {
ThreadAsyncExecutor executor = new ThreadAsyncExecutor(); ThreadAsyncExecutor executor = new ThreadAsyncExecutor();
executor.execute(() -> {
Promise<Integer> consumedPromise = new Promise<>();
consumedPromise.fulfillInAsync(() -> {
Thread.sleep(1000); Thread.sleep(1000);
return 10; return 10;
}).then(value -> {System.out.println("Consumed the value: " + value);}) }, executor).then(value -> {
.then(nullVal -> {System.out.println("Post consuming value");}); System.out.println("Consumed int value: " + value);
});
Promise<String> transformedPromise = new Promise<>();
executor.execute(() -> { transformedPromise.fulfillInAsync(() -> {
Thread.sleep(1000); Thread.sleep(1000);
return "10"; return "10";
}).then(value -> {return 10 + Integer.parseInt(value);}) }, executor).then(value -> { return Integer.parseInt(value); }).then(value -> {
.then(intValue -> {System.out.println("Consumed int value: " + intValue);}); System.out.println(value);
});
consumedPromise.await();
transformedPromise.await();
} }
} }

View File

@ -1,12 +0,0 @@
package com.iluwatar.promise;
import java.util.function.Consumer;
import java.util.function.Function;
import com.iluwatar.async.method.invocation.AsyncResult;
public interface ListenableAsyncResult<T> extends AsyncResult<T> {
ListenableAsyncResult<Void> then(Consumer<? super T> action);
<V> ListenableAsyncResult<V> then(Function<? super T, V> func);
ListenableAsyncResult<T> error(Consumer<? extends Throwable> action);
}

View File

@ -0,0 +1,143 @@
package com.iluwatar.promise;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import com.iluwatar.async.method.invocation.AsyncExecutor;
import com.iluwatar.async.method.invocation.internal.CompletableResult;
/**
* Implements the promise pattern.
* @param <T> type of result.
*/
public class Promise<T> extends CompletableResult<T> {
private Runnable fulfillmentAction;
/**
* Creates a promise that will be fulfilled in future.
*/
public Promise() {
super(null);
}
/**
* Fulfills the promise with the provided value.
* @param value the fulfilled value that can be accessed using {@link #getValue()}.
*/
@Override
public void setValue(T value) {
super.setValue(value);
postComplete();
}
/**
* Fulfills the promise with exception due to error in execution.
* @param exception the exception will be wrapped in {@link ExecutionException}
* when accessing the value using {@link #getValue()}.
*/
@Override
public void setException(Exception exception) {
super.setException(exception);
postComplete();
}
void postComplete() {
if (fulfillmentAction == null) {
return;
}
fulfillmentAction.run();
}
/**
* Executes the task using the executor in other thread and fulfills the promise returned
* once the task completes either successfully or with an exception.
*
* @param task the task that will provide the value to fulfill the promise.
* @param executor the executor in which the task should be run.
* @return a promise that represents the result of running the task provided.
*/
public Promise<T> fulfillInAsync(final Callable<T> task, AsyncExecutor executor) {
executor.startProcess(new Callable<Void>() {
@Override
public Void call() throws Exception {
setValue(task.call());
return null;
}
});
return this;
}
/**
* Returns a new promise that, when this promise is fulfilled normally, is fulfilled with
* result of this promise as argument to the action provided.
* @param action action to be executed.
* @return a new promise.
*/
public Promise<Void> then(Consumer<? super T> action) {
Promise<Void> dest = new Promise<>();
fulfillmentAction = new ConsumeAction(this, dest, action);
return dest;
}
/**
* Returns a new promise that, when this promise is fulfilled normally, is fulfilled with
* result of this promise as argument to the function provided.
* @param func function to be executed.
* @return a new promise.
*/
public <V> Promise<V> then(Function<? super T, V> func) {
Promise<V> dest = new Promise<>();
fulfillmentAction = new FunctionAction<V>(this, dest, func);
return dest;
}
private class ConsumeAction implements Runnable {
private Promise<T> current;
private Promise<Void> dest;
private Consumer<? super T> action;
public ConsumeAction(Promise<T> current, Promise<Void> dest, Consumer<? super T> action) {
this.current = current;
this.dest = dest;
this.action = action;
}
@Override
public void run() {
try {
action.accept(current.getValue());
dest.setValue(null);
} catch (Throwable e) {
dest.setException((Exception) e.getCause());
}
}
}
private class FunctionAction<V> implements Runnable {
private Promise<T> current;
private Promise<V> dest;
private Function<? super T, V> func;
public FunctionAction(Promise<T> current, Promise<V> dest, Function<? super T, V> func) {
this.current = current;
this.dest = dest;
this.func = func;
}
@Override
public void run() {
try {
V result = func.apply(current.getValue());
dest.setValue(result);
} catch (Throwable e) {
dest.setException((Exception) e.getCause());
}
}
}
}

View File

@ -1,7 +0,0 @@
package com.iluwatar.promise;
import java.util.concurrent.Callable;
public interface PromiseAsyncExecutor {
<T> ListenableAsyncResult<T> execute(Callable<T> task);
}

View File

@ -1,173 +0,0 @@
package com.iluwatar.promise;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
public class ThreadAsyncExecutor implements PromiseAsyncExecutor {
/** Index for thread naming */
private final AtomicInteger idx = new AtomicInteger(0);
@Override
public <T> ListenableAsyncResult<T> execute(Callable<T> task) {
Promise<T> promise = new Promise<>();
new Thread(() -> {
try {
promise.setValue(task.call());
promise.postComplete();
} catch (Exception ex) {
promise.setException(ex);
}
} , "executor-" + idx.incrementAndGet()).start();
return promise;
}
// TODO there is scope of extending the completable future from async method invocation project. Do that.
private class Promise<T> implements ListenableAsyncResult<T> {
static final int RUNNING = 1;
static final int FAILED = 2;
static final int COMPLETED = 3;
final Object lock;
volatile int state = RUNNING;
T value;
Exception exception;
Runnable fulfilmentAction;
public Promise() {
this.lock = new Object();
}
void postComplete() {
fulfilmentAction.run();
}
/**
* Sets the value from successful execution and executes callback if available. Notifies any thread waiting for
* completion.
*
* @param value
* value of the evaluated task
*/
public void setValue(T value) {
this.value = value;
this.state = COMPLETED;
synchronized (lock) {
lock.notifyAll();
}
}
/**
* Sets the exception from failed execution and executes callback if available. Notifies any thread waiting for
* completion.
*
* @param exception
* exception of the failed task
*/
public void setException(Exception exception) {
this.exception = exception;
this.state = FAILED;
synchronized (lock) {
lock.notifyAll();
}
}
@Override
public boolean isCompleted() {
return state > RUNNING;
}
@Override
public T getValue() throws ExecutionException {
if (state == COMPLETED) {
return value;
} else if (state == FAILED) {
throw new ExecutionException(exception);
} else {
throw new IllegalStateException("Execution not completed yet");
}
}
@Override
public void await() throws InterruptedException {
synchronized (lock) {
if (!isCompleted()) {
lock.wait();
}
}
}
@Override
public ListenableAsyncResult<Void> then(Consumer<? super T> action) {
Promise<Void> dest = new Promise<>();
fulfilmentAction = new ConsumeAction(this, dest, action);
return dest;
}
@Override
public <V> ListenableAsyncResult<V> then(Function<? super T, V> func) {
Promise<V> dest = new Promise<>();
fulfilmentAction = new FunctionAction<V>(this, dest, func);
return dest;
}
@Override
public ListenableAsyncResult<T> error(Consumer<? extends Throwable> action) {
return null;
}
private class ConsumeAction implements Runnable {
private Promise<T> current;
private Promise<Void> dest;
private Consumer<? super T> action;
public ConsumeAction(Promise<T> current, Promise<Void> dest, Consumer<? super T> action) {
this.current = current;
this.dest = dest;
this.action = action;
}
@Override
public void run() {
try {
action.accept(current.getValue());
dest.setValue(null);
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
dest.postComplete();
}
}
private class FunctionAction<V> implements Runnable {
private Promise<T> current;
private Promise<V> dest;
private Function<? super T, V> func;
public FunctionAction(Promise<T> current, Promise<V> dest, Function<? super T, V> func) {
this.current = current;
this.dest = dest;
this.func = func;
}
@Override
public void run() {
try {
V result = func.apply(current.getValue());
dest.setValue(result);
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
dest.postComplete();
}
}
}
}

View File

@ -0,0 +1,15 @@
package com.iluwatar.promise;
import org.junit.Test;
/**
*
* Application test.
*/
public class AppTest {
@Test
public void testApp() throws InterruptedException {
App.main(null);
}
}

View File

@ -0,0 +1,128 @@
package com.iluwatar.promise;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import com.iluwatar.async.method.invocation.ThreadAsyncExecutor;
/**
* Tests Promise class.
*/
public class PromiseTest {
private ThreadAsyncExecutor executor;
private Promise<Integer> promise;
@Rule public ExpectedException exception = ExpectedException.none();
@Before
public void setUp() {
executor = new ThreadAsyncExecutor();
promise = new Promise<>();
}
@Test
public void promiseIsFulfilledWithTheResultantValueOfExecutingTheTask()
throws InterruptedException, ExecutionException {
promise.fulfillInAsync(new NumberCrunchingTask(), executor);
// await fulfillment
promise.await();
assertEquals(NumberCrunchingTask.CRUNCHED_NUMBER, promise.getValue());
}
@Test
public void dependentPromiseIsFulfilledAfterTheConsumerConsumesTheResultOfThisPromise()
throws InterruptedException, ExecutionException {
Promise<Void> dependentPromise = promise
.fulfillInAsync(new NumberCrunchingTask(), executor)
.then(value -> {
assertEquals(NumberCrunchingTask.CRUNCHED_NUMBER, value);
});
// await fulfillment
dependentPromise.await();
}
@Test
public void dependentPromiseIsFulfilledWithAnExceptionIfConsumerThrowsAnException()
throws InterruptedException, ExecutionException {
Promise<Void> dependentPromise = promise
.fulfillInAsync(new NumberCrunchingTask(), executor)
.then(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
throw new RuntimeException("Barf!");
}
});
// await fulfillment
dependentPromise.await();
exception.expect(ExecutionException.class);
dependentPromise.getValue();
}
@Test
public void dependentPromiseIsFulfilledAfterTheFunctionTransformsTheResultOfThisPromise()
throws InterruptedException, ExecutionException {
Promise<String> dependentPromise = promise
.fulfillInAsync(new NumberCrunchingTask(), executor)
.then(value -> {
assertEquals(NumberCrunchingTask.CRUNCHED_NUMBER, value);
return String.valueOf(value);
});
// await fulfillment
dependentPromise.await();
assertEquals(String.valueOf(NumberCrunchingTask.CRUNCHED_NUMBER), dependentPromise.getValue());
}
@Test
public void dependentPromiseIsFulfilledWithAnExceptionIfTheFunctionThrowsException()
throws InterruptedException, ExecutionException {
Promise<String> dependentPromise = promise
.fulfillInAsync(new NumberCrunchingTask(), executor)
.then(new Function<Integer, String>() {
@Override
public String apply(Integer t) {
throw new RuntimeException("Barf!");
}
});
// await fulfillment
dependentPromise.await();
exception.expect(ExecutionException.class);
dependentPromise.getValue();
}
private static class NumberCrunchingTask implements Callable<Integer> {
private static final Integer CRUNCHED_NUMBER = Integer.MAX_VALUE;
@Override
public Integer call() throws Exception {
// Do number crunching
Thread.sleep(1000);
return CRUNCHED_NUMBER;
}
}
}