diff --git a/reactor/src/main/java/com/iluwatar/reactor/AbstractNioChannel.java b/reactor/src/main/java/com/iluwatar/reactor/AbstractNioChannel.java new file mode 100644 index 000000000..9f6040ade --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/AbstractNioChannel.java @@ -0,0 +1,75 @@ +package com.iluwatar.reactor; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +public abstract class AbstractNioChannel { + + private SelectableChannel channel; + private ChannelHandler handler; + private Map> channelToPendingWrites = new ConcurrentHashMap<>(); + private NioReactor reactor; + + public AbstractNioChannel(ChannelHandler handler, SelectableChannel channel) { + this.handler = handler; + this.channel = channel; + } + + public void setReactor(NioReactor reactor) { + this.reactor = reactor; + } + + public SelectableChannel getChannel() { + return channel; + } + + public abstract int getInterestedOps(); + + public abstract ByteBuffer read(SelectionKey key) throws IOException; + + public void setHandler(ChannelHandler handler) { + this.handler = handler; + } + + public ChannelHandler getHandler() { + return handler; + } + + // Called from the context of reactor thread + public void write(SelectionKey key) throws IOException { + Queue pendingWrites = channelToPendingWrites.get(key.channel()); + while (true) { + ByteBuffer pendingWrite = pendingWrites.poll(); + if (pendingWrite == null) { + System.out.println("No more pending writes"); + reactor.changeOps(key, SelectionKey.OP_READ); + break; + } + + doWrite(pendingWrite, key); + } + } + + protected abstract void doWrite(ByteBuffer pendingWrite, SelectionKey key) throws IOException; + + public void write(ByteBuffer buffer, SelectionKey key) { + Queue pendingWrites = this.channelToPendingWrites.get(key.channel()); + if (pendingWrites == null) { + synchronized (this.channelToPendingWrites) { + pendingWrites = this.channelToPendingWrites.get(key.channel()); + if (pendingWrites == null) { + pendingWrites = new ConcurrentLinkedQueue<>(); + this.channelToPendingWrites.put(key.channel(), pendingWrites); + } + } + } + pendingWrites.add(buffer); + reactor.changeOps(key, SelectionKey.OP_WRITE); + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/App.java b/reactor/src/main/java/com/iluwatar/reactor/App.java index 4c7b06e9d..36aa5290d 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/App.java +++ b/reactor/src/main/java/com/iluwatar/reactor/App.java @@ -1,69 +1,32 @@ package com.iluwatar.reactor; import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SelectableChannel; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; - -import com.iluwatar.reactor.NioReactor.NioChannelEventHandler; public class App { public static void main(String[] args) { try { - NioReactor reactor = new NioReactor(); - + NioReactor reactor = new NioReactor(new ThreadPoolDispatcher(2)); + LoggingHandler loggingHandler = new LoggingHandler(); reactor - .registerChannel(tcpChannel(6666)) - .registerChannel(tcpChannel(6667)) + .registerChannel(tcpChannel(6666, loggingHandler)) + .registerChannel(tcpChannel(6667, loggingHandler)) + .registerChannel(udpChannel(6668, loggingHandler)) .start(); - - reactor.registerHandler(new LoggingServer()); } catch (IOException e) { e.printStackTrace(); } } - private static SelectableChannel udpChannel(int port) throws IOException { - DatagramChannel channel = DatagramChannel.open(); - channel.socket().bind(new InetSocketAddress(port)); - channel.configureBlocking(false); - System.out.println("Bound UDP socket at port: " + port); + private static AbstractNioChannel tcpChannel(int port, ChannelHandler handler) throws IOException { + NioServerSocketChannel channel = new NioServerSocketChannel(port, handler); + channel.bind(); return channel; } - - private static SelectableChannel tcpChannel(int port) throws IOException { - ServerSocketChannel channel = ServerSocketChannel.open(); - channel.socket().bind(new InetSocketAddress(port)); - channel.configureBlocking(false); - System.out.println("Bound TCP socket at port: " + port); + + private static AbstractNioChannel udpChannel(int port, ChannelHandler handler) throws IOException { + NioDatagramChannel channel = new NioDatagramChannel(port, handler); + channel.bind(); return channel; } - - static class LoggingServer implements NioChannelEventHandler { - - @Override - public void onReadable(SocketChannel channel) { - ByteBuffer requestBuffer = ByteBuffer.allocate(1024); - try { - int byteCount = channel.read(requestBuffer); - if (byteCount > 0) { - byte[] logRequestContents = new byte[byteCount]; - byte[] array = requestBuffer.array(); - System.arraycopy(array, 0, logRequestContents, 0, byteCount); - doLogging(new String(logRequestContents)); - } - } catch (IOException e) { - e.printStackTrace(); - } - } - - private void doLogging(String log) { - // do logging at server side - System.out.println(log); - } - } } diff --git a/reactor/src/main/java/com/iluwatar/reactor/AppClient.java b/reactor/src/main/java/com/iluwatar/reactor/AppClient.java index 1181745fb..3d7323a55 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/AppClient.java +++ b/reactor/src/main/java/com/iluwatar/reactor/AppClient.java @@ -1,16 +1,22 @@ package com.iluwatar.reactor; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.io.PrintWriter; +import java.net.DatagramPacket; +import java.net.DatagramSocket; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketException; public class AppClient { public static void main(String[] args) { - new Thread(new LoggingClient("Client 1", 6666)).start(); - new Thread(new LoggingClient("Client 2", 6667)).start(); +// new Thread(new LoggingClient("Client 1", 6666)).start(); +// new Thread(new LoggingClient("Client 2", 6667)).start(); + new Thread(new UDPLoggingClient(6668)).start(); } @@ -33,7 +39,7 @@ public class AppClient { socket = new Socket(InetAddress.getLocalHost(), serverPort); OutputStream outputStream = socket.getOutputStream(); PrintWriter writer = new PrintWriter(outputStream); - writeLogs(writer); + writeLogs(writer, socket.getInputStream()); } catch (IOException e) { e.printStackTrace(); throw new RuntimeException(e); @@ -48,8 +54,8 @@ public class AppClient { } } - private void writeLogs(PrintWriter writer) { - for (int i = 0; i < 10; i++) { + private void writeLogs(PrintWriter writer, InputStream inputStream) throws IOException { + for (int i = 0; i < 1; i++) { writer.println(clientName + " - Log request: " + i); try { Thread.sleep(100); @@ -57,6 +63,53 @@ public class AppClient { e.printStackTrace(); } writer.flush(); + byte[] data = new byte[1024]; + int read = inputStream.read(data, 0, data.length); + if (read == 0) { + System.out.println("Read zero bytes"); + } else { + System.out.println(new String(data, 0, read)); + } + } + } + } + + static class UDPLoggingClient implements Runnable { + private int port; + + public UDPLoggingClient(int port) { + this.port = port; + } + + @Override + public void run() { + DatagramSocket socket = null; + try { + socket = new DatagramSocket(); + for (int i = 0; i < 1; i++) { + String message = "UDP Client" + " - Log request: " + i; + try { + DatagramPacket packet = new DatagramPacket(message.getBytes(), message.getBytes().length, new InetSocketAddress(InetAddress.getLocalHost(), port)); + socket.send(packet); + + byte[] data = new byte[1024]; + DatagramPacket reply = new DatagramPacket(data, data.length); + socket.receive(reply); + if (reply.getLength() == 0) { + System.out.println("Read zero bytes"); + } else { + System.out.println(new String(reply.getData(), 0, reply.getLength())); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + } catch (SocketException e1) { + e1.printStackTrace(); + } finally { + if (socket != null) { + socket.close(); + } } } } diff --git a/reactor/src/main/java/com/iluwatar/reactor/ChannelHandler.java b/reactor/src/main/java/com/iluwatar/reactor/ChannelHandler.java new file mode 100644 index 000000000..055e8edd6 --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/ChannelHandler.java @@ -0,0 +1,9 @@ +package com.iluwatar.reactor; + +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; + +public interface ChannelHandler { + + void handleChannelRead(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key); +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/Dispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/Dispatcher.java new file mode 100644 index 000000000..1bc14c55f --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/Dispatcher.java @@ -0,0 +1,8 @@ +package com.iluwatar.reactor; + +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; + +public interface Dispatcher { + void onChannelReadEvent(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key); +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/LoggingHandler.java b/reactor/src/main/java/com/iluwatar/reactor/LoggingHandler.java new file mode 100644 index 000000000..3744c3d5a --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/LoggingHandler.java @@ -0,0 +1,24 @@ +package com.iluwatar.reactor; + +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; + +public class LoggingHandler implements ChannelHandler { + + @Override + public void handleChannelRead(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key) { + byte[] data = readBytes.array(); + doLogging(data); + sendEchoReply(channel, data, key); + } + + private void sendEchoReply(AbstractNioChannel channel, byte[] data, SelectionKey key) { + ByteBuffer buffer = ByteBuffer.wrap("Data logged successfully".getBytes()); + channel.write(buffer, key); + } + + private void doLogging(byte[] data) { + // assuming UTF-8 :( + System.out.println(new String(data)); + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/NioDatagramChannel.java b/reactor/src/main/java/com/iluwatar/reactor/NioDatagramChannel.java new file mode 100644 index 000000000..2f655f192 --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/NioDatagramChannel.java @@ -0,0 +1,47 @@ +package com.iluwatar.reactor; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; + +public class NioDatagramChannel extends AbstractNioChannel { + + private int port; + + public NioDatagramChannel(int port, ChannelHandler handler) throws IOException { + super(handler, DatagramChannel.open()); + this.port = port; + } + + @Override + public int getInterestedOps() { + return SelectionKey.OP_READ; + } + + @Override + public ByteBuffer read(SelectionKey key) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(1024); + getChannel().receive(buffer); + return buffer; + } + + @Override + public DatagramChannel getChannel() { + return (DatagramChannel) super.getChannel(); + } + + public void bind() throws IOException { + getChannel().socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port)); + getChannel().configureBlocking(false); + System.out.println("Bound UDP socket at port: " + port); + } + + @Override + protected void doWrite(ByteBuffer pendingWrite, SelectionKey key) throws IOException { + pendingWrite.flip(); + getChannel().write(pendingWrite); + } +} \ No newline at end of file diff --git a/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java b/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java index 734ea086f..05aa609d1 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java +++ b/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java @@ -1,46 +1,39 @@ package com.iluwatar.reactor; import java.io.IOException; -import java.nio.channels.SelectableChannel; +import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; -import java.util.List; +import java.util.Iterator; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ConcurrentLinkedQueue; /* * Abstractions * --------------- - * - * 1 - Dispatcher * 2 - Synchronous Event De-multiplexer - * 3 - Event - * 4 - Event Handler & concrete event handler (application business logic) - * 5 - Selector */ public class NioReactor { private Selector selector; - private Acceptor acceptor; - private List eventHandlers = new CopyOnWriteArrayList<>(); + private Dispatcher dispatcher; + private Queue pendingChanges = new ConcurrentLinkedQueue<>(); - public NioReactor() throws IOException { - this.acceptor = new Acceptor(); + public NioReactor(Dispatcher dispatcher) throws IOException { + this.dispatcher = dispatcher; this.selector = Selector.open(); } - public NioReactor registerChannel(SelectableChannel channel) throws IOException { - SelectionKey key = channel.register(selector, SelectionKey.OP_ACCEPT); - key.attach(acceptor); + public NioReactor registerChannel(AbstractNioChannel channel) throws IOException { + SelectionKey key = channel.getChannel().register(selector, channel.getInterestedOps()); + key.attach(channel); + channel.setReactor(this); return this; } - public void registerHandler(NioChannelEventHandler handler) { - eventHandlers.add(handler); - } - public void start() throws IOException { new Thread( new Runnable() { @Override @@ -52,66 +45,110 @@ public class NioReactor { e.printStackTrace(); } } - }).start(); + }, "Reactor Main").start(); } private void eventLoop() throws IOException { while (true) { - selector.select(1000); + // honor any pending requests first + processPendingChanges(); + + selector.select(); + Set keys = selector.selectedKeys(); - for (SelectionKey key : keys) { - dispatchEvent(key); + + Iterator iterator = keys.iterator(); + + while (iterator.hasNext()) { + SelectionKey key = iterator.next(); + if (!key.isValid()) { + iterator.remove(); + continue; + } + processKey(key); } keys.clear(); } } - private void dispatchEvent(SelectionKey key) throws IOException { - Object handler = key.attachment(); - if (handler != null) { - ((EventHandler)handler).handle(key.channel()); + private void processPendingChanges() { + Iterator iterator = pendingChanges.iterator(); + while (iterator.hasNext()) { + Command command = iterator.next(); + System.out.println("Processing pending change: " + command); + command.execute(); + iterator.remove(); } } - interface EventHandler { - void handle(SelectableChannel channel) throws IOException; - } - - private class Acceptor implements EventHandler { - - public void handle(SelectableChannel channel) throws IOException { - // non-blocking accept as acceptor will only be called when accept event is available - SocketChannel clientChannel = ((ServerSocketChannel)channel).accept(); - if (clientChannel != null) { - new ChannelHandler(clientChannel).handle(clientChannel); - } - System.out.println("Connection established with a client"); + private void processKey(SelectionKey key) throws IOException { + if (key.isAcceptable()) { + acceptConnection(key); + } else if (key.isReadable()) { + System.out.println("Key is readable"); + read(key); + } else if (key.isWritable()) { + System.out.println("Key is writable"); + write(key); } } - - public static interface NioChannelEventHandler { - void onReadable(SocketChannel channel); + + private void write(SelectionKey key) throws IOException { + AbstractNioChannel channel = (AbstractNioChannel) key.attachment(); + channel.write(key); } - - private class ChannelHandler implements EventHandler { - - private SocketChannel clientChannel; - private SelectionKey selectionKey; - public ChannelHandler(SocketChannel clientChannel) throws IOException { - this.clientChannel = clientChannel; - clientChannel.configureBlocking(false); - selectionKey = clientChannel.register(selector, 0); - selectionKey.attach(this); - selectionKey.interestOps(SelectionKey.OP_READ); - selector.wakeup(); - } - - public void handle(SelectableChannel channel) throws IOException { - // only read events are supported. - for (NioChannelEventHandler eventHandler : eventHandlers) { - eventHandler.onReadable(clientChannel); + private void read(SelectionKey key) { + ByteBuffer readBytes; + try { + readBytes = ((AbstractNioChannel)key.attachment()).read(key); + dispatchReadEvent(key, readBytes); + } catch (IOException e) { + try { + key.channel().close(); + } catch (IOException e1) { + e1.printStackTrace(); } } } -} + + private void dispatchReadEvent(SelectionKey key, ByteBuffer readBytes) { + dispatcher.onChannelReadEvent((AbstractNioChannel)key.attachment(), readBytes, key); + } + + private void acceptConnection(SelectionKey key) throws IOException { + ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); + SocketChannel socketChannel = serverSocketChannel.accept(); + socketChannel.configureBlocking(false); + SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); + readKey.attach(key.attachment()); + } + + interface Command { + void execute(); + } + + public void changeOps(SelectionKey key, int interestedOps) { + pendingChanges.add(new ChangeKeyOpsCommand(key, interestedOps)); + selector.wakeup(); + } + + class ChangeKeyOpsCommand implements Command { + private SelectionKey key; + private int interestedOps; + + public ChangeKeyOpsCommand(SelectionKey key, int interestedOps) { + this.key = key; + this.interestedOps = interestedOps; + } + + public void execute() { + key.interestOps(interestedOps); + } + + @Override + public String toString() { + return "Change of ops to: " + interestedOps; + } + } +} \ No newline at end of file diff --git a/reactor/src/main/java/com/iluwatar/reactor/NioServerSocketChannel.java b/reactor/src/main/java/com/iluwatar/reactor/NioServerSocketChannel.java new file mode 100644 index 000000000..66affdb8d --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/NioServerSocketChannel.java @@ -0,0 +1,52 @@ +package com.iluwatar.reactor; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +public class NioServerSocketChannel extends AbstractNioChannel { + + private int port; + + public NioServerSocketChannel(int port, ChannelHandler handler) throws IOException { + super(handler, ServerSocketChannel.open()); + this.port = port; + } + + @Override + public int getInterestedOps() { + return SelectionKey.OP_ACCEPT; + } + + @Override + public ServerSocketChannel getChannel() { + return (ServerSocketChannel) super.getChannel(); + } + + @Override + public ByteBuffer read(SelectionKey key) throws IOException { + SocketChannel socketChannel = (SocketChannel) key.channel(); + ByteBuffer buffer = ByteBuffer.allocate(1024); + int read = socketChannel.read(buffer); + if (read == -1) { + throw new IOException("Socket closed"); + } + return buffer; + } + + public void bind() throws IOException { + ((ServerSocketChannel)getChannel()).socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port)); + ((ServerSocketChannel)getChannel()).configureBlocking(false); + System.out.println("Bound TCP socket at port: " + port); + } + + @Override + protected void doWrite(ByteBuffer pendingWrite, SelectionKey key) throws IOException { + System.out.println("Writing on channel"); + ((SocketChannel)key.channel()).write(pendingWrite); + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/SameThreadDispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/SameThreadDispatcher.java new file mode 100644 index 000000000..9b8029de4 --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/SameThreadDispatcher.java @@ -0,0 +1,14 @@ +package com.iluwatar.reactor; + +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; + +public class SameThreadDispatcher implements Dispatcher { + + @Override + public void onChannelReadEvent(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key) { + if (channel.getHandler() != null) { + channel.getHandler().handleChannelRead(channel, readBytes, key); + } + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/ThreadPoolDispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/ThreadPoolDispatcher.java new file mode 100644 index 000000000..2f44e4372 --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/ThreadPoolDispatcher.java @@ -0,0 +1,27 @@ +package com.iluwatar.reactor; + +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class ThreadPoolDispatcher extends SameThreadDispatcher { + + private ExecutorService exectorService; + + public ThreadPoolDispatcher(int poolSize) { + this.exectorService = Executors.newFixedThreadPool(poolSize); + } + + @Override + public void onChannelReadEvent(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key) { + exectorService.execute(new Runnable() { + + @Override + public void run() { + ThreadPoolDispatcher.super.onChannelReadEvent(channel, readBytes, key); + } + }); + } + +} diff --git a/reactor/todo.txt b/reactor/todo.txt new file mode 100644 index 000000000..af06a1892 --- /dev/null +++ b/reactor/todo.txt @@ -0,0 +1,4 @@ +* Make UDP channel work (connect is required) +* Cleanup +* Document - Javadoc +* Better design?? Get review of @iluwatar