diff --git a/promise/pom.xml b/promise/pom.xml new file mode 100644 index 000000000..ca12515ee --- /dev/null +++ b/promise/pom.xml @@ -0,0 +1,47 @@ + + + + 4.0.0 + + com.iluwatar + java-design-patterns + 1.13.0-SNAPSHOT + + promise + + + junit + junit + test + + + org.mockito + mockito-core + test + + + diff --git a/promise/src/main/java/com/iluwatar/promise/App.java b/promise/src/main/java/com/iluwatar/promise/App.java new file mode 100644 index 000000000..dc22c307a --- /dev/null +++ b/promise/src/main/java/com/iluwatar/promise/App.java @@ -0,0 +1,24 @@ +package com.iluwatar.promise; + +public class App { + + /** + * Program entry point + * @param args arguments + */ + public static void main(String[] args) { + ThreadAsyncExecutor executor = new ThreadAsyncExecutor(); + executor.execute(() -> { + Thread.sleep(1000); + return 10; + }).then(value -> {System.out.println("Consumed the value: " + value);}) + .then(nullVal -> {System.out.println("Post consuming value");}); + + + executor.execute(() -> { + Thread.sleep(1000); + return "10"; + }).then(value -> {return 10 + Integer.parseInt(value);}) + .then(intValue -> {System.out.println("Consumed int value: " + intValue);}); + } +} diff --git a/promise/src/main/java/com/iluwatar/promise/ListenableAsyncResult.java b/promise/src/main/java/com/iluwatar/promise/ListenableAsyncResult.java new file mode 100644 index 000000000..a68154e17 --- /dev/null +++ b/promise/src/main/java/com/iluwatar/promise/ListenableAsyncResult.java @@ -0,0 +1,12 @@ +package com.iluwatar.promise; + +import java.util.function.Consumer; +import java.util.function.Function; + +import com.iluwatar.async.method.invocation.AsyncResult; + +public interface ListenableAsyncResult extends AsyncResult { + ListenableAsyncResult then(Consumer action); + ListenableAsyncResult then(Function func); + ListenableAsyncResult error(Consumer action); +} diff --git a/promise/src/main/java/com/iluwatar/promise/PromiseAsyncExecutor.java b/promise/src/main/java/com/iluwatar/promise/PromiseAsyncExecutor.java new file mode 100644 index 000000000..eb43b0546 --- /dev/null +++ b/promise/src/main/java/com/iluwatar/promise/PromiseAsyncExecutor.java @@ -0,0 +1,7 @@ +package com.iluwatar.promise; + +import java.util.concurrent.Callable; + +public interface PromiseAsyncExecutor { + ListenableAsyncResult execute(Callable task); +} diff --git a/promise/src/main/java/com/iluwatar/promise/ThreadAsyncExecutor.java b/promise/src/main/java/com/iluwatar/promise/ThreadAsyncExecutor.java new file mode 100644 index 000000000..aa057d676 --- /dev/null +++ b/promise/src/main/java/com/iluwatar/promise/ThreadAsyncExecutor.java @@ -0,0 +1,173 @@ +package com.iluwatar.promise; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; + +public class ThreadAsyncExecutor implements PromiseAsyncExecutor { + + /** Index for thread naming */ + private final AtomicInteger idx = new AtomicInteger(0); + + @Override + public ListenableAsyncResult execute(Callable task) { + Promise promise = new Promise<>(); + new Thread(() -> { + try { + promise.setValue(task.call()); + promise.postComplete(); + } catch (Exception ex) { + promise.setException(ex); + } + } , "executor-" + idx.incrementAndGet()).start(); + return promise; + } + + // TODO there is scope of extending the completable future from async method invocation project. Do that. + private class Promise implements ListenableAsyncResult { + + static final int RUNNING = 1; + static final int FAILED = 2; + static final int COMPLETED = 3; + + final Object lock; + volatile int state = RUNNING; + T value; + Exception exception; + Runnable fulfilmentAction; + + public Promise() { + this.lock = new Object(); + } + + void postComplete() { + fulfilmentAction.run(); + } + + /** + * Sets the value from successful execution and executes callback if available. Notifies any thread waiting for + * completion. + * + * @param value + * value of the evaluated task + */ + public void setValue(T value) { + this.value = value; + this.state = COMPLETED; + synchronized (lock) { + lock.notifyAll(); + } + } + + /** + * Sets the exception from failed execution and executes callback if available. Notifies any thread waiting for + * completion. + * + * @param exception + * exception of the failed task + */ + public void setException(Exception exception) { + this.exception = exception; + this.state = FAILED; + synchronized (lock) { + lock.notifyAll(); + } + } + + @Override + public boolean isCompleted() { + return state > RUNNING; + } + + @Override + public T getValue() throws ExecutionException { + if (state == COMPLETED) { + return value; + } else if (state == FAILED) { + throw new ExecutionException(exception); + } else { + throw new IllegalStateException("Execution not completed yet"); + } + } + + @Override + public void await() throws InterruptedException { + synchronized (lock) { + if (!isCompleted()) { + lock.wait(); + } + } + } + + @Override + public ListenableAsyncResult then(Consumer action) { + Promise dest = new Promise<>(); + fulfilmentAction = new ConsumeAction(this, dest, action); + return dest; + } + + @Override + public ListenableAsyncResult then(Function func) { + Promise dest = new Promise<>(); + fulfilmentAction = new FunctionAction(this, dest, func); + return dest; + } + + @Override + public ListenableAsyncResult error(Consumer action) { + return null; + } + + private class ConsumeAction implements Runnable { + + private Promise current; + private Promise dest; + private Consumer action; + + public ConsumeAction(Promise current, Promise dest, Consumer action) { + this.current = current; + this.dest = dest; + this.action = action; + } + + @Override + public void run() { + try { + action.accept(current.getValue()); + dest.setValue(null); + } catch (ExecutionException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + dest.postComplete(); + } + } + + private class FunctionAction implements Runnable { + + private Promise current; + private Promise dest; + private Function func; + + public FunctionAction(Promise current, Promise dest, Function func) { + this.current = current; + this.dest = dest; + this.func = func; + } + + @Override + public void run() { + try { + V result = func.apply(current.getValue()); + dest.setValue(result); + } catch (ExecutionException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + dest.postComplete(); + } + } + } + } \ No newline at end of file