diff --git a/pom.xml b/pom.xml index 70a164c06..2da35eb46 100644 --- a/pom.xml +++ b/pom.xml @@ -73,12 +73,13 @@ front-controller repository async-method-invocation - business-delegate - half-sync-half-async + business-delegate + half-sync-half-async step-builder layers message-channel fluentinterface + reactor diff --git a/reactor/etc/reactor.png b/reactor/etc/reactor.png new file mode 100644 index 000000000..0b00ec98b Binary files /dev/null and b/reactor/etc/reactor.png differ diff --git a/reactor/etc/reactor.ucls b/reactor/etc/reactor.ucls new file mode 100644 index 000000000..d072e4029 --- /dev/null +++ b/reactor/etc/reactor.ucls @@ -0,0 +1,207 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/reactor/index.md b/reactor/index.md new file mode 100644 index 000000000..7333c74dd --- /dev/null +++ b/reactor/index.md @@ -0,0 +1,30 @@ +--- +layout: pattern +title: Reactor +folder: reactor +permalink: /patterns/reactor/ +categories: Architectural +tags: + - Java + - Difficulty-Expert +--- + +**Intent:** The Reactor design pattern handles service requests that are delivered concurrently to an application by one or more clients. The application can register specific handlers for processing which are called by reactor on specific events. Dispatching of event handlers is performed by an initiation dispatcher, which manages the registered event handlers. Demultiplexing of service requests is performed by a synchronous event demultiplexer. + +![Reactor](./etc/reactor.png "Reactor") + +**Applicability:** Use Reactor pattern when + +* a server application needs to handle concurrent service requests from multiple clients. +* a server application needs to be available for receiving requests from new clients even when handling older client requests. +* a server must maximize throughput, minimize latency and use CPU efficiently without blocking. + +**Real world examples:** + +* [Spring Reactor](http://projectreactor.io/) + +**Credits** + +* [Douglas C. Schmidt - Reactor](https://www.dre.vanderbilt.edu/~schmidt/PDF/Reactor.pdf) +* [Doug Lea - Scalable IO in Java](http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf) +* [Netty](http://netty.io/) diff --git a/reactor/pom.xml b/reactor/pom.xml new file mode 100644 index 000000000..599376e32 --- /dev/null +++ b/reactor/pom.xml @@ -0,0 +1,18 @@ + + + 4.0.0 + + com.iluwatar + java-design-patterns + 1.6.0 + + reactor + + + junit + junit + test + + + diff --git a/reactor/src/main/java/com/iluwatar/reactor/app/App.java b/reactor/src/main/java/com/iluwatar/reactor/app/App.java new file mode 100644 index 000000000..5c6d91ee8 --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/app/App.java @@ -0,0 +1,131 @@ +package com.iluwatar.reactor.app; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import com.iluwatar.reactor.framework.AbstractNioChannel; +import com.iluwatar.reactor.framework.ChannelHandler; +import com.iluwatar.reactor.framework.Dispatcher; +import com.iluwatar.reactor.framework.NioDatagramChannel; +import com.iluwatar.reactor.framework.NioReactor; +import com.iluwatar.reactor.framework.NioServerSocketChannel; +import com.iluwatar.reactor.framework.ThreadPoolDispatcher; + +/** + * This application demonstrates Reactor pattern. The example demonstrated is a Distributed Logging + * Service where it listens on multiple TCP or UDP sockets for incoming log requests. + * + *

+ * INTENT
+ * The Reactor design pattern handles service requests that are delivered concurrently to an + * application by one or more clients. The application can register specific handlers for processing + * which are called by reactor on specific events. + * + *

+ * PROBLEM
+ * Server applications in a distributed system must handle multiple clients that send them service + * requests. Following forces need to be resolved: + *

+ * + *

+ * PARTICIPANTS
+ *

+ * + *

+ * The application utilizes single thread to listen for requests on all ports. It does not create a + * separate thread for each client, which provides better scalability under load (number of clients + * increase). + * + *

+ * The example uses Java NIO framework to implement the Reactor. + * + */ +public class App { + + private NioReactor reactor; + private List channels = new ArrayList<>(); + + /** + * App entry. + * + * @throws IOException + */ + public static void main(String[] args) throws IOException { + new App().start(new ThreadPoolDispatcher(2)); + } + + /** + * Starts the NIO reactor. + * @param threadPoolDispatcher + * + * @throws IOException if any channel fails to bind. + */ + public void start(Dispatcher dispatcher) throws IOException { + /* + * The application can customize its event dispatching mechanism. + */ + reactor = new NioReactor(dispatcher); + + /* + * This represents application specific business logic that dispatcher will call on appropriate + * events. These events are read events in our example. + */ + LoggingHandler loggingHandler = new LoggingHandler(); + + /* + * Our application binds to multiple channels and uses same logging handler to handle incoming + * log requests. + */ + reactor.registerChannel(tcpChannel(6666, loggingHandler)).registerChannel(tcpChannel(6667, loggingHandler)) + .registerChannel(udpChannel(6668, loggingHandler)).start(); + } + + /** + * Stops the NIO reactor. This is a blocking call. + * + * @throws InterruptedException if interrupted while stopping the reactor. + * @throws IOException if any I/O error occurs + */ + public void stop() throws InterruptedException, IOException { + reactor.stop(); + for (AbstractNioChannel channel : channels) { + channel.getChannel().close(); + } + } + + private AbstractNioChannel tcpChannel(int port, ChannelHandler handler) throws IOException { + NioServerSocketChannel channel = new NioServerSocketChannel(port, handler); + channel.bind(); + channels.add(channel); + return channel; + } + + private AbstractNioChannel udpChannel(int port, ChannelHandler handler) throws IOException { + NioDatagramChannel channel = new NioDatagramChannel(port, handler); + channel.bind(); + channels.add(channel); + return channel; + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/app/AppClient.java b/reactor/src/main/java/com/iluwatar/reactor/app/AppClient.java new file mode 100644 index 000000000..659f5da21 --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/app/AppClient.java @@ -0,0 +1,163 @@ +package com.iluwatar.reactor.app; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.UnknownHostException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Represents the clients of Reactor pattern. Multiple clients are run concurrently and send logging + * requests to Reactor. + */ +public class AppClient { + private final ExecutorService service = Executors.newFixedThreadPool(4); + + /** + * App client entry. + * + * @throws IOException if any I/O error occurs. + */ + public static void main(String[] args) throws IOException { + AppClient appClient = new AppClient(); + appClient.start(); + } + + /** + * Starts the logging clients. + * + * @throws IOException if any I/O error occurs. + */ + public void start() throws IOException { + service.execute(new TCPLoggingClient("Client 1", 6666)); + service.execute(new TCPLoggingClient("Client 2", 6667)); + service.execute(new UDPLoggingClient("Client 3", 6668)); + service.execute(new UDPLoggingClient("Client 4", 6668)); + } + + /** + * Stops logging clients. This is a blocking call. + */ + public void stop() { + service.shutdown(); + if (!service.isTerminated()) { + service.shutdownNow(); + try { + service.awaitTermination(1000, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + private static void artificialDelayOf(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + /** + * A logging client that sends requests to Reactor on TCP socket. + */ + static class TCPLoggingClient implements Runnable { + + private final int serverPort; + private final String clientName; + + /** + * Creates a new TCP logging client. + * + * @param clientName the name of the client to be sent in logging requests. + * @param port the port on which client will send logging requests. + */ + public TCPLoggingClient(String clientName, int serverPort) { + this.clientName = clientName; + this.serverPort = serverPort; + } + + public void run() { + try (Socket socket = new Socket(InetAddress.getLocalHost(), serverPort)) { + OutputStream outputStream = socket.getOutputStream(); + PrintWriter writer = new PrintWriter(outputStream); + sendLogRequests(writer, socket.getInputStream()); + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + private void sendLogRequests(PrintWriter writer, InputStream inputStream) throws IOException { + for (int i = 0; i < 4; i++) { + writer.println(clientName + " - Log request: " + i); + writer.flush(); + + byte[] data = new byte[1024]; + int read = inputStream.read(data, 0, data.length); + if (read == 0) { + System.out.println("Read zero bytes"); + } else { + System.out.println(new String(data, 0, read)); + } + + artificialDelayOf(100); + } + } + + } + + /** + * A logging client that sends requests to Reactor on UDP socket. + */ + static class UDPLoggingClient implements Runnable { + private final String clientName; + private final InetSocketAddress remoteAddress; + + /** + * Creates a new UDP logging client. + * + * @param clientName the name of the client to be sent in logging requests. + * @param port the port on which client will send logging requests. + * @throws UnknownHostException if localhost is unknown + */ + public UDPLoggingClient(String clientName, int port) throws UnknownHostException { + this.clientName = clientName; + this.remoteAddress = new InetSocketAddress(InetAddress.getLocalHost(), port); + } + + @Override + public void run() { + try (DatagramSocket socket = new DatagramSocket()) { + for (int i = 0; i < 4; i++) { + + String message = clientName + " - Log request: " + i; + DatagramPacket request = new DatagramPacket(message.getBytes(), message.getBytes().length, remoteAddress); + + socket.send(request); + + byte[] data = new byte[1024]; + DatagramPacket reply = new DatagramPacket(data, data.length); + socket.receive(reply); + if (reply.getLength() == 0) { + System.out.println("Read zero bytes"); + } else { + System.out.println(new String(reply.getData(), 0, reply.getLength())); + } + + artificialDelayOf(100); + } + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/app/LoggingHandler.java b/reactor/src/main/java/com/iluwatar/reactor/app/LoggingHandler.java new file mode 100644 index 000000000..0845303df --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/app/LoggingHandler.java @@ -0,0 +1,59 @@ +package com.iluwatar.reactor.app; + +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; + +import com.iluwatar.reactor.framework.AbstractNioChannel; +import com.iluwatar.reactor.framework.ChannelHandler; +import com.iluwatar.reactor.framework.NioDatagramChannel.DatagramPacket; + +/** + * Logging server application logic. It logs the incoming requests on standard console and returns a + * canned acknowledgement back to the remote peer. + */ +public class LoggingHandler implements ChannelHandler { + + private static final byte[] ACK = "Data logged successfully".getBytes(); + + /** + * Decodes the received data and logs it on standard console. + */ + @Override + public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) { + /* + * As this handler is attached with both TCP and UDP channels we need to check whether the data + * received is a ByteBuffer (from TCP channel) or a DatagramPacket (from UDP channel). + */ + if (readObject instanceof ByteBuffer) { + doLogging(((ByteBuffer) readObject)); + sendReply(channel, key); + } else if (readObject instanceof DatagramPacket) { + DatagramPacket datagram = (DatagramPacket) readObject; + doLogging(datagram.getData()); + sendReply(channel, datagram, key); + } else { + throw new IllegalStateException("Unknown data received"); + } + } + + private void sendReply(AbstractNioChannel channel, DatagramPacket incomingPacket, SelectionKey key) { + /* + * Create a reply acknowledgement datagram packet setting the receiver to the sender of incoming + * message. + */ + DatagramPacket replyPacket = new DatagramPacket(ByteBuffer.wrap(ACK)); + replyPacket.setReceiver(incomingPacket.getSender()); + + channel.write(replyPacket, key); + } + + private void sendReply(AbstractNioChannel channel, SelectionKey key) { + ByteBuffer buffer = ByteBuffer.wrap(ACK); + channel.write(buffer, key); + } + + private void doLogging(ByteBuffer data) { + // assuming UTF-8 :( + System.out.println(new String(data.array(), 0, data.limit())); + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java b/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java new file mode 100644 index 000000000..4abacd86f --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java @@ -0,0 +1,151 @@ +package com.iluwatar.reactor.framework; + +import java.io.IOException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * This represents the Handle of Reactor pattern. These are resources managed by OS which can + * be submitted to {@link NioReactor}. + * + *

+ * This class serves has the responsibility of reading the data when a read event occurs and writing + * the data back when the channel is writable. It leaves the reading and writing of data on the + * concrete implementation. It provides a block writing mechanism wherein when any + * {@link ChannelHandler} wants to write data back, it queues the data in pending write queue and + * clears it in block manner. This provides better throughput. + */ +public abstract class AbstractNioChannel { + + private final SelectableChannel channel; + private final ChannelHandler handler; + private final Map> channelToPendingWrites = new ConcurrentHashMap<>(); + private NioReactor reactor; + + /** + * Creates a new channel. + * + * @param handler which will handle events occurring on this channel. + * @param channel a NIO channel to be wrapped. + */ + public AbstractNioChannel(ChannelHandler handler, SelectableChannel channel) { + this.handler = handler; + this.channel = channel; + } + + /** + * Injects the reactor in this channel. + */ + void setReactor(NioReactor reactor) { + this.reactor = reactor; + } + + /** + * @return the wrapped NIO channel. + */ + public SelectableChannel getChannel() { + return channel; + } + + /** + * The operation in which the channel is interested, this operation is provided to + * {@link Selector}. + * + * @return interested operation. + * @see SelectionKey + */ + public abstract int getInterestedOps(); + + /** + * Binds the channel on provided port. + * + * @throws IOException if any I/O error occurs. + */ + public abstract void bind() throws IOException; + + /** + * Reads the data using the key and returns the read data. The underlying channel should be + * fetched using {@link SelectionKey#channel()}. + * + * @param key the key on which read event occurred. + * @return data read. + * @throws IOException if any I/O error occurs. + */ + public abstract Object read(SelectionKey key) throws IOException; + + /** + * @return the handler associated with this channel. + */ + public ChannelHandler getHandler() { + return handler; + } + + /* + * Called from the context of reactor thread when the key becomes writable. The channel writes the + * whole pending block of data at once. + */ + void flush(SelectionKey key) throws IOException { + Queue pendingWrites = channelToPendingWrites.get(key.channel()); + while (true) { + Object pendingWrite = pendingWrites.poll(); + if (pendingWrite == null) { + // We don't have anything more to write so channel is interested in reading more data + reactor.changeOps(key, SelectionKey.OP_READ); + break; + } + + // ask the concrete channel to make sense of data and write it to java channel + doWrite(pendingWrite, key); + } + } + + /** + * Writes the data to the channel. + * + * @param pendingWrite the data to be written on channel. + * @param key the key which is writable. + * @throws IOException if any I/O error occurs. + */ + protected abstract void doWrite(Object pendingWrite, SelectionKey key) throws IOException; + + /** + * Queues the data for writing. The data is not guaranteed to be written on underlying channel + * when this method returns. It will be written when the channel is flushed. + * + *

+ * This method is used by the {@link ChannelHandler} to send reply back to the client.
+ * Example: + * + *

+   * 
+   * {@literal @}Override
+   * public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) {
+   *   byte[] data = ((ByteBuffer)readObject).array();
+   *   ByteBuffer buffer = ByteBuffer.wrap("Server reply".getBytes());
+   *   channel.write(buffer, key);
+   * }
+   * 
+   * 
+   * @param data the data to be written on underlying channel.
+   * @param key the key which is writable.
+   */
+  public void write(Object data, SelectionKey key) {
+    Queue pendingWrites = this.channelToPendingWrites.get(key.channel());
+    if (pendingWrites == null) {
+      synchronized (this.channelToPendingWrites) {
+        pendingWrites = this.channelToPendingWrites.get(key.channel());
+        if (pendingWrites == null) {
+          pendingWrites = new ConcurrentLinkedQueue<>();
+          this.channelToPendingWrites.put(key.channel(), pendingWrites);
+        }
+      }
+    }
+    pendingWrites.add(data);
+    reactor.changeOps(key, SelectionKey.OP_WRITE);
+  }
+}
diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/ChannelHandler.java b/reactor/src/main/java/com/iluwatar/reactor/framework/ChannelHandler.java
new file mode 100644
index 000000000..381738ecd
--- /dev/null
+++ b/reactor/src/main/java/com/iluwatar/reactor/framework/ChannelHandler.java
@@ -0,0 +1,23 @@
+package com.iluwatar.reactor.framework;
+
+import java.nio.channels.SelectionKey;
+
+/**
+ * Represents the EventHandler of Reactor pattern. It handles the incoming events dispatched
+ * to it by the {@link Dispatcher}. This is where the application logic resides.
+ * 
+ * 

+ * A {@link ChannelHandler} can be associated with one or many {@link AbstractNioChannel}s, and + * whenever an event occurs on any of the associated channels, the handler is notified of the event. + */ +public interface ChannelHandler { + + /** + * Called when the {@code channel} receives some data from remote peer. + * + * @param channel the channel from which the data was received. + * @param readObject the data read. + * @param key the key on which read event occurred. + */ + void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key); +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/Dispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/framework/Dispatcher.java new file mode 100644 index 000000000..78aeb84df --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/Dispatcher.java @@ -0,0 +1,41 @@ +package com.iluwatar.reactor.framework; + +import java.nio.channels.SelectionKey; + +/** + * Represents the event dispatching strategy. When {@link NioReactor} senses any event on the + * registered {@link AbstractNioChannel}s then it de-multiplexes the event type, read or write or + * connect, and then calls the {@link Dispatcher} to dispatch the read events. This decouples the + * I/O processing from application specific processing.
+ * Dispatcher should call the {@link ChannelHandler} associated with the channel on which event + * occurred. + * + *

+ * The application can customize the way in which event is dispatched such as using the reactor + * thread to dispatch event to channels or use a worker pool to do the non I/O processing. + * + * @see SameThreadDispatcher + * @see ThreadPoolDispatcher + */ +public interface Dispatcher { + /** + * This hook method is called when read event occurs on particular channel. The data read is + * provided in readObject. The implementation should dispatch this read event to the + * associated {@link ChannelHandler} of channel. + * + *

+ * The type of readObject depends on the channel on which data was received. + * + * @param channel on which read event occurred + * @param readObject object read by channel + * @param key on which event occurred + */ + void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key); + + /** + * Stops dispatching events and cleans up any acquired resources such as threads. + * + * @throws InterruptedException if interrupted while stopping dispatcher. + */ + void stop() throws InterruptedException; +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java b/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java new file mode 100644 index 000000000..b55480ffc --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java @@ -0,0 +1,159 @@ +package com.iluwatar.reactor.framework; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.nio.channels.SelectionKey; + +/** + * A wrapper over {@link DatagramChannel} which can read and write data on a DatagramChannel. + */ +public class NioDatagramChannel extends AbstractNioChannel { + + private final int port; + + /** + * Creates a {@link DatagramChannel} which will bind at provided port and use handler + * to handle incoming events on this channel. + *

+ * Note the constructor does not bind the socket, {@link #bind()} method should be called for + * binding the socket. + * + * @param port the port to be bound to listen for incoming datagram requests. + * @param handler the handler to be used for handling incoming requests on this channel. + * @throws IOException if any I/O error occurs. + */ + public NioDatagramChannel(int port, ChannelHandler handler) throws IOException { + super(handler, DatagramChannel.open()); + this.port = port; + } + + @Override + public int getInterestedOps() { + /* + * there is no need to accept connections in UDP, so the channel shows interest in reading data. + */ + return SelectionKey.OP_READ; + } + + /** + * Reads and returns a {@link DatagramPacket} from the underlying channel. + * + * @return the datagram packet read having the sender address. + */ + @Override + public DatagramPacket read(SelectionKey key) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(1024); + SocketAddress sender = ((DatagramChannel) key.channel()).receive(buffer); + + /* + * It is required to create a DatagramPacket because we need to preserve which socket address + * acts as destination for sending reply packets. + */ + buffer.flip(); + DatagramPacket packet = new DatagramPacket(buffer); + packet.setSender(sender); + + return packet; + } + + /** + * @return the underlying datagram channel. + */ + @Override + public DatagramChannel getChannel() { + return (DatagramChannel) super.getChannel(); + } + + /** + * Binds UDP socket on the provided port. + * + * @throws IOException if any I/O error occurs. + */ + @Override + public void bind() throws IOException { + getChannel().socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port)); + getChannel().configureBlocking(false); + System.out.println("Bound UDP socket at port: " + port); + } + + /** + * Writes the pending {@link DatagramPacket} to the underlying channel sending data to the + * intended receiver of the packet. + */ + @Override + protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException { + DatagramPacket pendingPacket = (DatagramPacket) pendingWrite; + getChannel().send(pendingPacket.getData(), pendingPacket.getReceiver()); + } + + /** + * Writes the outgoing {@link DatagramPacket} to the channel. The intended receiver of the + * datagram packet must be set in the data using + * {@link DatagramPacket#setReceiver(SocketAddress)}. + */ + @Override + public void write(Object data, SelectionKey key) { + super.write(data, key); + } + + /** + * Container of data used for {@link NioDatagramChannel} to communicate with remote peer. + */ + public static class DatagramPacket { + private SocketAddress sender; + private ByteBuffer data; + private SocketAddress receiver; + + /** + * Creates a container with underlying data. + * + * @param data the underlying message to be written on channel. + */ + public DatagramPacket(ByteBuffer data) { + this.data = data; + } + + /** + * @return the sender address. + */ + public SocketAddress getSender() { + return sender; + } + + /** + * Sets the sender address of this packet. + * + * @param sender the sender address. + */ + public void setSender(SocketAddress sender) { + this.sender = sender; + } + + /** + * @return the receiver address. + */ + public SocketAddress getReceiver() { + return receiver; + } + + /** + * Sets the intended receiver address. This must be set when writing to the channel. + * + * @param receiver the receiver address. + */ + public void setReceiver(SocketAddress receiver) { + this.receiver = receiver; + } + + /** + * @return the underlying message that will be written on channel. + */ + public ByteBuffer getData() { + return data; + } + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java b/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java new file mode 100644 index 000000000..b818612e5 --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java @@ -0,0 +1,233 @@ +package com.iluwatar.reactor.framework; + +import java.io.IOException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * This class acts as Synchronous Event De-multiplexer and Initiation Dispatcher of Reactor pattern. + * Multiple handles i.e. {@link AbstractNioChannel}s can be registered to the reactor and it blocks + * for events from all these handles. Whenever an event occurs on any of the registered handles, it + * synchronously de-multiplexes the event which can be any of read, write or accept, and dispatches + * the event to the appropriate {@link ChannelHandler} using the {@link Dispatcher}. + * + *

+ * Implementation: A NIO reactor runs in its own thread when it is started using {@link #start()} + * method. {@link NioReactor} uses {@link Selector} for realizing Synchronous Event De-multiplexing. + * + *

+ * NOTE: This is one of the ways to implement NIO reactor and it does not take care of all possible + * edge cases which are required in a real application. This implementation is meant to demonstrate + * the fundamental concepts that lie behind Reactor pattern. + */ +public class NioReactor { + + private final Selector selector; + private final Dispatcher dispatcher; + /** + * All the work of altering the SelectionKey operations and Selector operations are performed in + * the context of main event loop of reactor. So when any channel needs to change its readability + * or writability, a new command is added in the command queue and then the event loop picks up + * the command and executes it in next iteration. + */ + private final Queue pendingCommands = new ConcurrentLinkedQueue<>(); + private final ExecutorService reactorMain = Executors.newSingleThreadExecutor(); + + /** + * Creates a reactor which will use provided {@code dispatcher} to dispatch events. The + * application can provide various implementations of dispatcher which suits its needs. + * + * @param dispatcher a non-null dispatcher used to dispatch events on registered channels. + * @throws IOException if any I/O error occurs. + */ + public NioReactor(Dispatcher dispatcher) throws IOException { + this.dispatcher = dispatcher; + this.selector = Selector.open(); + } + + /** + * Starts the reactor event loop in a new thread. + * + * @throws IOException if any I/O error occurs. + */ + public void start() throws IOException { + reactorMain.execute(() -> { + try { + System.out.println("Reactor started, waiting for events..."); + eventLoop(); + } catch (IOException e) { + e.printStackTrace(); + } + }); + } + + /** + * Stops the reactor and related resources such as dispatcher. + * + * @throws InterruptedException if interrupted while stopping the reactor. + */ + public void stop() throws InterruptedException { + reactorMain.shutdownNow(); + selector.wakeup(); + reactorMain.awaitTermination(4, TimeUnit.SECONDS); + dispatcher.stop(); + } + + /** + * Registers a new channel (handle) with this reactor. Reactor will start waiting for events on + * this channel and notify of any events. While registering the channel the reactor uses + * {@link AbstractNioChannel#getInterestedOps()} to know about the interested operation of this + * channel. + * + * @param channel a new channel on which reactor will wait for events. The channel must be bound + * prior to being registered. + * @return this + * @throws IOException if any I/O error occurs. + */ + public NioReactor registerChannel(AbstractNioChannel channel) throws IOException { + SelectionKey key = channel.getChannel().register(selector, channel.getInterestedOps()); + key.attach(channel); + channel.setReactor(this); + return this; + } + + private void eventLoop() throws IOException { + while (true) { + + // honor interrupt request + if (Thread.interrupted()) { + break; + } + + // honor any pending commands first + processPendingCommands(); + + /* + * Synchronous event de-multiplexing happens here, this is blocking call which returns when it + * is possible to initiate non-blocking operation on any of the registered channels. + */ + selector.select(); + + /* + * Represents the events that have occurred on registered handles. + */ + Set keys = selector.selectedKeys(); + + Iterator iterator = keys.iterator(); + + while (iterator.hasNext()) { + SelectionKey key = iterator.next(); + if (!key.isValid()) { + iterator.remove(); + continue; + } + processKey(key); + } + keys.clear(); + } + } + + private void processPendingCommands() { + Iterator iterator = pendingCommands.iterator(); + while (iterator.hasNext()) { + Runnable command = iterator.next(); + command.run(); + iterator.remove(); + } + } + + /* + * Initiation dispatcher logic, it checks the type of event and notifier application specific + * event handler to handle the event. + */ + private void processKey(SelectionKey key) throws IOException { + if (key.isAcceptable()) { + onChannelAcceptable(key); + } else if (key.isReadable()) { + onChannelReadable(key); + } else if (key.isWritable()) { + onChannelWritable(key); + } + } + + private void onChannelWritable(SelectionKey key) throws IOException { + AbstractNioChannel channel = (AbstractNioChannel) key.attachment(); + channel.flush(key); + } + + private void onChannelReadable(SelectionKey key) { + try { + // reads the incoming data in context of reactor main loop. Can this be improved? + Object readObject = ((AbstractNioChannel) key.attachment()).read(key); + + dispatchReadEvent(key, readObject); + } catch (IOException e) { + try { + key.channel().close(); + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } + + /* + * Uses the application provided dispatcher to dispatch events to application handler. + */ + private void dispatchReadEvent(SelectionKey key, Object readObject) { + dispatcher.onChannelReadEvent((AbstractNioChannel) key.attachment(), readObject, key); + } + + private void onChannelAcceptable(SelectionKey key) throws IOException { + ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); + SocketChannel socketChannel = serverSocketChannel.accept(); + socketChannel.configureBlocking(false); + SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); + readKey.attach(key.attachment()); + } + + /** + * Queues the change of operations request of a channel, which will change the interested + * operations of the channel sometime in future. + *

+ * This is a non-blocking method and does not guarantee that the operations have changed when this + * method returns. + * + * @param key the key for which operations have to be changed. + * @param interestedOps the new interest operations. + */ + public void changeOps(SelectionKey key, int interestedOps) { + pendingCommands.add(new ChangeKeyOpsCommand(key, interestedOps)); + selector.wakeup(); + } + + /** + * A command that changes the interested operations of the key provided. + */ + class ChangeKeyOpsCommand implements Runnable { + private SelectionKey key; + private int interestedOps; + + public ChangeKeyOpsCommand(SelectionKey key, int interestedOps) { + this.key = key; + this.interestedOps = interestedOps; + } + + public void run() { + key.interestOps(interestedOps); + } + + @Override + public String toString() { + return "Change of ops to: " + interestedOps; + } + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java b/reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java new file mode 100644 index 000000000..c635a6076 --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java @@ -0,0 +1,88 @@ +package com.iluwatar.reactor.framework; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +/** + * A wrapper over {@link NioServerSocketChannel} which can read and write data on a + * {@link SocketChannel}. + */ +public class NioServerSocketChannel extends AbstractNioChannel { + + private final int port; + + /** + * Creates a {@link ServerSocketChannel} which will bind at provided port and use + * handler to handle incoming events on this channel. + *

+ * Note the constructor does not bind the socket, {@link #bind()} method should be called for + * binding the socket. + * + * @param port the port on which channel will be bound to accept incoming connection requests. + * @param handler the handler that will handle incoming requests on this channel. + * @throws IOException if any I/O error occurs. + */ + public NioServerSocketChannel(int port, ChannelHandler handler) throws IOException { + super(handler, ServerSocketChannel.open()); + this.port = port; + } + + + @Override + public int getInterestedOps() { + // being a server socket channel it is interested in accepting connection from remote peers. + return SelectionKey.OP_ACCEPT; + } + + /** + * @return the underlying {@link ServerSocketChannel}. + */ + @Override + public ServerSocketChannel getChannel() { + return (ServerSocketChannel) super.getChannel(); + } + + /** + * Reads and returns {@link ByteBuffer} from the underlying {@link SocketChannel} represented by + * the key. Due to the fact that there is a dedicated channel for each client + * connection we don't need to store the sender. + */ + @Override + public ByteBuffer read(SelectionKey key) throws IOException { + SocketChannel socketChannel = (SocketChannel) key.channel(); + ByteBuffer buffer = ByteBuffer.allocate(1024); + int read = socketChannel.read(buffer); + buffer.flip(); + if (read == -1) { + throw new IOException("Socket closed"); + } + return buffer; + } + + /** + * Binds TCP socket on the provided port. + * + * @throws IOException if any I/O error occurs. + */ + @Override + public void bind() throws IOException { + ((ServerSocketChannel) getChannel()).socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port)); + ((ServerSocketChannel) getChannel()).configureBlocking(false); + System.out.println("Bound TCP socket at port: " + port); + } + + /** + * Writes the pending {@link ByteBuffer} to the underlying channel sending data to the intended + * receiver of the packet. + */ + @Override + protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException { + ByteBuffer pendingBuffer = (ByteBuffer) pendingWrite; + ((SocketChannel) key.channel()).write(pendingBuffer); + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/SameThreadDispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/framework/SameThreadDispatcher.java new file mode 100644 index 000000000..ae995428e --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/SameThreadDispatcher.java @@ -0,0 +1,38 @@ +package com.iluwatar.reactor.framework; + +import java.nio.channels.SelectionKey; + +/** + * Dispatches the events in the context of caller thread. This implementation is a good fit for + * small applications where there are limited clients. Using this implementation limits the + * scalability because the I/O thread performs the application specific processing. + * + *

+ * For better performance use {@link ThreadPoolDispatcher}. + * + * @see ThreadPoolDispatcher + */ +public class SameThreadDispatcher implements Dispatcher { + + /** + * Dispatches the read event in the context of caller thread.
+ * Note this is a blocking call. It returns only after the associated handler has handled the read + * event. + */ + @Override + public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) { + /* + * Calls the associated handler to notify the read event where application specific code + * resides. + */ + channel.getHandler().handleChannelRead(channel, readObject, key); + } + + /** + * No resources to free. + */ + @Override + public void stop() { + // no-op + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/ThreadPoolDispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/framework/ThreadPoolDispatcher.java new file mode 100644 index 000000000..4a240659e --- /dev/null +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/ThreadPoolDispatcher.java @@ -0,0 +1,47 @@ +package com.iluwatar.reactor.framework; + +import java.nio.channels.SelectionKey; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * An implementation that uses a pool of worker threads to dispatch the events. This provides better + * scalability as the application specific processing is not performed in the context of I/O + * (reactor) thread. + */ +public class ThreadPoolDispatcher implements Dispatcher { + + private final ExecutorService executorService; + + /** + * Creates a pooled dispatcher with tunable pool size. + * + * @param poolSize number of pooled threads + */ + public ThreadPoolDispatcher(int poolSize) { + this.executorService = Executors.newFixedThreadPool(poolSize); + } + + /** + * Submits the work of dispatching the read event to worker pool, where it gets picked up by + * worker threads.
+ * Note that this is a non-blocking call and returns immediately. It is not guaranteed that the + * event has been handled by associated handler. + */ + @Override + public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) { + executorService.execute(() -> channel.getHandler().handleChannelRead(channel, readObject, key)); + } + + /** + * Stops the pool of workers. + * + * @throws InterruptedException if interrupted while stopping pool of workers. + */ + @Override + public void stop() throws InterruptedException { + executorService.shutdown(); + executorService.awaitTermination(4, TimeUnit.SECONDS); + } +} diff --git a/reactor/src/test/java/com/iluwatar/reactor/app/AppTest.java b/reactor/src/test/java/com/iluwatar/reactor/app/AppTest.java new file mode 100644 index 000000000..752192ef3 --- /dev/null +++ b/reactor/src/test/java/com/iluwatar/reactor/app/AppTest.java @@ -0,0 +1,68 @@ +package com.iluwatar.reactor.app; + +import java.io.IOException; + +import org.junit.Test; + +import com.iluwatar.reactor.framework.SameThreadDispatcher; +import com.iluwatar.reactor.framework.ThreadPoolDispatcher; + +/** + * + * This class tests the Distributed Logging service by starting a Reactor and then sending it + * concurrent logging requests using multiple clients. + */ +public class AppTest { + + /** + * Test the application using pooled thread dispatcher. + * + * @throws IOException if any I/O error occurs. + * @throws InterruptedException if interrupted while stopping the application. + */ + @Test + public void testAppUsingThreadPoolDispatcher() throws IOException, InterruptedException { + App app = new App(); + app.start(new ThreadPoolDispatcher(2)); + + AppClient client = new AppClient(); + client.start(); + + // allow clients to send requests. Artificial delay. + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + client.stop(); + + app.stop(); + } + + /** + * Test the application using same thread dispatcher. + * + * @throws IOException if any I/O error occurs. + * @throws InterruptedException if interrupted while stopping the application. + */ + @Test + public void testAppUsingSameThreadDispatcher() throws IOException, InterruptedException { + App app = new App(); + app.start(new SameThreadDispatcher()); + + AppClient client = new AppClient(); + client.start(); + + // allow clients to send requests. Artificial delay. + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + client.stop(); + + app.stop(); + } +}