From d3f2ea22ac422ddac4d7d9733c45e3ebc46e6001 Mon Sep 17 00:00:00 2001 From: Narendra Pathai Date: Sun, 23 Aug 2015 18:51:24 +0530 Subject: [PATCH] Work on #74, enhanced reactor to allow multiple channels --- .../main/java/com/iluwatar/reactor/App.java | 31 +++++++- .../java/com/iluwatar/reactor/AppClient.java | 9 ++- .../java/com/iluwatar/reactor/NioReactor.java | 79 ++++++++++++------- 3 files changed, 84 insertions(+), 35 deletions(-) diff --git a/reactor/src/main/java/com/iluwatar/reactor/App.java b/reactor/src/main/java/com/iluwatar/reactor/App.java index d5cd05fec..4c7b06e9d 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/App.java +++ b/reactor/src/main/java/com/iluwatar/reactor/App.java @@ -1,7 +1,11 @@ package com.iluwatar.reactor; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectableChannel; +import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import com.iluwatar.reactor.NioReactor.NioChannelEventHandler; @@ -10,12 +14,35 @@ public class App { public static void main(String[] args) { try { - new NioReactor(6666, new LoggingServer()).start(); + NioReactor reactor = new NioReactor(); + + reactor + .registerChannel(tcpChannel(6666)) + .registerChannel(tcpChannel(6667)) + .start(); + + reactor.registerHandler(new LoggingServer()); } catch (IOException e) { e.printStackTrace(); } } - + + private static SelectableChannel udpChannel(int port) throws IOException { + DatagramChannel channel = DatagramChannel.open(); + channel.socket().bind(new InetSocketAddress(port)); + channel.configureBlocking(false); + System.out.println("Bound UDP socket at port: " + port); + return channel; + } + + private static SelectableChannel tcpChannel(int port) throws IOException { + ServerSocketChannel channel = ServerSocketChannel.open(); + channel.socket().bind(new InetSocketAddress(port)); + channel.configureBlocking(false); + System.out.println("Bound TCP socket at port: " + port); + return channel; + } + static class LoggingServer implements NioChannelEventHandler { @Override diff --git a/reactor/src/main/java/com/iluwatar/reactor/AppClient.java b/reactor/src/main/java/com/iluwatar/reactor/AppClient.java index a5d871462..1181745fb 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/AppClient.java +++ b/reactor/src/main/java/com/iluwatar/reactor/AppClient.java @@ -9,14 +9,15 @@ import java.net.Socket; public class AppClient { public static void main(String[] args) { - new LoggingClient("Client 1", 6666).start(); + new Thread(new LoggingClient("Client 1", 6666)).start(); + new Thread(new LoggingClient("Client 2", 6667)).start(); } /* * A logging client that sends logging requests to logging server */ - static class LoggingClient { + static class LoggingClient implements Runnable { private int serverPort; private String clientName; @@ -26,7 +27,7 @@ public class AppClient { this.serverPort = serverPort; } - public void start() { + public void run() { Socket socket = null; try { socket = new Socket(InetAddress.getLocalHost(), serverPort); @@ -51,7 +52,7 @@ public class AppClient { for (int i = 0; i < 10; i++) { writer.println(clientName + " - Log request: " + i); try { - Thread.sleep(1000); + Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java b/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java index b2952397c..734ea086f 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java +++ b/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java @@ -1,44 +1,63 @@ package com.iluwatar.reactor; import java.io.IOException; -import java.net.InetSocketAddress; +import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.util.List; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +/* + * Abstractions + * --------------- + * + * 1 - Dispatcher + * 2 - Synchronous Event De-multiplexer + * 3 - Event + * 4 - Event Handler & concrete event handler (application business logic) + * 5 - Selector + */ public class NioReactor { - private int port; private Selector selector; - private ServerSocketChannel serverSocketChannel; - private NioChannelEventHandler nioEventhandler; - - public NioReactor(int port, NioChannelEventHandler handler) { - this.port = port; - this.nioEventhandler = handler; + private Acceptor acceptor; + private List eventHandlers = new CopyOnWriteArrayList<>(); + + public NioReactor() throws IOException { + this.acceptor = new Acceptor(); + this.selector = Selector.open(); } + public NioReactor registerChannel(SelectableChannel channel) throws IOException { + SelectionKey key = channel.register(selector, SelectionKey.OP_ACCEPT); + key.attach(acceptor); + return this; + } + + public void registerHandler(NioChannelEventHandler handler) { + eventHandlers.add(handler); + } public void start() throws IOException { - startReactor(); - requestLoop(); + new Thread( new Runnable() { + @Override + public void run() { + try { + System.out.println("Reactor started, waiting for events..."); + eventLoop(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }).start(); } - private void startReactor() throws IOException { - selector = Selector.open(); - serverSocketChannel = ServerSocketChannel.open(); - serverSocketChannel.socket().bind(new InetSocketAddress(port)); - serverSocketChannel.configureBlocking(false); - SelectionKey acceptorKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); - acceptorKey.attach(new Acceptor()); - System.out.println("Reactor started listening on port: " + port); - } - - private void requestLoop() throws IOException { + private void eventLoop() throws IOException { while (true) { - selector.select(); + selector.select(1000); Set keys = selector.selectedKeys(); for (SelectionKey key : keys) { dispatchEvent(key); @@ -50,21 +69,21 @@ public class NioReactor { private void dispatchEvent(SelectionKey key) throws IOException { Object handler = key.attachment(); if (handler != null) { - ((EventHandler)handler).handle(); + ((EventHandler)handler).handle(key.channel()); } } interface EventHandler { - void handle() throws IOException; + void handle(SelectableChannel channel) throws IOException; } private class Acceptor implements EventHandler { - public void handle() throws IOException { + public void handle(SelectableChannel channel) throws IOException { // non-blocking accept as acceptor will only be called when accept event is available - SocketChannel clientChannel = serverSocketChannel.accept(); + SocketChannel clientChannel = ((ServerSocketChannel)channel).accept(); if (clientChannel != null) { - new ChannelHandler(clientChannel).handle(); + new ChannelHandler(clientChannel).handle(clientChannel); } System.out.println("Connection established with a client"); } @@ -88,9 +107,11 @@ public class NioReactor { selector.wakeup(); } - public void handle() throws IOException { + public void handle(SelectableChannel channel) throws IOException { // only read events are supported. - nioEventhandler.onReadable(clientChannel); + for (NioChannelEventHandler eventHandler : eventHandlers) { + eventHandler.onReadable(clientChannel); + } } } }