Work on #74, added unit test cases

This commit is contained in:
Narendra Pathai 2015-09-02 15:08:34 +05:30
parent b94c1d37d2
commit 940a62bc01
7 changed files with 114 additions and 18 deletions

View File

@ -4,20 +4,33 @@ import java.io.IOException;
public class App { public class App {
private NioReactor reactor;
public static void main(String[] args) { public static void main(String[] args) {
try { try {
NioReactor reactor = new NioReactor(new ThreadPoolDispatcher(2)); new App().start();
} catch (IOException e) {
e.printStackTrace();
}
}
public void start() throws IOException {
reactor = new NioReactor(new ThreadPoolDispatcher(2));
LoggingHandler loggingHandler = new LoggingHandler(); LoggingHandler loggingHandler = new LoggingHandler();
reactor reactor
.registerChannel(tcpChannel(6666, loggingHandler)) .registerChannel(tcpChannel(6666, loggingHandler))
.registerChannel(tcpChannel(6667, loggingHandler)) .registerChannel(tcpChannel(6667, loggingHandler))
.registerChannel(udpChannel(6668, loggingHandler)) .registerChannel(udpChannel(6668, loggingHandler))
.start(); .start();
} catch (IOException e) {
e.printStackTrace();
} }
public void stop() {
reactor.stop();
} }
private static AbstractNioChannel tcpChannel(int port, ChannelHandler handler) throws IOException { private static AbstractNioChannel tcpChannel(int port, ChannelHandler handler) throws IOException {
NioServerSocketChannel channel = new NioServerSocketChannel(port, handler); NioServerSocketChannel channel = new NioServerSocketChannel(port, handler);
channel.bind(); channel.bind();

View File

@ -10,15 +10,34 @@ import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketException; import java.net.SocketException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class AppClient { public class AppClient {
private ExecutorService service = Executors.newFixedThreadPool(3);
public static void main(String[] args) { public static void main(String[] args) {
new Thread(new LoggingClient("Client 1", 6666)).start(); new AppClient().start();
new Thread(new LoggingClient("Client 2", 6667)).start();
new Thread(new UDPLoggingClient(6668)).start();
} }
public void start() {
service.execute(new LoggingClient("Client 1", 6666));
service.execute(new LoggingClient("Client 2", 6667));
service.execute(new UDPLoggingClient(6668));
}
public void stop() {
service.shutdown();
if (!service.isTerminated()) {
service.shutdownNow();
try {
service.awaitTermination(1000, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/* /*
* A logging client that sends logging requests to logging server * A logging client that sends logging requests to logging server
@ -55,7 +74,7 @@ public class AppClient {
} }
private void writeLogs(PrintWriter writer, InputStream inputStream) throws IOException { private void writeLogs(PrintWriter writer, InputStream inputStream) throws IOException {
for (int i = 0; i < 1; i++) { for (int i = 0; i < 4; i++) {
writer.println(clientName + " - Log request: " + i); writer.println(clientName + " - Log request: " + i);
try { try {
Thread.sleep(100); Thread.sleep(100);
@ -86,7 +105,7 @@ public class AppClient {
DatagramSocket socket = null; DatagramSocket socket = null;
try { try {
socket = new DatagramSocket(); socket = new DatagramSocket();
for (int i = 0; i < 1; i++) { for (int i = 0; i < 4; i++) {
String message = "UDP Client" + " - Log request: " + i; String message = "UDP Client" + " - Log request: " + i;
try { try {
DatagramPacket packet = new DatagramPacket(message.getBytes(), message.getBytes().length, new InetSocketAddress(InetAddress.getLocalHost(), port)); DatagramPacket packet = new DatagramPacket(message.getBytes(), message.getBytes().length, new InetSocketAddress(InetAddress.getLocalHost(), port));

View File

@ -4,4 +4,5 @@ import java.nio.channels.SelectionKey;
public interface Dispatcher { public interface Dispatcher {
void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key); void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key);
void stop();
} }

View File

@ -9,6 +9,9 @@ import java.util.Iterator;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/* /*
* Abstractions * Abstractions
@ -20,6 +23,7 @@ public class NioReactor {
private Selector selector; private Selector selector;
private Dispatcher dispatcher; private Dispatcher dispatcher;
private Queue<Command> pendingChanges = new ConcurrentLinkedQueue<>(); private Queue<Command> pendingChanges = new ConcurrentLinkedQueue<>();
private ExecutorService reactorService = Executors.newSingleThreadExecutor();
public NioReactor(Dispatcher dispatcher) throws IOException { public NioReactor(Dispatcher dispatcher) throws IOException {
this.dispatcher = dispatcher; this.dispatcher = dispatcher;
@ -34,7 +38,7 @@ public class NioReactor {
} }
public void start() throws IOException { public void start() throws IOException {
new Thread( new Runnable() { reactorService.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
@ -44,11 +48,27 @@ public class NioReactor {
e.printStackTrace(); e.printStackTrace();
} }
} }
}, "Reactor Main").start(); });
}
public void stop() {
reactorService.shutdownNow();
selector.wakeup();
try {
reactorService.awaitTermination(4, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
dispatcher.stop();
} }
private void eventLoop() throws IOException { private void eventLoop() throws IOException {
while (true) { while (true) {
if (Thread.interrupted()) {
break;
}
// honor any pending requests first // honor any pending requests first
processPendingChanges(); processPendingChanges();

View File

@ -10,4 +10,9 @@ public class SameThreadDispatcher implements Dispatcher {
channel.getHandler().handleChannelRead(channel, readObject, key); channel.getHandler().handleChannelRead(channel, readObject, key);
} }
} }
@Override
public void stop() {
// no-op
}
} }

View File

@ -3,18 +3,19 @@ package com.iluwatar.reactor;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDispatcher extends SameThreadDispatcher { public class ThreadPoolDispatcher extends SameThreadDispatcher {
private ExecutorService exectorService; private ExecutorService executorService;
public ThreadPoolDispatcher(int poolSize) { public ThreadPoolDispatcher(int poolSize) {
this.exectorService = Executors.newFixedThreadPool(poolSize); this.executorService = Executors.newFixedThreadPool(poolSize);
} }
@Override @Override
public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) { public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) {
exectorService.execute(new Runnable() { executorService.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -23,4 +24,14 @@ public class ThreadPoolDispatcher extends SameThreadDispatcher {
}); });
} }
@Override
public void stop() {
executorService.shutdownNow();
try {
executorService.awaitTermination(1000, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} }

View File

@ -0,0 +1,27 @@
package com.iluwatar.reactor;
import java.io.IOException;
import org.junit.Test;
public class AppTest {
@Test
public void testApp() throws IOException {
App app = new App();
app.start();
AppClient client = new AppClient();
client.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
client.stop();
app.stop();
}
}