Changed the implementation for better understanding

This commit is contained in:
Narendra Pathai 2015-07-27 15:29:56 +05:30
parent 4c22055e47
commit dc4904c8d0
5 changed files with 117 additions and 55 deletions

View File

@ -0,0 +1,29 @@
package com.iluwatar.halfsynchalfasync;
import java.util.concurrent.Callable;
/**
* Represents some computation that is performed asynchronously. The computation is typically
* done is background threads and the result is posted back in form of callback.
*
* @param <O> type of result
*/
public interface AsyncTask<O> extends Callable<O> {
/**
* Is called in context of caller thread before call to {@link #call()}.
* Validations can be performed here so that the performance penalty of context
* switching is not incurred.
*/
void preExecute();
/**
* Is a callback which is called after the result is successfully computed by
* {@link #call()}.
*/
void onResult(O result);
void onError(Throwable throwable);
@Override
O call() throws Exception;
}

View File

@ -1,7 +1,6 @@
package com.iluwatar.halfsynchalfasync;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
@ -12,40 +11,27 @@ import java.util.concurrent.ThreadPoolExecutor;
* picks up the task and executes it in background and the result is posted back to the caller via
* {@link Future}.
*/
public abstract class AsynchronousService<I, O> {
public class AsynchronousService {
/*
* This is the synchronous layer to which request to do work is submitted.
* This is the synchronous layer to which request to do work is delegated.
*/
private SynchronousLayer syncLayer = new SynchronousLayer();
private SynchronousLayer syncLayer;
public AsynchronousService(QueuingLayer queuingLayer) {
this.syncLayer = new SynchronousLayer(queuingLayer);
}
/**
* Computes arithmetic sum for n
*
* @return future representing arithmetic sum of n
*/
public Future<O> execute(final I input) {
public void execute(final AsyncTask<?> task) {
/*
* This is the key part of this pattern where the caller thread does not block until
* the result of work is computed but is delegated to the synchronous layer which
* computes the task in background. This is useful if caller thread is an UI thread,
* which MUST remain responsive to user inputs.
*/
return syncLayer.submit(new Callable<O>() {
@Override
public O call() throws Exception {
return doInBackground(input);
}
});
syncLayer.execute(task);
}
/**
* This method is called in context of background thread where the implementation should compute
* and return the result for input.
*
* @return computed result
*/
protected abstract O doInBackground(I input);
}

View File

@ -0,0 +1,8 @@
package com.iluwatar.halfsynchalfasync;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class QueuingLayer {
BlockingQueue<Runnable> incomingQueue = new LinkedBlockingQueue<>();
}

View File

@ -1,32 +1,54 @@
package com.iluwatar.halfsynchalfasync;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* This represents the Queuing and Synchronous layer of Half-Sync/Half-Async pattern.
* The incoming requests are queued and then picked up by the background threads for execution.
* The {@link ThreadPoolExecutor} plays role of both Queuing layer as well as Synchronous layer
* of the pattern, where incoming tasks are queued if no worker is available.
*/
public class SynchronousLayer {
/*
* This is the queuing layer where incoming work is queued
* This is the synchronous layer where background threads execute the work.
*/
private LinkedBlockingQueue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
/*
* This is the synchronous layer where background threads execute the work
*/
private ExecutorService service = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, tasks);
private ExecutorService service;
/**
* Creates synchronous layer which uses queuing layer to wait for incoming tasks to execute.
*/
public SynchronousLayer(QueuingLayer queuingLayer) {
service = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, queuingLayer.incomingQueue);
}
/**
* Submit new work for backgrounds threads to compute
* @return the result after executing the work
*/
public <T> Future<T> submit(Callable<T> work) {
return service.submit(work);
public <T> void execute(final AsyncTask<T> work) {
work.preExecute();
service.submit(new FutureTask<T>(work) {
@Override
protected void done() {
super.done();
try {
/* called in context of background thread. There is other variant possible
* where result is posted back and sits in the queue of caller thread which
* then picks it up for processing. An example of such a system is Android OS,
* where the UI elements can only be updated using UI thread. So result must be
* posted back in UI thread.
*/
work.onResult(get());
} catch (InterruptedException e) {
// should not occur
} catch (ExecutionException e) {
work.onError(e.getCause());
}
}
});
}
}

View File

@ -1,10 +1,11 @@
package com.iluwatar.halfsynchalfasync;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.junit.Test;
import static org.junit.Assert.*;
public class AsynchronousServiceTest {
@ -14,32 +15,48 @@ public class AsynchronousServiceTest {
* Addition service is asynchronous layer which does not block on single request,
* and is always available for listening new requests.
*/
ArithmeticSumService service = new ArithmeticSumService();
Future<Long> output1 = service.execute(100L);
Future<Long> output2 = service.execute(50L);
Future<Long> output3 = service.execute(200L);
Future<Long> output4 = service.execute(5L);
QueuingLayer queuingLayer = new QueuingLayer();
new SynchronousLayer(queuingLayer);
AsynchronousService service = new AsynchronousService(queuingLayer);
assertEquals(ap(100), output1.get().longValue());
assertEquals(ap(50), output2.get().longValue());
assertEquals(ap(200), output3.get().longValue());
assertEquals(ap(5), output4.get().longValue());
service.execute(new ArithmeticSumTask(100));
service.execute(new ArithmeticSumTask(50));
service.execute(new ArithmeticSumTask(200));
service.execute(new ArithmeticSumTask(5));
}
/*
* This is an asynchronous service which computes arithmetic sum
*/
class ArithmeticSumService extends AsynchronousService<Long, Long> {
class ArithmeticSumTask implements AsyncTask<Long> {
private long n;
public ArithmeticSumTask(long n) {
this.n = n;
}
@Override
public Long call() throws Exception {
return ap(n);
}
@Override
protected Long doInBackground(Long n) {
return (n) * (n + 1) / 2;
public void preExecute() {
if (n < 0) {
throw new IllegalArgumentException("n is less than 0");
}
}
@Override
public void onResult(Long result) {
assertEquals(ap(n), result.longValue());
}
@Override
public void onError(Throwable throwable) {
fail("Should not occur");
}
}
private long ap(int i) {
private long ap(long i) {
long out = (i) * (i + 1) / 2;
System.out.println(out);
return out;
}
}