From 940a62bc01833ec11bba664af5914687e5a9e59e Mon Sep 17 00:00:00 2001 From: Narendra Pathai Date: Wed, 2 Sep 2015 15:08:34 +0530 Subject: [PATCH] Work on #74, added unit test cases --- .../main/java/com/iluwatar/reactor/App.java | 27 +++++++++++----- .../java/com/iluwatar/reactor/AppClient.java | 31 +++++++++++++++---- .../java/com/iluwatar/reactor/Dispatcher.java | 1 + .../java/com/iluwatar/reactor/NioReactor.java | 24 ++++++++++++-- .../reactor/SameThreadDispatcher.java | 5 +++ .../reactor/ThreadPoolDispatcher.java | 17 ++++++++-- .../java/com/iluwatar/reactor/AppTest.java | 27 ++++++++++++++++ 7 files changed, 114 insertions(+), 18 deletions(-) create mode 100644 reactor/src/test/java/com/iluwatar/reactor/AppTest.java diff --git a/reactor/src/main/java/com/iluwatar/reactor/App.java b/reactor/src/main/java/com/iluwatar/reactor/App.java index 36aa5290d..7ce27a78b 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/App.java +++ b/reactor/src/main/java/com/iluwatar/reactor/App.java @@ -4,19 +4,32 @@ import java.io.IOException; public class App { + private NioReactor reactor; + public static void main(String[] args) { try { - NioReactor reactor = new NioReactor(new ThreadPoolDispatcher(2)); - LoggingHandler loggingHandler = new LoggingHandler(); - reactor - .registerChannel(tcpChannel(6666, loggingHandler)) - .registerChannel(tcpChannel(6667, loggingHandler)) - .registerChannel(udpChannel(6668, loggingHandler)) - .start(); + new App().start(); } catch (IOException e) { e.printStackTrace(); } } + + public void start() throws IOException { + reactor = new NioReactor(new ThreadPoolDispatcher(2)); + + LoggingHandler loggingHandler = new LoggingHandler(); + + reactor + .registerChannel(tcpChannel(6666, loggingHandler)) + .registerChannel(tcpChannel(6667, loggingHandler)) + .registerChannel(udpChannel(6668, loggingHandler)) + .start(); + } + + public void stop() { + reactor.stop(); + } + private static AbstractNioChannel tcpChannel(int port, ChannelHandler handler) throws IOException { NioServerSocketChannel channel = new NioServerSocketChannel(port, handler); diff --git a/reactor/src/main/java/com/iluwatar/reactor/AppClient.java b/reactor/src/main/java/com/iluwatar/reactor/AppClient.java index 188b64ea8..2ffb6c0de 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/AppClient.java +++ b/reactor/src/main/java/com/iluwatar/reactor/AppClient.java @@ -10,15 +10,34 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; public class AppClient { - + private ExecutorService service = Executors.newFixedThreadPool(3); + public static void main(String[] args) { - new Thread(new LoggingClient("Client 1", 6666)).start(); - new Thread(new LoggingClient("Client 2", 6667)).start(); - new Thread(new UDPLoggingClient(6668)).start(); + new AppClient().start(); } + public void start() { + service.execute(new LoggingClient("Client 1", 6666)); + service.execute(new LoggingClient("Client 2", 6667)); + service.execute(new UDPLoggingClient(6668)); + } + + public void stop() { + service.shutdown(); + if (!service.isTerminated()) { + service.shutdownNow(); + try { + service.awaitTermination(1000, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } /* * A logging client that sends logging requests to logging server @@ -55,7 +74,7 @@ public class AppClient { } private void writeLogs(PrintWriter writer, InputStream inputStream) throws IOException { - for (int i = 0; i < 1; i++) { + for (int i = 0; i < 4; i++) { writer.println(clientName + " - Log request: " + i); try { Thread.sleep(100); @@ -86,7 +105,7 @@ public class AppClient { DatagramSocket socket = null; try { socket = new DatagramSocket(); - for (int i = 0; i < 1; i++) { + for (int i = 0; i < 4; i++) { String message = "UDP Client" + " - Log request: " + i; try { DatagramPacket packet = new DatagramPacket(message.getBytes(), message.getBytes().length, new InetSocketAddress(InetAddress.getLocalHost(), port)); diff --git a/reactor/src/main/java/com/iluwatar/reactor/Dispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/Dispatcher.java index 15fe7774c..7c05a6c1d 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/Dispatcher.java +++ b/reactor/src/main/java/com/iluwatar/reactor/Dispatcher.java @@ -4,4 +4,5 @@ import java.nio.channels.SelectionKey; public interface Dispatcher { void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key); + void stop(); } diff --git a/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java b/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java index f10ea4b82..6ee0cb989 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java +++ b/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java @@ -9,6 +9,9 @@ import java.util.Iterator; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /* * Abstractions @@ -20,6 +23,7 @@ public class NioReactor { private Selector selector; private Dispatcher dispatcher; private Queue pendingChanges = new ConcurrentLinkedQueue<>(); + private ExecutorService reactorService = Executors.newSingleThreadExecutor(); public NioReactor(Dispatcher dispatcher) throws IOException { this.dispatcher = dispatcher; @@ -34,7 +38,7 @@ public class NioReactor { } public void start() throws IOException { - new Thread( new Runnable() { + reactorService.execute(new Runnable() { @Override public void run() { try { @@ -44,11 +48,27 @@ public class NioReactor { e.printStackTrace(); } } - }, "Reactor Main").start(); + }); + } + + public void stop() { + reactorService.shutdownNow(); + selector.wakeup(); + try { + reactorService.awaitTermination(4, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + dispatcher.stop(); } private void eventLoop() throws IOException { while (true) { + + if (Thread.interrupted()) { + break; + } + // honor any pending requests first processPendingChanges(); diff --git a/reactor/src/main/java/com/iluwatar/reactor/SameThreadDispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/SameThreadDispatcher.java index 024441b7c..c27050a15 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/SameThreadDispatcher.java +++ b/reactor/src/main/java/com/iluwatar/reactor/SameThreadDispatcher.java @@ -10,4 +10,9 @@ public class SameThreadDispatcher implements Dispatcher { channel.getHandler().handleChannelRead(channel, readObject, key); } } + + @Override + public void stop() { + // no-op + } } diff --git a/reactor/src/main/java/com/iluwatar/reactor/ThreadPoolDispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/ThreadPoolDispatcher.java index e9e4ac34c..600cb4da4 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/ThreadPoolDispatcher.java +++ b/reactor/src/main/java/com/iluwatar/reactor/ThreadPoolDispatcher.java @@ -3,18 +3,19 @@ package com.iluwatar.reactor; import java.nio.channels.SelectionKey; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; public class ThreadPoolDispatcher extends SameThreadDispatcher { - private ExecutorService exectorService; + private ExecutorService executorService; public ThreadPoolDispatcher(int poolSize) { - this.exectorService = Executors.newFixedThreadPool(poolSize); + this.executorService = Executors.newFixedThreadPool(poolSize); } @Override public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) { - exectorService.execute(new Runnable() { + executorService.execute(new Runnable() { @Override public void run() { @@ -22,5 +23,15 @@ public class ThreadPoolDispatcher extends SameThreadDispatcher { } }); } + + @Override + public void stop() { + executorService.shutdownNow(); + try { + executorService.awaitTermination(1000, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } } diff --git a/reactor/src/test/java/com/iluwatar/reactor/AppTest.java b/reactor/src/test/java/com/iluwatar/reactor/AppTest.java new file mode 100644 index 000000000..17ce0b912 --- /dev/null +++ b/reactor/src/test/java/com/iluwatar/reactor/AppTest.java @@ -0,0 +1,27 @@ +package com.iluwatar.reactor; + +import java.io.IOException; + +import org.junit.Test; + +public class AppTest { + + @Test + public void testApp() throws IOException { + App app = new App(); + app.start(); + + AppClient client = new AppClient(); + client.start(); + + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + client.stop(); + + app.stop(); + } +}