From 99adb5b0cfa9f54962eb82c536a3b714e4a377e7 Mon Sep 17 00:00:00 2001 From: Narendra Pathai Date: Sun, 23 Aug 2015 11:48:57 +0530 Subject: [PATCH] Work on #74 Initial logging server example --- reactor/pom.xml | 18 ++++ .../main/java/com/iluwatar/reactor/App.java | 42 ++++++++ .../java/com/iluwatar/reactor/AppClient.java | 62 ++++++++++++ .../java/com/iluwatar/reactor/NioReactor.java | 96 +++++++++++++++++++ 4 files changed, 218 insertions(+) create mode 100644 reactor/pom.xml create mode 100644 reactor/src/main/java/com/iluwatar/reactor/App.java create mode 100644 reactor/src/main/java/com/iluwatar/reactor/AppClient.java create mode 100644 reactor/src/main/java/com/iluwatar/reactor/NioReactor.java diff --git a/reactor/pom.xml b/reactor/pom.xml new file mode 100644 index 000000000..0f3271a9c --- /dev/null +++ b/reactor/pom.xml @@ -0,0 +1,18 @@ + + + 4.0.0 + + com.iluwatar + java-design-patterns + 1.5.0 + + reactor + + + junit + junit + test + + + diff --git a/reactor/src/main/java/com/iluwatar/reactor/App.java b/reactor/src/main/java/com/iluwatar/reactor/App.java new file mode 100644 index 000000000..d5cd05fec --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/App.java @@ -0,0 +1,42 @@ +package com.iluwatar.reactor; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import com.iluwatar.reactor.NioReactor.NioChannelEventHandler; + +public class App { + + public static void main(String[] args) { + try { + new NioReactor(6666, new LoggingServer()).start(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + static class LoggingServer implements NioChannelEventHandler { + + @Override + public void onReadable(SocketChannel channel) { + ByteBuffer requestBuffer = ByteBuffer.allocate(1024); + try { + int byteCount = channel.read(requestBuffer); + if (byteCount > 0) { + byte[] logRequestContents = new byte[byteCount]; + byte[] array = requestBuffer.array(); + System.arraycopy(array, 0, logRequestContents, 0, byteCount); + doLogging(new String(logRequestContents)); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void doLogging(String log) { + // do logging at server side + System.out.println(log); + } + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/AppClient.java b/reactor/src/main/java/com/iluwatar/reactor/AppClient.java new file mode 100644 index 000000000..a5d871462 --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/AppClient.java @@ -0,0 +1,62 @@ +package com.iluwatar.reactor; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.net.InetAddress; +import java.net.Socket; + +public class AppClient { + + public static void main(String[] args) { + new LoggingClient("Client 1", 6666).start(); + } + + + /* + * A logging client that sends logging requests to logging server + */ + static class LoggingClient { + + private int serverPort; + private String clientName; + + public LoggingClient(String clientName, int serverPort) { + this.clientName = clientName; + this.serverPort = serverPort; + } + + public void start() { + Socket socket = null; + try { + socket = new Socket(InetAddress.getLocalHost(), serverPort); + OutputStream outputStream = socket.getOutputStream(); + PrintWriter writer = new PrintWriter(outputStream); + writeLogs(writer); + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } finally { + if (socket != null) { + try { + socket.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } + + private void writeLogs(PrintWriter writer) { + for (int i = 0; i < 10; i++) { + writer.println(clientName + " - Log request: " + i); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + writer.flush(); + } + } + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java b/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java new file mode 100644 index 000000000..b2952397c --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java @@ -0,0 +1,96 @@ +package com.iluwatar.reactor; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Set; + +public class NioReactor { + + private int port; + private Selector selector; + private ServerSocketChannel serverSocketChannel; + private NioChannelEventHandler nioEventhandler; + + public NioReactor(int port, NioChannelEventHandler handler) { + this.port = port; + this.nioEventhandler = handler; + } + + + public void start() throws IOException { + startReactor(); + requestLoop(); + } + + private void startReactor() throws IOException { + selector = Selector.open(); + serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.socket().bind(new InetSocketAddress(port)); + serverSocketChannel.configureBlocking(false); + SelectionKey acceptorKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); + acceptorKey.attach(new Acceptor()); + System.out.println("Reactor started listening on port: " + port); + } + + private void requestLoop() throws IOException { + while (true) { + selector.select(); + Set keys = selector.selectedKeys(); + for (SelectionKey key : keys) { + dispatchEvent(key); + } + keys.clear(); + } + } + + private void dispatchEvent(SelectionKey key) throws IOException { + Object handler = key.attachment(); + if (handler != null) { + ((EventHandler)handler).handle(); + } + } + + interface EventHandler { + void handle() throws IOException; + } + + private class Acceptor implements EventHandler { + + public void handle() throws IOException { + // non-blocking accept as acceptor will only be called when accept event is available + SocketChannel clientChannel = serverSocketChannel.accept(); + if (clientChannel != null) { + new ChannelHandler(clientChannel).handle(); + } + System.out.println("Connection established with a client"); + } + } + + public static interface NioChannelEventHandler { + void onReadable(SocketChannel channel); + } + + private class ChannelHandler implements EventHandler { + + private SocketChannel clientChannel; + private SelectionKey selectionKey; + + public ChannelHandler(SocketChannel clientChannel) throws IOException { + this.clientChannel = clientChannel; + clientChannel.configureBlocking(false); + selectionKey = clientChannel.register(selector, 0); + selectionKey.attach(this); + selectionKey.interestOps(SelectionKey.OP_READ); + selector.wakeup(); + } + + public void handle() throws IOException { + // only read events are supported. + nioEventhandler.onReadable(clientChannel); + } + } +}