Refactored code

This commit is contained in:
Narendra Pathai 2015-07-27 17:38:15 +05:30
parent dc4904c8d0
commit eeef8bf475
6 changed files with 90 additions and 149 deletions

View File

@ -3,27 +3,39 @@ package com.iluwatar.halfsynchalfasync;
import java.util.concurrent.Callable;
/**
* Represents some computation that is performed asynchronously. The computation is typically
* done is background threads and the result is posted back in form of callback.
* Represents some computation that is performed asynchronously and its result.
* The computation is typically done is background threads and the result is posted
* back in form of callback. The callback does not implement {@code isComplete}, {@code cancel}
* as it is out of scope of this pattern.
*
* @param <O> type of result
*/
public interface AsyncTask<O> extends Callable<O> {
/**
* Is called in context of caller thread before call to {@link #call()}.
* Validations can be performed here so that the performance penalty of context
* switching is not incurred.
* Is called in context of caller thread before call to {@link #call()}. Large
* tasks should not be performed in this method. Validations can be performed here
* so that the performance penalty of context switching is not incurred in case of
* invalid requests.
*/
void preExecute();
/**
* Is a callback which is called after the result is successfully computed by
* {@link #call()}.
* A callback called after the result is successfully computed by {@link #call()}.
*/
void onResult(O result);
/**
* A callback called if computing the task resulted in some exception. This method
* is called when either of {@link #call()} or {@link #preExecute()} throw any exception.
*
* @param throwable error cause
*/
void onError(Throwable throwable);
/**
* This is where the computation of task should reside. This method is called in context
* of background thread.
*/
@Override
O call() throws Exception;
}

View File

@ -1,37 +1,77 @@
package com.iluwatar.halfsynchalfasync;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* This is the asynchronous layer which does not block when a new request arrives. It just passes
* the request to the synchronous layer which consists of a queue i.e. a {@link BlockingQueue} and
* a pool of threads i.e. {@link ThreadPoolExecutor}. Out of this pool of threads one of the thread
* picks up the task and executes it in background and the result is posted back to the caller via
* {@link Future}.
* a pool of threads i.e. {@link ThreadPoolExecutor}. Out of this pool of worker threads one of the
* thread picks up the task and executes it synchronously in background and the result is posted back
* to the caller via callback.
*/
public class AsynchronousService {
/*
* This is the synchronous layer to which request to do work is delegated.
* This represents the queuing layer as well as synchronous layer of the pattern. The thread
* pool contains worker threads which execute the tasks in blocking/synchronous manner. Long
* running tasks should be performed in the background which does not affect the performance of
* main thread.
*/
private SynchronousLayer syncLayer;
public AsynchronousService(QueuingLayer queuingLayer) {
this.syncLayer = new SynchronousLayer(queuingLayer);
private ExecutorService service;
/**
* Creates an asynchronous service using {@code workQueue} as communication channel between
* asynchronous layer and synchronous layer. Different types of queues such as Priority queue,
* can be used to control the pattern of communication between the layers.
*/
public AsynchronousService(BlockingQueue<Runnable> workQueue) {
service = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, workQueue);
}
/**
*
* A non-blocking method which performs the task provided in background and returns immediately.
* <p>
* On successful completion of task the result is posted back using callback method
* {@link AsyncTask#onResult(Object)}, if task execution is unable to complete normally
* due to some exception then the reason for error is posted back using callback method
* {@link AsyncTask#onError(Throwable)}.
* <p>
* NOTE: The results are posted back in the context of background thread in this implementation.
* There is other variant possible where the result is posted back in the queue of caller thread
* and then the result is processed in context of caller thread.
*/
public void execute(final AsyncTask<?> task) {
/*
* This is the key part of this pattern where the caller thread does not block until
* the result of work is computed but is delegated to the synchronous layer which
* computes the task in background. This is useful if caller thread is an UI thread,
* which MUST remain responsive to user inputs.
*/
syncLayer.execute(task);
public <T> void execute(final AsyncTask<T> task) {
try {
// some small tasks such as validation can be performed here.
task.preExecute();
} catch (Exception e) {
task.onError(e);
}
service.submit(new FutureTask<T>(task) {
@Override
protected void done() {
super.done();
try {
/* called in context of background thread. There is other variant possible
* where result is posted back and sits in the queue of caller thread which
* then picks it up for processing. An example of such a system is Android OS,
* where the UI elements can only be updated using UI thread. So result must be
* posted back in UI thread.
*/
task.onResult(get());
} catch (InterruptedException e) {
// should not occur
} catch (ExecutionException e) {
task.onError(e.getCause());
}
}
});
}
}

View File

@ -1,8 +0,0 @@
package com.iluwatar.halfsynchalfasync;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class QueuingLayer {
BlockingQueue<Runnable> incomingQueue = new LinkedBlockingQueue<>();
}

View File

@ -1,54 +0,0 @@
package com.iluwatar.halfsynchalfasync;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* This represents the Queuing and Synchronous layer of Half-Sync/Half-Async pattern.
* The {@link ThreadPoolExecutor} plays role of both Queuing layer as well as Synchronous layer
* of the pattern, where incoming tasks are queued if no worker is available.
*/
public class SynchronousLayer {
/*
* This is the synchronous layer where background threads execute the work.
*/
private ExecutorService service;
/**
* Creates synchronous layer which uses queuing layer to wait for incoming tasks to execute.
*/
public SynchronousLayer(QueuingLayer queuingLayer) {
service = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, queuingLayer.incomingQueue);
}
/**
* Submit new work for backgrounds threads to compute
* @return the result after executing the work
*/
public <T> void execute(final AsyncTask<T> work) {
work.preExecute();
service.submit(new FutureTask<T>(work) {
@Override
protected void done() {
super.done();
try {
/* called in context of background thread. There is other variant possible
* where result is posted back and sits in the queue of caller thread which
* then picks it up for processing. An example of such a system is Android OS,
* where the UI elements can only be updated using UI thread. So result must be
* posted back in UI thread.
*/
work.onResult(get());
} catch (InterruptedException e) {
// should not occur
} catch (ExecutionException e) {
work.onError(e.getCause());
}
}
});
}
}

View File

@ -0,0 +1,13 @@
package com.iluwatar.halfsynchalfasync;
import java.util.concurrent.ExecutionException;
import org.junit.Test;
public class AppTest {
@Test
public void test() throws InterruptedException, ExecutionException {
App.main(null);
}
}

View File

@ -1,62 +0,0 @@
package com.iluwatar.halfsynchalfasync;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.concurrent.ExecutionException;
import org.junit.Test;
public class AsynchronousServiceTest {
@Test
public void test() throws InterruptedException, ExecutionException {
/*
* Addition service is asynchronous layer which does not block on single request,
* and is always available for listening new requests.
*/
QueuingLayer queuingLayer = new QueuingLayer();
new SynchronousLayer(queuingLayer);
AsynchronousService service = new AsynchronousService(queuingLayer);
service.execute(new ArithmeticSumTask(100));
service.execute(new ArithmeticSumTask(50));
service.execute(new ArithmeticSumTask(200));
service.execute(new ArithmeticSumTask(5));
}
class ArithmeticSumTask implements AsyncTask<Long> {
private long n;
public ArithmeticSumTask(long n) {
this.n = n;
}
@Override
public Long call() throws Exception {
return ap(n);
}
@Override
public void preExecute() {
if (n < 0) {
throw new IllegalArgumentException("n is less than 0");
}
}
@Override
public void onResult(Long result) {
assertEquals(ap(n), result.longValue());
}
@Override
public void onError(Throwable throwable) {
fail("Should not occur");
}
}
private long ap(long i) {
long out = (i) * (i + 1) / 2;
return out;
}
}