From 7ac262b880f9ab5b52dd000a5728c965109b9d8c Mon Sep 17 00:00:00 2001 From: Narendra Pathai Date: Fri, 4 Sep 2015 17:43:01 +0530 Subject: [PATCH] Work on #74, repackaged and added javadocs --- .../iluwatar/reactor/AbstractNioChannel.java | 74 ------ .../main/java/com/iluwatar/reactor/App.java | 45 ---- .../com/iluwatar/reactor/ChannelHandler.java | 8 - .../java/com/iluwatar/reactor/Dispatcher.java | 8 - .../com/iluwatar/reactor/LoggingHandler.java | 39 --- .../iluwatar/reactor/NioDatagramChannel.java | 80 ------ .../java/com/iluwatar/reactor/NioReactor.java | 170 ------------ .../reactor/SameThreadDispatcher.java | 18 -- .../reactor/ThreadPoolDispatcher.java | 37 --- .../java/com/iluwatar/reactor/app/App.java | 106 ++++++++ .../iluwatar/reactor/{ => app}/AppClient.java | 2 +- .../iluwatar/reactor/app/LoggingHandler.java | 62 +++++ .../reactor/framework/AbstractNioChannel.java | 150 +++++++++++ .../reactor/framework/ChannelHandler.java | 25 ++ .../reactor/framework/Dispatcher.java | 38 +++ .../reactor/framework/NioDatagramChannel.java | 156 +++++++++++ .../reactor/framework/NioReactor.java | 242 ++++++++++++++++++ .../NioServerSocketChannel.java | 38 ++- .../framework/SameThreadDispatcher.java | 43 ++++ .../framework/ThreadPoolDispatcher.java | 55 ++++ .../iluwatar/reactor/{ => app}/AppTest.java | 2 +- reactor/todo.txt | 9 + 22 files changed, 925 insertions(+), 482 deletions(-) delete mode 100644 reactor/src/main/java/com/iluwatar/reactor/AbstractNioChannel.java delete mode 100644 reactor/src/main/java/com/iluwatar/reactor/App.java delete mode 100644 reactor/src/main/java/com/iluwatar/reactor/ChannelHandler.java delete mode 100644 reactor/src/main/java/com/iluwatar/reactor/Dispatcher.java delete mode 100644 reactor/src/main/java/com/iluwatar/reactor/LoggingHandler.java delete mode 100644 reactor/src/main/java/com/iluwatar/reactor/NioDatagramChannel.java delete mode 100644 reactor/src/main/java/com/iluwatar/reactor/NioReactor.java delete mode 100644 reactor/src/main/java/com/iluwatar/reactor/SameThreadDispatcher.java delete mode 100644 reactor/src/main/java/com/iluwatar/reactor/ThreadPoolDispatcher.java create mode 100644 reactor/src/main/java/com/iluwatar/reactor/app/App.java rename reactor/src/main/java/com/iluwatar/reactor/{ => app}/AppClient.java (99%) create mode 100644 reactor/src/main/java/com/iluwatar/reactor/app/LoggingHandler.java create mode 100644 reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java create mode 100644 reactor/src/main/java/com/iluwatar/reactor/framework/ChannelHandler.java create mode 100644 reactor/src/main/java/com/iluwatar/reactor/framework/Dispatcher.java create mode 100644 reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java create mode 100644 reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java rename reactor/src/main/java/com/iluwatar/reactor/{ => framework}/NioServerSocketChannel.java (52%) create mode 100644 reactor/src/main/java/com/iluwatar/reactor/framework/SameThreadDispatcher.java create mode 100644 reactor/src/main/java/com/iluwatar/reactor/framework/ThreadPoolDispatcher.java rename reactor/src/test/java/com/iluwatar/reactor/{ => app}/AppTest.java (91%) diff --git a/reactor/src/main/java/com/iluwatar/reactor/AbstractNioChannel.java b/reactor/src/main/java/com/iluwatar/reactor/AbstractNioChannel.java deleted file mode 100644 index f55cea073..000000000 --- a/reactor/src/main/java/com/iluwatar/reactor/AbstractNioChannel.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.iluwatar.reactor; - -import java.io.IOException; -import java.nio.channels.SelectableChannel; -import java.nio.channels.SelectionKey; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; - -public abstract class AbstractNioChannel { - - private SelectableChannel channel; - private ChannelHandler handler; - private Map> channelToPendingWrites = new ConcurrentHashMap<>(); - private NioReactor reactor; - - public AbstractNioChannel(ChannelHandler handler, SelectableChannel channel) { - this.handler = handler; - this.channel = channel; - } - - public void setReactor(NioReactor reactor) { - this.reactor = reactor; - } - - public SelectableChannel getChannel() { - return channel; - } - - public abstract int getInterestedOps(); - - public abstract Object read(SelectionKey key) throws IOException; - - public void setHandler(ChannelHandler handler) { - this.handler = handler; - } - - public ChannelHandler getHandler() { - return handler; - } - - // Called from the context of reactor thread - public void write(SelectionKey key) throws IOException { - Queue pendingWrites = channelToPendingWrites.get(key.channel()); - while (true) { - Object pendingWrite = pendingWrites.poll(); - if (pendingWrite == null) { - System.out.println("No more pending writes"); - reactor.changeOps(key, SelectionKey.OP_READ); - break; - } - - doWrite(pendingWrite, key); - } - } - - protected abstract void doWrite(Object pendingWrite, SelectionKey key) throws IOException; - - public void write(Object data, SelectionKey key) { - Queue pendingWrites = this.channelToPendingWrites.get(key.channel()); - if (pendingWrites == null) { - synchronized (this.channelToPendingWrites) { - pendingWrites = this.channelToPendingWrites.get(key.channel()); - if (pendingWrites == null) { - pendingWrites = new ConcurrentLinkedQueue<>(); - this.channelToPendingWrites.put(key.channel(), pendingWrites); - } - } - } - pendingWrites.add(data); - reactor.changeOps(key, SelectionKey.OP_WRITE); - } -} diff --git a/reactor/src/main/java/com/iluwatar/reactor/App.java b/reactor/src/main/java/com/iluwatar/reactor/App.java deleted file mode 100644 index 7ce27a78b..000000000 --- a/reactor/src/main/java/com/iluwatar/reactor/App.java +++ /dev/null @@ -1,45 +0,0 @@ -package com.iluwatar.reactor; - -import java.io.IOException; - -public class App { - - private NioReactor reactor; - - public static void main(String[] args) { - try { - new App().start(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - public void start() throws IOException { - reactor = new NioReactor(new ThreadPoolDispatcher(2)); - - LoggingHandler loggingHandler = new LoggingHandler(); - - reactor - .registerChannel(tcpChannel(6666, loggingHandler)) - .registerChannel(tcpChannel(6667, loggingHandler)) - .registerChannel(udpChannel(6668, loggingHandler)) - .start(); - } - - public void stop() { - reactor.stop(); - } - - - private static AbstractNioChannel tcpChannel(int port, ChannelHandler handler) throws IOException { - NioServerSocketChannel channel = new NioServerSocketChannel(port, handler); - channel.bind(); - return channel; - } - - private static AbstractNioChannel udpChannel(int port, ChannelHandler handler) throws IOException { - NioDatagramChannel channel = new NioDatagramChannel(port, handler); - channel.bind(); - return channel; - } -} diff --git a/reactor/src/main/java/com/iluwatar/reactor/ChannelHandler.java b/reactor/src/main/java/com/iluwatar/reactor/ChannelHandler.java deleted file mode 100644 index e84c506f9..000000000 --- a/reactor/src/main/java/com/iluwatar/reactor/ChannelHandler.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.iluwatar.reactor; - -import java.nio.channels.SelectionKey; - -public interface ChannelHandler { - - void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key); -} diff --git a/reactor/src/main/java/com/iluwatar/reactor/Dispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/Dispatcher.java deleted file mode 100644 index 7c05a6c1d..000000000 --- a/reactor/src/main/java/com/iluwatar/reactor/Dispatcher.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.iluwatar.reactor; - -import java.nio.channels.SelectionKey; - -public interface Dispatcher { - void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key); - void stop(); -} diff --git a/reactor/src/main/java/com/iluwatar/reactor/LoggingHandler.java b/reactor/src/main/java/com/iluwatar/reactor/LoggingHandler.java deleted file mode 100644 index fc7efaeed..000000000 --- a/reactor/src/main/java/com/iluwatar/reactor/LoggingHandler.java +++ /dev/null @@ -1,39 +0,0 @@ -package com.iluwatar.reactor; - -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; - -import com.iluwatar.reactor.NioDatagramChannel.DatagramPacket; - -public class LoggingHandler implements ChannelHandler { - - @Override - public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) { - if (readObject instanceof ByteBuffer) { - byte[] data = ((ByteBuffer)readObject).array(); - doLogging(data); - sendReply(channel, data, key); - } else if (readObject instanceof DatagramPacket) { - DatagramPacket datagram = (DatagramPacket)readObject; - byte[] data = datagram.getData().array(); - doLogging(data); - sendReply(channel, datagram, key); - } - } - - private void sendReply(AbstractNioChannel channel, DatagramPacket datagram, SelectionKey key) { - DatagramPacket replyPacket = new DatagramPacket(ByteBuffer.wrap("Data logged successfully".getBytes())); - replyPacket.setReceiver(datagram.getSender()); - channel.write(replyPacket, key); - } - - private void sendReply(AbstractNioChannel channel, byte[] data, SelectionKey key) { - ByteBuffer buffer = ByteBuffer.wrap("Data logged successfully".getBytes()); - channel.write(buffer, key); - } - - private void doLogging(byte[] data) { - // assuming UTF-8 :( - System.out.println(new String(data)); - } -} diff --git a/reactor/src/main/java/com/iluwatar/reactor/NioDatagramChannel.java b/reactor/src/main/java/com/iluwatar/reactor/NioDatagramChannel.java deleted file mode 100644 index 4d1690792..000000000 --- a/reactor/src/main/java/com/iluwatar/reactor/NioDatagramChannel.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.iluwatar.reactor; - -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.channels.DatagramChannel; -import java.nio.channels.SelectionKey; - -public class NioDatagramChannel extends AbstractNioChannel { - - private int port; - - public NioDatagramChannel(int port, ChannelHandler handler) throws IOException { - super(handler, DatagramChannel.open()); - this.port = port; - } - - @Override - public int getInterestedOps() { - return SelectionKey.OP_READ; - } - - @Override - public Object read(SelectionKey key) throws IOException { - ByteBuffer buffer = ByteBuffer.allocate(1024); - SocketAddress sender = getChannel().receive(buffer); - DatagramPacket packet = new DatagramPacket(buffer); - packet.setSender(sender); - return packet; - } - - @Override - public DatagramChannel getChannel() { - return (DatagramChannel) super.getChannel(); - } - - public void bind() throws IOException { - getChannel().socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port)); - getChannel().configureBlocking(false); - System.out.println("Bound UDP socket at port: " + port); - } - - @Override - protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException { - DatagramPacket pendingPacket = (DatagramPacket) pendingWrite; - getChannel().send(pendingPacket.getData(), pendingPacket.getReceiver()); - } - - static class DatagramPacket { - private SocketAddress sender; - private ByteBuffer data; - private SocketAddress receiver; - - public DatagramPacket(ByteBuffer data) { - this.data = data; - } - - public SocketAddress getSender() { - return sender; - } - - public void setSender(SocketAddress sender) { - this.sender = sender; - } - - public SocketAddress getReceiver() { - return receiver; - } - - public void setReceiver(SocketAddress receiver) { - this.receiver = receiver; - } - - public ByteBuffer getData() { - return data; - } - } -} \ No newline at end of file diff --git a/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java b/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java deleted file mode 100644 index 6ee0cb989..000000000 --- a/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java +++ /dev/null @@ -1,170 +0,0 @@ -package com.iluwatar.reactor; - -import java.io.IOException; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.util.Iterator; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -/* - * Abstractions - * --------------- - * 2 - Synchronous Event De-multiplexer - */ -public class NioReactor { - - private Selector selector; - private Dispatcher dispatcher; - private Queue pendingChanges = new ConcurrentLinkedQueue<>(); - private ExecutorService reactorService = Executors.newSingleThreadExecutor(); - - public NioReactor(Dispatcher dispatcher) throws IOException { - this.dispatcher = dispatcher; - this.selector = Selector.open(); - } - - public NioReactor registerChannel(AbstractNioChannel channel) throws IOException { - SelectionKey key = channel.getChannel().register(selector, channel.getInterestedOps()); - key.attach(channel); - channel.setReactor(this); - return this; - } - - public void start() throws IOException { - reactorService.execute(new Runnable() { - @Override - public void run() { - try { - System.out.println("Reactor started, waiting for events..."); - eventLoop(); - } catch (IOException e) { - e.printStackTrace(); - } - } - }); - } - - public void stop() { - reactorService.shutdownNow(); - selector.wakeup(); - try { - reactorService.awaitTermination(4, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } - dispatcher.stop(); - } - - private void eventLoop() throws IOException { - while (true) { - - if (Thread.interrupted()) { - break; - } - - // honor any pending requests first - processPendingChanges(); - - selector.select(); - - Set keys = selector.selectedKeys(); - - Iterator iterator = keys.iterator(); - - while (iterator.hasNext()) { - SelectionKey key = iterator.next(); - if (!key.isValid()) { - iterator.remove(); - continue; - } - processKey(key); - } - keys.clear(); - } - } - - private void processPendingChanges() { - Iterator iterator = pendingChanges.iterator(); - while (iterator.hasNext()) { - Command command = iterator.next(); - command.execute(); - iterator.remove(); - } - } - - private void processKey(SelectionKey key) throws IOException { - if (key.isAcceptable()) { - acceptConnection(key); - } else if (key.isReadable()) { - read(key); - } else if (key.isWritable()) { - write(key); - } - } - - private void write(SelectionKey key) throws IOException { - AbstractNioChannel channel = (AbstractNioChannel) key.attachment(); - channel.write(key); - } - - private void read(SelectionKey key) { - Object readObject; - try { - readObject = ((AbstractNioChannel)key.attachment()).read(key); - dispatchReadEvent(key, readObject); - } catch (IOException e) { - try { - key.channel().close(); - } catch (IOException e1) { - e1.printStackTrace(); - } - } - } - - private void dispatchReadEvent(SelectionKey key, Object readObject) { - dispatcher.onChannelReadEvent((AbstractNioChannel)key.attachment(), readObject, key); - } - - private void acceptConnection(SelectionKey key) throws IOException { - ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); - SocketChannel socketChannel = serverSocketChannel.accept(); - socketChannel.configureBlocking(false); - SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); - readKey.attach(key.attachment()); - } - - interface Command { - void execute(); - } - - public void changeOps(SelectionKey key, int interestedOps) { - pendingChanges.add(new ChangeKeyOpsCommand(key, interestedOps)); - selector.wakeup(); - } - - class ChangeKeyOpsCommand implements Command { - private SelectionKey key; - private int interestedOps; - - public ChangeKeyOpsCommand(SelectionKey key, int interestedOps) { - this.key = key; - this.interestedOps = interestedOps; - } - - public void execute() { - key.interestOps(interestedOps); - } - - @Override - public String toString() { - return "Change of ops to: " + interestedOps; - } - } -} \ No newline at end of file diff --git a/reactor/src/main/java/com/iluwatar/reactor/SameThreadDispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/SameThreadDispatcher.java deleted file mode 100644 index c27050a15..000000000 --- a/reactor/src/main/java/com/iluwatar/reactor/SameThreadDispatcher.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.iluwatar.reactor; - -import java.nio.channels.SelectionKey; - -public class SameThreadDispatcher implements Dispatcher { - - @Override - public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) { - if (channel.getHandler() != null) { - channel.getHandler().handleChannelRead(channel, readObject, key); - } - } - - @Override - public void stop() { - // no-op - } -} diff --git a/reactor/src/main/java/com/iluwatar/reactor/ThreadPoolDispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/ThreadPoolDispatcher.java deleted file mode 100644 index 600cb4da4..000000000 --- a/reactor/src/main/java/com/iluwatar/reactor/ThreadPoolDispatcher.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.iluwatar.reactor; - -import java.nio.channels.SelectionKey; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -public class ThreadPoolDispatcher extends SameThreadDispatcher { - - private ExecutorService executorService; - - public ThreadPoolDispatcher(int poolSize) { - this.executorService = Executors.newFixedThreadPool(poolSize); - } - - @Override - public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) { - executorService.execute(new Runnable() { - - @Override - public void run() { - ThreadPoolDispatcher.super.onChannelReadEvent(channel, readObject, key); - } - }); - } - - @Override - public void stop() { - executorService.shutdownNow(); - try { - executorService.awaitTermination(1000, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - -} diff --git a/reactor/src/main/java/com/iluwatar/reactor/app/App.java b/reactor/src/main/java/com/iluwatar/reactor/app/App.java new file mode 100644 index 000000000..d7b280465 --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/app/App.java @@ -0,0 +1,106 @@ +package com.iluwatar.reactor.app; + +import java.io.IOException; + +import com.iluwatar.reactor.framework.AbstractNioChannel; +import com.iluwatar.reactor.framework.ChannelHandler; +import com.iluwatar.reactor.framework.NioDatagramChannel; +import com.iluwatar.reactor.framework.NioReactor; +import com.iluwatar.reactor.framework.NioServerSocketChannel; +import com.iluwatar.reactor.framework.ThreadPoolDispatcher; + +/** + * This application demonstrates Reactor pattern. It represents a Distributed Logging Service + * where it can listen on multiple TCP or UDP sockets for incoming log requests. + * + *

+ * INTENT + *
+ * The Reactor design pattern handles service requests that are delivered concurrently to an + * application by one or more clients. The application can register specific handlers for processing + * which are called by reactor on specific events. + * + *

+ * PROBLEM + *
+ * Server applications in a distributed system must handle multiple clients that send them service + * requests. Following forces need to be resolved: + *

    + *
  • Availability
  • + *
  • Efficiency
  • + *
  • Programming Simplicity
  • + *
  • Adaptability
  • + *
+ * + *

+ * The application utilizes single thread to listen for requests on all ports. It does not create + * a separate thread for each client, which provides better scalability under load (number of clients + * increase). + * + *

+ * The example uses Java NIO framework to implement the Reactor. + * + * @author npathai + * + */ +public class App { + + private NioReactor reactor; + + /** + * App entry. + */ + public static void main(String[] args) { + try { + new App().start(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * Starts the NIO reactor. + * @throws IOException if any channel fails to bind. + */ + public void start() throws IOException { + /* + * The application can customize its event dispatching mechanism. + */ + reactor = new NioReactor(new ThreadPoolDispatcher(2)); + + /* + * This represents application specific business logic that dispatcher will call + * on appropriate events. These events are read and write event in our example. + */ + LoggingHandler loggingHandler = new LoggingHandler(); + + /* + * Our application binds to multiple I/O channels and uses same logging handler to handle + * incoming log requests. + */ + reactor + .registerChannel(tcpChannel(6666, loggingHandler)) + .registerChannel(tcpChannel(6667, loggingHandler)) + .registerChannel(udpChannel(6668, loggingHandler)) + .start(); + } + + /** + * Stops the NIO reactor. This is a blocking call. + */ + public void stop() { + reactor.stop(); + } + + private static AbstractNioChannel tcpChannel(int port, ChannelHandler handler) throws IOException { + NioServerSocketChannel channel = new NioServerSocketChannel(port, handler); + channel.bind(); + return channel; + } + + private static AbstractNioChannel udpChannel(int port, ChannelHandler handler) throws IOException { + NioDatagramChannel channel = new NioDatagramChannel(port, handler); + channel.bind(); + return channel; + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/AppClient.java b/reactor/src/main/java/com/iluwatar/reactor/app/AppClient.java similarity index 99% rename from reactor/src/main/java/com/iluwatar/reactor/AppClient.java rename to reactor/src/main/java/com/iluwatar/reactor/app/AppClient.java index 2ffb6c0de..e5a7dd145 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/AppClient.java +++ b/reactor/src/main/java/com/iluwatar/reactor/app/AppClient.java @@ -1,4 +1,4 @@ -package com.iluwatar.reactor; +package com.iluwatar.reactor.app; import java.io.IOException; import java.io.InputStream; diff --git a/reactor/src/main/java/com/iluwatar/reactor/app/LoggingHandler.java b/reactor/src/main/java/com/iluwatar/reactor/app/LoggingHandler.java new file mode 100644 index 000000000..6fa95de2d --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/app/LoggingHandler.java @@ -0,0 +1,62 @@ +package com.iluwatar.reactor.app; + +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; + +import com.iluwatar.reactor.framework.AbstractNioChannel; +import com.iluwatar.reactor.framework.ChannelHandler; +import com.iluwatar.reactor.framework.NioDatagramChannel.DatagramPacket; + +/** + * Logging server application logic. It logs the incoming requests on standard console and returns + * a canned acknowledgement back to the remote peer. + * + * @author npathai + */ +public class LoggingHandler implements ChannelHandler { + + private static final byte[] ACK = "Data logged successfully".getBytes(); + + /** + * Decodes the received data and logs it on standard console. + */ + @Override + public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) { + /* + * As this channel is attached to both TCP and UDP channels we need to check whether + * the data received is a ByteBuffer (from TCP channel) or a DatagramPacket (from UDP channel). + */ + if (readObject instanceof ByteBuffer) { + byte[] data = ((ByteBuffer)readObject).array(); + doLogging(data); + sendReply(channel, data, key); + } else if (readObject instanceof DatagramPacket) { + DatagramPacket datagram = (DatagramPacket)readObject; + byte[] data = datagram.getData().array(); + doLogging(data); + sendReply(channel, datagram, key); + } else { + throw new IllegalStateException("Unknown data received"); + } + } + + private void sendReply(AbstractNioChannel channel, DatagramPacket incomingPacket, SelectionKey key) { + /* + * Create a reply acknowledgement datagram packet setting the receiver to the sender of incoming message. + */ + DatagramPacket replyPacket = new DatagramPacket(ByteBuffer.wrap(ACK)); + replyPacket.setReceiver(incomingPacket.getSender()); + + channel.write(replyPacket, key); + } + + private void sendReply(AbstractNioChannel channel, byte[] data, SelectionKey key) { + ByteBuffer buffer = ByteBuffer.wrap(ACK); + channel.write(buffer, key); + } + + private void doLogging(byte[] data) { + // assuming UTF-8 :( + System.out.println(new String(data)); + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java b/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java new file mode 100644 index 000000000..a4b18179a --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java @@ -0,0 +1,150 @@ +package com.iluwatar.reactor.framework; + +import java.io.IOException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * This represents the Handle of Reactor pattern. These are resources managed by OS + * which can be submitted to {@link NioReactor}. + * + *

+ * This class serves has the responsibility of reading the data when a read event occurs and + * writing the data back when the channel is writable. It leaves the reading and writing of + * data on the concrete implementation. It provides a block writing mechanism wherein when + * any {@link ChannelHandler} wants to write data back, it queues the data in pending write queue + * and clears it in block manner. This provides better throughput. + * + * @author npathai + * + */ +public abstract class AbstractNioChannel { + + private SelectableChannel channel; + private ChannelHandler handler; + private Map> channelToPendingWrites = new ConcurrentHashMap<>(); + private NioReactor reactor; + + /** + * Creates a new channel. + * @param handler which will handle events occurring on this channel. + * @param channel a NIO channel to be wrapped. + */ + public AbstractNioChannel(ChannelHandler handler, SelectableChannel channel) { + this.handler = handler; + this.channel = channel; + } + + /** + * Injects the reactor in this channel. + */ + void setReactor(NioReactor reactor) { + this.reactor = reactor; + } + + /** + * @return the wrapped NIO channel. + */ + public SelectableChannel getChannel() { + return channel; + } + + /** + * The operation in which the channel is interested, this operation is be provided to {@link Selector}. + * + * @return interested operation. + * @see SelectionKey + */ + public abstract int getInterestedOps(); + + /** + * Requests the channel to bind. + * + * @throws IOException if any I/O error occurs. + */ + public abstract void bind() throws IOException; + + /** + * Reads the data using the key and returns the read data. + * @param key the key which is readable. + * @return data read. + * @throws IOException if any I/O error occurs. + */ + public abstract Object read(SelectionKey key) throws IOException; + + /** + * @return the handler associated with this channel. + */ + public ChannelHandler getHandler() { + return handler; + } + + /* + * Called from the context of reactor thread when the key becomes writable. + * The channel writes the whole pending block of data at once. + */ + void flush(SelectionKey key) throws IOException { + Queue pendingWrites = channelToPendingWrites.get(key.channel()); + while (true) { + Object pendingWrite = pendingWrites.poll(); + if (pendingWrite == null) { + // We don't have anything more to write so channel is interested in reading more data + reactor.changeOps(key, SelectionKey.OP_READ); + break; + } + + // ask the concrete channel to make sense of data and write it to java channel + doWrite(pendingWrite, key); + } + } + + /** + * Writes the data to the channel. + * + * @param pendingWrite data which was queued for writing in batch mode. + * @param key the key which is writable. + * @throws IOException if any I/O error occurs. + */ + protected abstract void doWrite(Object pendingWrite, SelectionKey key) throws IOException; + + /** + * Queues the data for writing. The data is not guaranteed to be written on underlying channel + * when this method returns. It will be written when the channel is flushed. + * + *

+ * This method is used by the {@link ChannelHandler} to send reply back to the client. + *
+ * Example: + *

+	 * 
+	 * {@literal @}Override
+	 * public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) {
+	 *   byte[] data = ((ByteBuffer)readObject).array();
+	 *   ByteBuffer buffer = ByteBuffer.wrap("Server reply".getBytes());
+	 *   channel.write(buffer, key);
+	 * }
+	 * 
+	 * 
+	 * @param data the data to be written on underlying channel.
+	 * @param key the key which is writable.
+	 */
+	public void write(Object data, SelectionKey key) {
+		Queue pendingWrites = this.channelToPendingWrites.get(key.channel());
+		if (pendingWrites == null) {
+			synchronized (this.channelToPendingWrites) {
+				pendingWrites = this.channelToPendingWrites.get(key.channel());
+				if (pendingWrites == null) {
+					pendingWrites = new ConcurrentLinkedQueue<>();
+					this.channelToPendingWrites.put(key.channel(), pendingWrites);
+				}
+			}
+		}
+		pendingWrites.add(data);
+		reactor.changeOps(key, SelectionKey.OP_WRITE);
+	}
+}
diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/ChannelHandler.java b/reactor/src/main/java/com/iluwatar/reactor/framework/ChannelHandler.java
new file mode 100644
index 000000000..e1df57020
--- /dev/null
+++ b/reactor/src/main/java/com/iluwatar/reactor/framework/ChannelHandler.java
@@ -0,0 +1,25 @@
+package com.iluwatar.reactor.framework;
+
+import java.nio.channels.SelectionKey;
+
+/**
+ * Represents the EventHandler of Reactor pattern. It handles the incoming events dispatched
+ * to it by the {@link Dispatcher}. This is where the application logic resides.
+ * 
+ * 

+ * A {@link ChannelHandler} is associated with one or many {@link AbstractNioChannel}s, and whenever + * an event occurs on any of the associated channels, the handler is notified of the event. + * + * @author npathai + */ +public interface ChannelHandler { + + /** + * Called when the {@code channel} has received some data from remote peer. + * + * @param channel the channel from which the data is received. + * @param readObject the data read. + * @param key the key from which the data is received. + */ + void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key); +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/Dispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/framework/Dispatcher.java new file mode 100644 index 000000000..120a11085 --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/Dispatcher.java @@ -0,0 +1,38 @@ +package com.iluwatar.reactor.framework; + +import java.nio.channels.SelectionKey; + +/** + * Represents the event dispatching strategy. When {@link NioReactor} senses any event on the + * registered {@link AbstractNioChannel}s then it de-multiplexes the event type, read or write + * or connect, and then calls the {@link Dispatcher} to dispatch the event. This decouples the I/O + * processing from application specific processing. + *
+ * Dispatcher should call the {@link ChannelHandler} associated with the channel on which event occurred. + * + *

+ * The application can customize the way in which event is dispatched such as using the reactor thread to + * dispatch event to channels or use a worker pool to do the non I/O processing. + * + * @see SameThreadDispatcher + * @see ThreadPoolDispatcher + * + * @author npathai + */ +public interface Dispatcher { + /** + * This hook method is called when read event occurs on particular channel. The data read + * is provided in readObject. The implementation should dispatch this read event + * to the associated {@link ChannelHandler} of channel. + * + * @param channel on which read event occurred + * @param readObject object read by channel + * @param key on which event occurred + */ + void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key); + + /** + * Stops the dispatching events and cleans up any acquired resources such as threads. + */ + void stop(); +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java b/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java new file mode 100644 index 000000000..2666f05b8 --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java @@ -0,0 +1,156 @@ +package com.iluwatar.reactor.framework; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; + +/** + * A wrapper over {@link DatagramChannel} which can read and write data on a DatagramChannel. + * + * @author npathai + */ +public class NioDatagramChannel extends AbstractNioChannel { + + private int port; + + /** + * Creates a {@link DatagramChannel} which will bind at provided port and use handler to handle + * incoming events on this channel. + *

+ * Note the constructor does not bind the socket, {@link #bind()} method should be called for binding + * the socket. + * + * @param port the port to be bound to listen for incoming datagram requests. + * @param handler the handler to be used for handling incoming requests on this channel. + * @throws IOException if any I/O error occurs. + */ + public NioDatagramChannel(int port, ChannelHandler handler) throws IOException { + super(handler, DatagramChannel.open()); + this.port = port; + } + + @Override + public int getInterestedOps() { + /* there is no need to accept connections in UDP, so the channel shows interest in + * reading data. + */ + return SelectionKey.OP_READ; + } + + /** + * Reads and returns a {@link DatagramPacket} from the underlying channel. + * @return the datagram packet read having the sender address. + */ + @Override + public DatagramPacket read(SelectionKey key) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(1024); + SocketAddress sender = getChannel().receive(buffer); + + /* + * It is required to create a DatagramPacket because we need to preserve which + * socket address acts as destination for sending reply packets. + */ + DatagramPacket packet = new DatagramPacket(buffer); + packet.setSender(sender); + + return packet; + } + + /** + * @return the underlying datagram channel. + */ + @Override + public DatagramChannel getChannel() { + return (DatagramChannel) super.getChannel(); + } + + /** + * Binds UDP socket on the provided port. + * + * @throws IOException if any I/O error occurs. + */ + @Override + public void bind() throws IOException { + getChannel().socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port)); + getChannel().configureBlocking(false); + System.out.println("Bound UDP socket at port: " + port); + } + + /** + * Writes the pending {@link DatagramPacket} to the underlying channel sending data to + * the intended receiver of the packet. + */ + @Override + protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException { + DatagramPacket pendingPacket = (DatagramPacket) pendingWrite; + getChannel().send(pendingPacket.getData(), pendingPacket.getReceiver()); + } + + /** + * Write the outgoing {@link DatagramPacket} to the channel. The intended receiver of the + * datagram packet must be set in the data using {@link DatagramPacket#setReceiver(SocketAddress)}. + */ + @Override + public void write(Object data, SelectionKey key) { + super.write(data, key); + } + + /** + * Container of data used for {@link NioDatagramChannel} to communicate with remote peer. + */ + public static class DatagramPacket { + private SocketAddress sender; + private ByteBuffer data; + private SocketAddress receiver; + + /** + * Creates a container with underlying data. + * + * @param data the underlying message to be written on channel. + */ + public DatagramPacket(ByteBuffer data) { + this.data = data; + } + + /** + * @return the sender address. + */ + public SocketAddress getSender() { + return sender; + } + + /** + * Sets the sender address of this packet. + * @param sender the sender address. + */ + public void setSender(SocketAddress sender) { + this.sender = sender; + } + + /** + * @return the receiver address. + */ + public SocketAddress getReceiver() { + return receiver; + } + + /** + * Sets the intended receiver address. This must be set when writing to the channel. + * @param receiver the receiver address. + */ + public void setReceiver(SocketAddress receiver) { + this.receiver = receiver; + } + + /** + * @return the underlying message that will be written on channel. + */ + public ByteBuffer getData() { + return data; + } + } +} \ No newline at end of file diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java b/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java new file mode 100644 index 000000000..b92f4a9ba --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java @@ -0,0 +1,242 @@ +package com.iluwatar.reactor.framework; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +/** + * This class acts as Synchronous Event De-multiplexer and Initiation Dispatcher of Reactor pattern. + * Multiple handles i.e. {@link AbstractNioChannel}s can be registered to the reactor and it blocks + * for events from all these handles. Whenever an event occurs on any of the registered handles, + * it synchronously de-multiplexes the event which can be any of read, write or accept, and + * dispatches the event to the appropriate {@link ChannelHandler} using the {@link Dispatcher}. + * + *

+ * Implementation: + * A NIO reactor runs in its own thread when it is started using {@link #start()} method. + * {@link NioReactor} uses {@link Selector} as a mechanism for achieving Synchronous Event De-multiplexing. + * + *

+ * NOTE: This is one of the way to implement NIO reactor and it does not take care of all possible edge cases + * which may be required in a real application. This implementation is meant to demonstrate the fundamental + * concepts that lie behind Reactor pattern. + * + * @author npathai + * + */ +public class NioReactor { + + private Selector selector; + private Dispatcher dispatcher; + /** + * All the work of altering the SelectionKey operations and Selector operations are performed in + * the context of main event loop of reactor. So when any channel needs to change its readability + * or writability, a new command is added in the command queue and then the event loop picks up + * the command and executes it in next iteration. + */ + private Queue pendingCommands = new ConcurrentLinkedQueue<>(); + private ExecutorService reactorMain = Executors.newSingleThreadExecutor(); + + /** + * Creates a reactor which will use provided {@code dispatcher} to dispatch events. + * The application can provide various implementations of dispatcher which suits its + * needs. + * + * @param dispatcher a non-null dispatcher used to dispatch events on registered channels. + * @throws IOException if any I/O error occurs. + */ + public NioReactor(Dispatcher dispatcher) throws IOException { + this.dispatcher = dispatcher; + this.selector = Selector.open(); + } + + /** + * Starts the reactor event loop in a new thread. + * + * @throws IOException if any I/O error occurs. + */ + public void start() throws IOException { + reactorMain.execute(new Runnable() { + @Override + public void run() { + try { + System.out.println("Reactor started, waiting for events..."); + eventLoop(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }); + } + + /** + * Stops the reactor and related resources such as dispatcher. + */ + public void stop() { + reactorMain.shutdownNow(); + selector.wakeup(); + try { + reactorMain.awaitTermination(4, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + dispatcher.stop(); + } + + /** + * Registers a new channel (handle) with this reactor after which the reactor will wait for events + * on this channel. While registering the channel the reactor uses {@link AbstractNioChannel#getInterestedOps()} + * to know about the interested operation of this channel. + * + * @param channel a new handle on which reactor will wait for events. The channel must be bound + * prior to being registered. + * @return this + * @throws IOException if any I/O error occurs. + */ + public NioReactor registerChannel(AbstractNioChannel channel) throws IOException { + SelectionKey key = channel.getChannel().register(selector, channel.getInterestedOps()); + key.attach(channel); + channel.setReactor(this); + return this; + } + + private void eventLoop() throws IOException { + while (true) { + + // Honor interrupt request + if (Thread.interrupted()) { + break; + } + + // honor any pending commands first + processPendingCommands(); + + /* + * Synchronous event de-multiplexing happens here, this is blocking call which + * returns when it is possible to initiate non-blocking operation on any of the + * registered channels. + */ + selector.select(); + + /* + * Represents the events that have occurred on registered handles. + */ + Set keys = selector.selectedKeys(); + + Iterator iterator = keys.iterator(); + + while (iterator.hasNext()) { + SelectionKey key = iterator.next(); + if (!key.isValid()) { + iterator.remove(); + continue; + } + processKey(key); + } + keys.clear(); + } + } + + private void processPendingCommands() { + Iterator iterator = pendingCommands.iterator(); + while (iterator.hasNext()) { + Runnable command = iterator.next(); + command.run(); + iterator.remove(); + } + } + + /* + * Initiation dispatcher logic, it checks the type of event and notifier application + * specific event handler to handle the event. + */ + private void processKey(SelectionKey key) throws IOException { + if (key.isAcceptable()) { + onChannelAcceptable(key); + } else if (key.isReadable()) { + onChannelReadable(key); + } else if (key.isWritable()) { + onChannelWritable(key); + } + } + + private void onChannelWritable(SelectionKey key) throws IOException { + AbstractNioChannel channel = (AbstractNioChannel) key.attachment(); + channel.flush(key); + } + + private void onChannelReadable(SelectionKey key) { + try { + // reads the incoming data in context of reactor main loop. Can this be improved? + Object readObject = ((AbstractNioChannel)key.attachment()).read(key); + + dispatchReadEvent(key, readObject); + } catch (IOException e) { + try { + key.channel().close(); + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } + + /* + * Uses the application provided dispatcher to dispatch events to respective handlers. + */ + private void dispatchReadEvent(SelectionKey key, Object readObject) { + dispatcher.onChannelReadEvent((AbstractNioChannel)key.attachment(), readObject, key); + } + + private void onChannelAcceptable(SelectionKey key) throws IOException { + ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); + SocketChannel socketChannel = serverSocketChannel.accept(); + socketChannel.configureBlocking(false); + SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); + readKey.attach(key.attachment()); + } + + /** + * Queues the change of operations request of a channel, which will change the interested + * operations of the channel sometime in future. + *

+ * This is a non-blocking method and does not guarantee that the operations are changed when + * this method returns. + * + * @param key the key for which operations are to be changed. + * @param interestedOps the new interest operations. + */ + public void changeOps(SelectionKey key, int interestedOps) { + pendingCommands.add(new ChangeKeyOpsCommand(key, interestedOps)); + selector.wakeup(); + } + + /** + * A command that changes the interested operations of the key provided. + */ + class ChangeKeyOpsCommand implements Runnable { + private SelectionKey key; + private int interestedOps; + + public ChangeKeyOpsCommand(SelectionKey key, int interestedOps) { + this.key = key; + this.interestedOps = interestedOps; + } + + public void run() { + key.interestOps(interestedOps); + } + + @Override + public String toString() { + return "Change of ops to: " + interestedOps; + } + } +} \ No newline at end of file diff --git a/reactor/src/main/java/com/iluwatar/reactor/NioServerSocketChannel.java b/reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java similarity index 52% rename from reactor/src/main/java/com/iluwatar/reactor/NioServerSocketChannel.java rename to reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java index ebd8f0ef3..92fa9234f 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/NioServerSocketChannel.java +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java @@ -1,4 +1,4 @@ -package com.iluwatar.reactor; +package com.iluwatar.reactor.framework; import java.io.IOException; import java.net.InetAddress; @@ -8,25 +8,51 @@ import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +/** + * A wrapper over {@link NioServerSocketChannel} which can read and write data on a {@link SocketChannel}. + * + * @author npathai + */ public class NioServerSocketChannel extends AbstractNioChannel { private int port; + /** + * Creates a {@link ServerSocketChannel} which will bind at provided port and use + * handler to handle incoming events on this channel. + *

+ * Note the constructor does not bind the socket, {@link #bind()} method should be called for binding + * the socket. + * + * @param port the port to be bound to listen for incoming requests. + * @param handler the handler to be used for handling incoming requests on this channel. + * @throws IOException if any I/O error occurs. + */ public NioServerSocketChannel(int port, ChannelHandler handler) throws IOException { super(handler, ServerSocketChannel.open()); this.port = port; } + @Override public int getInterestedOps() { + // being a server socket channel it is interested in accepting connection from remote clients. return SelectionKey.OP_ACCEPT; } + /** + * @return the underlying {@link ServerSocketChannel}. + */ @Override public ServerSocketChannel getChannel() { return (ServerSocketChannel) super.getChannel(); } + /** + * Reads and returns {@link ByteBuffer} from the underlying {@link SocketChannel} represented by + * the key. Due to the fact that there is a dedicated channel for each client connection + * we don't need to store the sender. + */ @Override public ByteBuffer read(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); @@ -38,12 +64,22 @@ public class NioServerSocketChannel extends AbstractNioChannel { return buffer; } + /** + * Binds TCP socket on the provided port. + * + * @throws IOException if any I/O error occurs. + */ + @Override public void bind() throws IOException { ((ServerSocketChannel)getChannel()).socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port)); ((ServerSocketChannel)getChannel()).configureBlocking(false); System.out.println("Bound TCP socket at port: " + port); } + /** + * Writes the pending {@link ByteBuffer} to the underlying channel sending data to + * the intended receiver of the packet. + */ @Override protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException { ByteBuffer pendingBuffer = (ByteBuffer) pendingWrite; diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/SameThreadDispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/framework/SameThreadDispatcher.java new file mode 100644 index 000000000..2300d7c74 --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/SameThreadDispatcher.java @@ -0,0 +1,43 @@ +package com.iluwatar.reactor.framework; + +import java.nio.channels.SelectionKey; + +/** + * Dispatches the events in the context of caller thread. This implementation is a good fit for + * small applications where there are limited clients. Using this implementation limits the scalability + * because the I/O thread performs the application specific processing. + * + *

+ * For real applications use {@link ThreadPoolDispatcher}. + * + * @see ThreadPoolDispatcher + * + * @author npathai + */ +public class SameThreadDispatcher implements Dispatcher { + + /** + * Dispatches the read event in the context of caller thread. + *
+ * Note this is a blocking call. It returns only after the associated handler has handled the + * read event. + */ + @Override + public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) { + if (channel.getHandler() != null) { + /* + * Calls the associated handler to notify the read event where application specific code + * resides. + */ + channel.getHandler().handleChannelRead(channel, readObject, key); + } + } + + /** + * No resources to free. + */ + @Override + public void stop() { + // no-op + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/ThreadPoolDispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/framework/ThreadPoolDispatcher.java new file mode 100644 index 000000000..b514d1824 --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/ThreadPoolDispatcher.java @@ -0,0 +1,55 @@ +package com.iluwatar.reactor.framework; + +import java.nio.channels.SelectionKey; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * An implementation that uses a pool of worker threads to dispatch the events. This provides + * for better scalability as the application specific processing is not performed in the context + * of I/O thread. + * + * @author npathai + * + */ +public class ThreadPoolDispatcher extends SameThreadDispatcher { + + private ExecutorService executorService; + + /** + * Creates a pooled dispatcher with tunable pool size. + * + * @param poolSize number of pooled threads + */ + public ThreadPoolDispatcher(int poolSize) { + this.executorService = Executors.newFixedThreadPool(poolSize); + } + + /** + * Submits the work of dispatching the read event to worker pool, where it gets picked + * up by worker threads. + *
+ * Note that this is a non-blocking call and returns immediately. It is not guaranteed + * that the event has been handled by associated handler. + */ + @Override + public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) { + executorService.execute(() -> + ThreadPoolDispatcher.super.onChannelReadEvent(channel, readObject, key)); + } + + /** + * Stops the pool of workers. + */ + @Override + public void stop() { + executorService.shutdownNow(); + try { + executorService.awaitTermination(1000, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + +} diff --git a/reactor/src/test/java/com/iluwatar/reactor/AppTest.java b/reactor/src/test/java/com/iluwatar/reactor/app/AppTest.java similarity index 91% rename from reactor/src/test/java/com/iluwatar/reactor/AppTest.java rename to reactor/src/test/java/com/iluwatar/reactor/app/AppTest.java index 17ce0b912..9447aac01 100644 --- a/reactor/src/test/java/com/iluwatar/reactor/AppTest.java +++ b/reactor/src/test/java/com/iluwatar/reactor/app/AppTest.java @@ -1,4 +1,4 @@ -package com.iluwatar.reactor; +package com.iluwatar.reactor.app; import java.io.IOException; diff --git a/reactor/todo.txt b/reactor/todo.txt index af06a1892..a59af62b9 100644 --- a/reactor/todo.txt +++ b/reactor/todo.txt @@ -2,3 +2,12 @@ * Cleanup * Document - Javadoc * Better design?? Get review of @iluwatar + + +Design view: + +Handles ---> AbstractNioChannel +Selector ---> Synchronous Event Demultiplexer +NioReactor ---> Initiation Dispatcher + +