From dc4904c8d0dde8c278e362854a5db7e52684036b Mon Sep 17 00:00:00 2001 From: Narendra Pathai Date: Mon, 27 Jul 2015 15:29:56 +0530 Subject: [PATCH] Changed the implementation for better understanding --- .../iluwatar/halfsynchalfasync/AsyncTask.java | 29 ++++++++++ .../AsynchronousService.java | 32 +++-------- .../halfsynchalfasync/QueuingLayer.java | 8 +++ .../halfsynchalfasync/SynchronousLayer.java | 46 +++++++++++---- .../AsynchronousServiceTest.java | 57 ++++++++++++------- 5 files changed, 117 insertions(+), 55 deletions(-) create mode 100644 half-sync-half-async/src/main/java/com/iluwatar/halfsynchalfasync/AsyncTask.java create mode 100644 half-sync-half-async/src/main/java/com/iluwatar/halfsynchalfasync/QueuingLayer.java diff --git a/half-sync-half-async/src/main/java/com/iluwatar/halfsynchalfasync/AsyncTask.java b/half-sync-half-async/src/main/java/com/iluwatar/halfsynchalfasync/AsyncTask.java new file mode 100644 index 000000000..1b9026c8b --- /dev/null +++ b/half-sync-half-async/src/main/java/com/iluwatar/halfsynchalfasync/AsyncTask.java @@ -0,0 +1,29 @@ +package com.iluwatar.halfsynchalfasync; + +import java.util.concurrent.Callable; + +/** + * Represents some computation that is performed asynchronously. The computation is typically + * done is background threads and the result is posted back in form of callback. + * + * @param type of result + */ +public interface AsyncTask extends Callable { + /** + * Is called in context of caller thread before call to {@link #call()}. + * Validations can be performed here so that the performance penalty of context + * switching is not incurred. + */ + void preExecute(); + + /** + * Is a callback which is called after the result is successfully computed by + * {@link #call()}. + */ + void onResult(O result); + + void onError(Throwable throwable); + + @Override + O call() throws Exception; +} diff --git a/half-sync-half-async/src/main/java/com/iluwatar/halfsynchalfasync/AsynchronousService.java b/half-sync-half-async/src/main/java/com/iluwatar/halfsynchalfasync/AsynchronousService.java index 81186b433..1be5941c5 100644 --- a/half-sync-half-async/src/main/java/com/iluwatar/halfsynchalfasync/AsynchronousService.java +++ b/half-sync-half-async/src/main/java/com/iluwatar/halfsynchalfasync/AsynchronousService.java @@ -1,7 +1,6 @@ package com.iluwatar.halfsynchalfasync; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; @@ -12,40 +11,27 @@ import java.util.concurrent.ThreadPoolExecutor; * picks up the task and executes it in background and the result is posted back to the caller via * {@link Future}. */ -public abstract class AsynchronousService { +public class AsynchronousService { /* - * This is the synchronous layer to which request to do work is submitted. + * This is the synchronous layer to which request to do work is delegated. */ - private SynchronousLayer syncLayer = new SynchronousLayer(); + private SynchronousLayer syncLayer; + + public AsynchronousService(QueuingLayer queuingLayer) { + this.syncLayer = new SynchronousLayer(queuingLayer); + } /** - * Computes arithmetic sum for n * - * @return future representing arithmetic sum of n */ - public Future execute(final I input) { + public void execute(final AsyncTask task) { /* * This is the key part of this pattern where the caller thread does not block until * the result of work is computed but is delegated to the synchronous layer which * computes the task in background. This is useful if caller thread is an UI thread, * which MUST remain responsive to user inputs. */ - return syncLayer.submit(new Callable() { - - @Override - public O call() throws Exception { - return doInBackground(input); - } - - }); + syncLayer.execute(task); } - - /** - * This method is called in context of background thread where the implementation should compute - * and return the result for input. - * - * @return computed result - */ - protected abstract O doInBackground(I input); } diff --git a/half-sync-half-async/src/main/java/com/iluwatar/halfsynchalfasync/QueuingLayer.java b/half-sync-half-async/src/main/java/com/iluwatar/halfsynchalfasync/QueuingLayer.java new file mode 100644 index 000000000..82515e393 --- /dev/null +++ b/half-sync-half-async/src/main/java/com/iluwatar/halfsynchalfasync/QueuingLayer.java @@ -0,0 +1,8 @@ +package com.iluwatar.halfsynchalfasync; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +public class QueuingLayer { + BlockingQueue incomingQueue = new LinkedBlockingQueue<>(); +} diff --git a/half-sync-half-async/src/main/java/com/iluwatar/halfsynchalfasync/SynchronousLayer.java b/half-sync-half-async/src/main/java/com/iluwatar/halfsynchalfasync/SynchronousLayer.java index 9853ce8e2..42d329f8a 100644 --- a/half-sync-half-async/src/main/java/com/iluwatar/halfsynchalfasync/SynchronousLayer.java +++ b/half-sync-half-async/src/main/java/com/iluwatar/halfsynchalfasync/SynchronousLayer.java @@ -1,32 +1,54 @@ package com.iluwatar.halfsynchalfasync; -import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.FutureTask; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * This represents the Queuing and Synchronous layer of Half-Sync/Half-Async pattern. - * The incoming requests are queued and then picked up by the background threads for execution. + * The {@link ThreadPoolExecutor} plays role of both Queuing layer as well as Synchronous layer + * of the pattern, where incoming tasks are queued if no worker is available. */ public class SynchronousLayer { /* - * This is the queuing layer where incoming work is queued + * This is the synchronous layer where background threads execute the work. */ - private LinkedBlockingQueue tasks = new LinkedBlockingQueue(); - /* - * This is the synchronous layer where background threads execute the work - */ - private ExecutorService service = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, tasks); + private ExecutorService service; + /** + * Creates synchronous layer which uses queuing layer to wait for incoming tasks to execute. + */ + public SynchronousLayer(QueuingLayer queuingLayer) { + service = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, queuingLayer.incomingQueue); + } /** * Submit new work for backgrounds threads to compute * @return the result after executing the work */ - public Future submit(Callable work) { - return service.submit(work); + public void execute(final AsyncTask work) { + work.preExecute(); + + service.submit(new FutureTask(work) { + @Override + protected void done() { + super.done(); + try { + /* called in context of background thread. There is other variant possible + * where result is posted back and sits in the queue of caller thread which + * then picks it up for processing. An example of such a system is Android OS, + * where the UI elements can only be updated using UI thread. So result must be + * posted back in UI thread. + */ + work.onResult(get()); + } catch (InterruptedException e) { + // should not occur + } catch (ExecutionException e) { + work.onError(e.getCause()); + } + } + }); } } diff --git a/half-sync-half-async/src/test/java/com/iluwatar/halfsynchalfasync/AsynchronousServiceTest.java b/half-sync-half-async/src/test/java/com/iluwatar/halfsynchalfasync/AsynchronousServiceTest.java index a938628df..4a9976da1 100644 --- a/half-sync-half-async/src/test/java/com/iluwatar/halfsynchalfasync/AsynchronousServiceTest.java +++ b/half-sync-half-async/src/test/java/com/iluwatar/halfsynchalfasync/AsynchronousServiceTest.java @@ -1,10 +1,11 @@ package com.iluwatar.halfsynchalfasync; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import org.junit.Test; -import static org.junit.Assert.*; public class AsynchronousServiceTest { @@ -14,32 +15,48 @@ public class AsynchronousServiceTest { * Addition service is asynchronous layer which does not block on single request, * and is always available for listening new requests. */ - ArithmeticSumService service = new ArithmeticSumService(); - Future output1 = service.execute(100L); - Future output2 = service.execute(50L); - Future output3 = service.execute(200L); - Future output4 = service.execute(5L); + QueuingLayer queuingLayer = new QueuingLayer(); + new SynchronousLayer(queuingLayer); + AsynchronousService service = new AsynchronousService(queuingLayer); - assertEquals(ap(100), output1.get().longValue()); - assertEquals(ap(50), output2.get().longValue()); - assertEquals(ap(200), output3.get().longValue()); - assertEquals(ap(5), output4.get().longValue()); + service.execute(new ArithmeticSumTask(100)); + service.execute(new ArithmeticSumTask(50)); + service.execute(new ArithmeticSumTask(200)); + service.execute(new ArithmeticSumTask(5)); } - /* - * This is an asynchronous service which computes arithmetic sum - */ - class ArithmeticSumService extends AsynchronousService { + class ArithmeticSumTask implements AsyncTask { + private long n; + + public ArithmeticSumTask(long n) { + this.n = n; + } + + @Override + public Long call() throws Exception { + return ap(n); + } @Override - protected Long doInBackground(Long n) { - return (n) * (n + 1) / 2; + public void preExecute() { + if (n < 0) { + throw new IllegalArgumentException("n is less than 0"); + } + } + + @Override + public void onResult(Long result) { + assertEquals(ap(n), result.longValue()); + } + + @Override + public void onError(Throwable throwable) { + fail("Should not occur"); } } - - private long ap(int i) { + + private long ap(long i) { long out = (i) * (i + 1) / 2; - System.out.println(out); return out; } }