diff --git a/reactor/src/main/java/com/iluwatar/reactor/app/App.java b/reactor/src/main/java/com/iluwatar/reactor/app/App.java index 947173494..fcc327b34 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/app/App.java +++ b/reactor/src/main/java/com/iluwatar/reactor/app/App.java @@ -10,20 +10,18 @@ import com.iluwatar.reactor.framework.NioServerSocketChannel; import com.iluwatar.reactor.framework.ThreadPoolDispatcher; /** - * This application demonstrates Reactor pattern. The example demonstrated is a Distributed Logging Service - * where it listens on multiple TCP or UDP sockets for incoming log requests. + * This application demonstrates Reactor pattern. The example demonstrated is a Distributed Logging + * Service where it listens on multiple TCP or UDP sockets for incoming log requests. * *

- * INTENT - *
- * The Reactor design pattern handles service requests that are delivered concurrently to an + * INTENT
+ * The Reactor design pattern handles service requests that are delivered concurrently to an * application by one or more clients. The application can register specific handlers for processing * which are called by reactor on specific events. * *

- * PROBLEM - *
- * Server applications in a distributed system must handle multiple clients that send them service + * PROBLEM
+ * Server applications in a distributed system must handle multiple clients that send them service * requests. Following forces need to be resolved: *

* *

- * The application utilizes single thread to listen for requests on all ports. It does not create - * a separate thread for each client, which provides better scalability under load (number of clients + * PARTICIPANTS
+ *

+ * + *

+ * The application utilizes single thread to listen for requests on all ports. It does not create a + * separate thread for each client, which provides better scalability under load (number of clients * increase). * *

@@ -45,59 +63,60 @@ import com.iluwatar.reactor.framework.ThreadPoolDispatcher; */ public class App { - private NioReactor reactor; + private NioReactor reactor; - /** - * App entry. - * @throws IOException - */ - public static void main(String[] args) throws IOException { - new App().start(); - } - - /** - * Starts the NIO reactor. - * @throws IOException if any channel fails to bind. - */ - public void start() throws IOException { - /* - * The application can customize its event dispatching mechanism. - */ - reactor = new NioReactor(new ThreadPoolDispatcher(2)); - - /* - * This represents application specific business logic that dispatcher will call - * on appropriate events. These events are read events in our example. - */ - LoggingHandler loggingHandler = new LoggingHandler(); - - /* - * Our application binds to multiple channels and uses same logging handler to handle - * incoming log requests. - */ - reactor - .registerChannel(tcpChannel(6666, loggingHandler)) - .registerChannel(tcpChannel(6667, loggingHandler)) - .registerChannel(udpChannel(6668, loggingHandler)) - .start(); - } - - /** - * Stops the NIO reactor. This is a blocking call. - */ - public void stop() { - reactor.stop(); - } + /** + * App entry. + * + * @throws IOException + */ + public static void main(String[] args) throws IOException { + new App().start(); + } - private static AbstractNioChannel tcpChannel(int port, ChannelHandler handler) throws IOException { - NioServerSocketChannel channel = new NioServerSocketChannel(port, handler); - channel.bind(); - return channel; - } - - private static AbstractNioChannel udpChannel(int port, ChannelHandler handler) throws IOException { - NioDatagramChannel channel = new NioDatagramChannel(port, handler); - channel.bind(); - return channel; - } + /** + * Starts the NIO reactor. + * + * @throws IOException if any channel fails to bind. + */ + public void start() throws IOException { + /* + * The application can customize its event dispatching mechanism. + */ + reactor = new NioReactor(new ThreadPoolDispatcher(2)); + + /* + * This represents application specific business logic that dispatcher will call on appropriate + * events. These events are read events in our example. + */ + LoggingHandler loggingHandler = new LoggingHandler(); + + /* + * Our application binds to multiple channels and uses same logging handler to handle incoming + * log requests. + */ + reactor.registerChannel(tcpChannel(6666, loggingHandler)).registerChannel(tcpChannel(6667, loggingHandler)) + .registerChannel(udpChannel(6668, loggingHandler)).start(); + } + + /** + * Stops the NIO reactor. This is a blocking call. + * + * @throws InterruptedException if interrupted while stopping the reactor. + */ + public void stop() throws InterruptedException { + reactor.stop(); + } + + private static AbstractNioChannel tcpChannel(int port, ChannelHandler handler) throws IOException { + NioServerSocketChannel channel = new NioServerSocketChannel(port, handler); + channel.bind(); + return channel; + } + + private static AbstractNioChannel udpChannel(int port, ChannelHandler handler) throws IOException { + NioDatagramChannel channel = new NioDatagramChannel(port, handler); + channel.bind(); + return channel; + } } diff --git a/reactor/src/main/java/com/iluwatar/reactor/app/AppClient.java b/reactor/src/main/java/com/iluwatar/reactor/app/AppClient.java index 033711569..c50e4d3e7 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/app/AppClient.java +++ b/reactor/src/main/java/com/iluwatar/reactor/app/AppClient.java @@ -17,148 +17,149 @@ import java.util.concurrent.TimeUnit; /** * Represents the clients of Reactor pattern. Multiple clients are run concurrently and send logging * requests to Reactor. - * + * * @author npathai */ public class AppClient { - private ExecutorService service = Executors.newFixedThreadPool(4); + private final ExecutorService service = Executors.newFixedThreadPool(4); - /** - * App client entry. - * @throws IOException if any I/O error occurs. - */ - public static void main(String[] args) throws IOException { - AppClient appClient = new AppClient(); - appClient.start(); - } + /** + * App client entry. + * + * @throws IOException if any I/O error occurs. + */ + public static void main(String[] args) throws IOException { + AppClient appClient = new AppClient(); + appClient.start(); + } - /** - * Starts the logging clients. - * @throws IOException if any I/O error occurs. - */ - public void start() throws IOException { - service.execute(new TCPLoggingClient("Client 1", 6666)); - service.execute(new TCPLoggingClient("Client 2", 6667)); - service.execute(new UDPLoggingClient("Client 3", 6668)); - service.execute(new UDPLoggingClient("Client 4", 6668)); - } + /** + * Starts the logging clients. + * + * @throws IOException if any I/O error occurs. + */ + public void start() throws IOException { + service.execute(new TCPLoggingClient("Client 1", 6666)); + service.execute(new TCPLoggingClient("Client 2", 6667)); + service.execute(new UDPLoggingClient("Client 3", 6668)); + service.execute(new UDPLoggingClient("Client 4", 6668)); + } - /** - * Stops logging clients. This is a blocking call. - */ - public void stop() { - service.shutdown(); - if (!service.isTerminated()) { - service.shutdownNow(); - try { - service.awaitTermination(1000, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - } - - private static void artificialDelayOf(long millis) { - try { - Thread.sleep(millis); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } + /** + * Stops logging clients. This is a blocking call. + */ + public void stop() { + service.shutdown(); + if (!service.isTerminated()) { + service.shutdownNow(); + try { + service.awaitTermination(1000, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } - /** - * A logging client that sends requests to Reactor on TCP socket. - */ - static class TCPLoggingClient implements Runnable { + private static void artificialDelayOf(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } - private int serverPort; - private String clientName; + /** + * A logging client that sends requests to Reactor on TCP socket. + */ + static class TCPLoggingClient implements Runnable { - /** - * Creates a new TCP logging client. - * - * @param clientName the name of the client to be sent in logging requests. - * @param port the port on which client will send logging requests. - */ - public TCPLoggingClient(String clientName, int serverPort) { - this.clientName = clientName; - this.serverPort = serverPort; - } + private final int serverPort; + private final String clientName; - public void run() { - try (Socket socket = new Socket(InetAddress.getLocalHost(), serverPort)) { - OutputStream outputStream = socket.getOutputStream(); - PrintWriter writer = new PrintWriter(outputStream); - sendLogRequests(writer, socket.getInputStream()); - } catch (IOException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } + /** + * Creates a new TCP logging client. + * + * @param clientName the name of the client to be sent in logging requests. + * @param port the port on which client will send logging requests. + */ + public TCPLoggingClient(String clientName, int serverPort) { + this.clientName = clientName; + this.serverPort = serverPort; + } - private void sendLogRequests(PrintWriter writer, InputStream inputStream) throws IOException { - for (int i = 0; i < 4; i++) { - writer.println(clientName + " - Log request: " + i); - writer.flush(); + public void run() { + try (Socket socket = new Socket(InetAddress.getLocalHost(), serverPort)) { + OutputStream outputStream = socket.getOutputStream(); + PrintWriter writer = new PrintWriter(outputStream); + sendLogRequests(writer, socket.getInputStream()); + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } - byte[] data = new byte[1024]; - int read = inputStream.read(data, 0, data.length); - if (read == 0) { - System.out.println("Read zero bytes"); - } else { - System.out.println(new String(data, 0, read)); - } + private void sendLogRequests(PrintWriter writer, InputStream inputStream) throws IOException { + for (int i = 0; i < 4; i++) { + writer.println(clientName + " - Log request: " + i); + writer.flush(); - artificialDelayOf(100); - } - } + byte[] data = new byte[1024]; + int read = inputStream.read(data, 0, data.length); + if (read == 0) { + System.out.println("Read zero bytes"); + } else { + System.out.println(new String(data, 0, read)); + } - } + artificialDelayOf(100); + } + } - /** - * A logging client that sends requests to Reactor on UDP socket. - */ - static class UDPLoggingClient implements Runnable { - private String clientName; - private InetSocketAddress remoteAddress; + } - /** - * Creates a new UDP logging client. - * - * @param clientName the name of the client to be sent in logging requests. - * @param port the port on which client will send logging requests. - * @throws UnknownHostException if localhost is unknown - */ - public UDPLoggingClient(String clientName, int port) throws UnknownHostException { - this.clientName = clientName; - this.remoteAddress = new InetSocketAddress(InetAddress.getLocalHost(), port); - } + /** + * A logging client that sends requests to Reactor on UDP socket. + */ + static class UDPLoggingClient implements Runnable { + private final String clientName; + private final InetSocketAddress remoteAddress; - @Override - public void run() { - try (DatagramSocket socket = new DatagramSocket()) { - for (int i = 0; i < 4; i++) { - - String message = clientName + " - Log request: " + i; - DatagramPacket request = new DatagramPacket(message.getBytes(), - message.getBytes().length, remoteAddress); - - socket.send(request); + /** + * Creates a new UDP logging client. + * + * @param clientName the name of the client to be sent in logging requests. + * @param port the port on which client will send logging requests. + * @throws UnknownHostException if localhost is unknown + */ + public UDPLoggingClient(String clientName, int port) throws UnknownHostException { + this.clientName = clientName; + this.remoteAddress = new InetSocketAddress(InetAddress.getLocalHost(), port); + } - byte[] data = new byte[1024]; - DatagramPacket reply = new DatagramPacket(data, data.length); - socket.receive(reply); - if (reply.getLength() == 0) { - System.out.println("Read zero bytes"); - } else { - System.out.println(new String(reply.getData(), 0, reply.getLength())); - } - - artificialDelayOf(100); - } - } catch (IOException e1) { - e1.printStackTrace(); - } - } - } + @Override + public void run() { + try (DatagramSocket socket = new DatagramSocket()) { + for (int i = 0; i < 4; i++) { + + String message = clientName + " - Log request: " + i; + DatagramPacket request = new DatagramPacket(message.getBytes(), message.getBytes().length, remoteAddress); + + socket.send(request); + + byte[] data = new byte[1024]; + DatagramPacket reply = new DatagramPacket(data, data.length); + socket.receive(reply); + if (reply.getLength() == 0) { + System.out.println("Read zero bytes"); + } else { + System.out.println(new String(reply.getData(), 0, reply.getLength())); + } + + artificialDelayOf(100); + } + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } } diff --git a/reactor/src/main/java/com/iluwatar/reactor/app/LoggingHandler.java b/reactor/src/main/java/com/iluwatar/reactor/app/LoggingHandler.java index eed26b078..1f2694b0b 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/app/LoggingHandler.java +++ b/reactor/src/main/java/com/iluwatar/reactor/app/LoggingHandler.java @@ -8,53 +8,54 @@ import com.iluwatar.reactor.framework.ChannelHandler; import com.iluwatar.reactor.framework.NioDatagramChannel.DatagramPacket; /** - * Logging server application logic. It logs the incoming requests on standard console and returns - * a canned acknowledgement back to the remote peer. + * Logging server application logic. It logs the incoming requests on standard console and returns a + * canned acknowledgement back to the remote peer. * * @author npathai */ public class LoggingHandler implements ChannelHandler { - private static final byte[] ACK = "Data logged successfully".getBytes(); + private static final byte[] ACK = "Data logged successfully".getBytes(); - /** - * Decodes the received data and logs it on standard console. - */ - @Override - public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) { - /* - * As this handler is attached with both TCP and UDP channels we need to check whether - * the data received is a ByteBuffer (from TCP channel) or a DatagramPacket (from UDP channel). - */ - if (readObject instanceof ByteBuffer) { - doLogging(((ByteBuffer)readObject)); - sendReply(channel, key); - } else if (readObject instanceof DatagramPacket) { - DatagramPacket datagram = (DatagramPacket)readObject; - doLogging(datagram.getData()); - sendReply(channel, datagram, key); - } else { - throw new IllegalStateException("Unknown data received"); - } - } + /** + * Decodes the received data and logs it on standard console. + */ + @Override + public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) { + /* + * As this handler is attached with both TCP and UDP channels we need to check whether the data + * received is a ByteBuffer (from TCP channel) or a DatagramPacket (from UDP channel). + */ + if (readObject instanceof ByteBuffer) { + doLogging(((ByteBuffer) readObject)); + sendReply(channel, key); + } else if (readObject instanceof DatagramPacket) { + DatagramPacket datagram = (DatagramPacket) readObject; + doLogging(datagram.getData()); + sendReply(channel, datagram, key); + } else { + throw new IllegalStateException("Unknown data received"); + } + } - private void sendReply(AbstractNioChannel channel, DatagramPacket incomingPacket, SelectionKey key) { - /* - * Create a reply acknowledgement datagram packet setting the receiver to the sender of incoming message. - */ - DatagramPacket replyPacket = new DatagramPacket(ByteBuffer.wrap(ACK)); - replyPacket.setReceiver(incomingPacket.getSender()); - - channel.write(replyPacket, key); - } + private void sendReply(AbstractNioChannel channel, DatagramPacket incomingPacket, SelectionKey key) { + /* + * Create a reply acknowledgement datagram packet setting the receiver to the sender of incoming + * message. + */ + DatagramPacket replyPacket = new DatagramPacket(ByteBuffer.wrap(ACK)); + replyPacket.setReceiver(incomingPacket.getSender()); - private void sendReply(AbstractNioChannel channel, SelectionKey key) { - ByteBuffer buffer = ByteBuffer.wrap(ACK); - channel.write(buffer, key); - } + channel.write(replyPacket, key); + } - private void doLogging(ByteBuffer data) { - // assuming UTF-8 :( - System.out.println(new String(data.array(), 0, data.limit())); - } + private void sendReply(AbstractNioChannel channel, SelectionKey key) { + ByteBuffer buffer = ByteBuffer.wrap(ACK); + channel.write(buffer, key); + } + + private void doLogging(ByteBuffer data) { + // assuming UTF-8 :( + System.out.println(new String(data.array(), 0, data.limit())); + } } diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java b/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java index 24862644d..09f308731 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java @@ -10,143 +10,145 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; /** - * This represents the Handle of Reactor pattern. These are resources managed by OS - * which can be submitted to {@link NioReactor}. + * This represents the Handle of Reactor pattern. These are resources managed by OS which can + * be submitted to {@link NioReactor}. * *

- * This class serves has the responsibility of reading the data when a read event occurs and - * writing the data back when the channel is writable. It leaves the reading and writing of - * data on the concrete implementation. It provides a block writing mechanism wherein when - * any {@link ChannelHandler} wants to write data back, it queues the data in pending write queue - * and clears it in block manner. This provides better throughput. - * + * This class serves has the responsibility of reading the data when a read event occurs and writing + * the data back when the channel is writable. It leaves the reading and writing of data on the + * concrete implementation. It provides a block writing mechanism wherein when any + * {@link ChannelHandler} wants to write data back, it queues the data in pending write queue and + * clears it in block manner. This provides better throughput. + * * @author npathai * */ public abstract class AbstractNioChannel { - - private SelectableChannel channel; - private ChannelHandler handler; - private Map> channelToPendingWrites = new ConcurrentHashMap<>(); - private NioReactor reactor; - - /** - * Creates a new channel. - * @param handler which will handle events occurring on this channel. - * @param channel a NIO channel to be wrapped. - */ - public AbstractNioChannel(ChannelHandler handler, SelectableChannel channel) { - this.handler = handler; - this.channel = channel; - } - - /** - * Injects the reactor in this channel. - */ - void setReactor(NioReactor reactor) { - this.reactor = reactor; - } - /** - * @return the wrapped NIO channel. - */ - public SelectableChannel getChannel() { - return channel; - } + private final SelectableChannel channel; + private final ChannelHandler handler; + private final Map> channelToPendingWrites = new ConcurrentHashMap<>(); + private NioReactor reactor; - /** - * The operation in which the channel is interested, this operation is provided to {@link Selector}. - * - * @return interested operation. - * @see SelectionKey - */ - public abstract int getInterestedOps(); - - /** - * Binds the channel on provided port. - * - * @throws IOException if any I/O error occurs. - */ - public abstract void bind() throws IOException; - - /** - * Reads the data using the key and returns the read data. The underlying channel should be fetched using - * {@link SelectionKey#channel()}. - * - * @param key the key on which read event occurred. - * @return data read. - * @throws IOException if any I/O error occurs. - */ - public abstract Object read(SelectionKey key) throws IOException; + /** + * Creates a new channel. + * + * @param handler which will handle events occurring on this channel. + * @param channel a NIO channel to be wrapped. + */ + public AbstractNioChannel(ChannelHandler handler, SelectableChannel channel) { + this.handler = handler; + this.channel = channel; + } - /** - * @return the handler associated with this channel. - */ - public ChannelHandler getHandler() { - return handler; - } + /** + * Injects the reactor in this channel. + */ + void setReactor(NioReactor reactor) { + this.reactor = reactor; + } - /* - * Called from the context of reactor thread when the key becomes writable. - * The channel writes the whole pending block of data at once. - */ - void flush(SelectionKey key) throws IOException { - Queue pendingWrites = channelToPendingWrites.get(key.channel()); - while (true) { - Object pendingWrite = pendingWrites.poll(); - if (pendingWrite == null) { - // We don't have anything more to write so channel is interested in reading more data - reactor.changeOps(key, SelectionKey.OP_READ); - break; - } - - // ask the concrete channel to make sense of data and write it to java channel - doWrite(pendingWrite, key); - } - } + /** + * @return the wrapped NIO channel. + */ + public SelectableChannel getChannel() { + return channel; + } - /** - * Writes the data to the channel. - * - * @param pendingWrite the data to be written on channel. - * @param key the key which is writable. - * @throws IOException if any I/O error occurs. - */ - protected abstract void doWrite(Object pendingWrite, SelectionKey key) throws IOException; + /** + * The operation in which the channel is interested, this operation is provided to + * {@link Selector}. + * + * @return interested operation. + * @see SelectionKey + */ + public abstract int getInterestedOps(); - /** - * Queues the data for writing. The data is not guaranteed to be written on underlying channel - * when this method returns. It will be written when the channel is flushed. - * - *

- * This method is used by the {@link ChannelHandler} to send reply back to the client. - *
- * Example: - *

-	 * 
-	 * {@literal @}Override
-	 * public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) {
-	 *   byte[] data = ((ByteBuffer)readObject).array();
-	 *   ByteBuffer buffer = ByteBuffer.wrap("Server reply".getBytes());
-	 *   channel.write(buffer, key);
-	 * }
-	 * 
-	 * 
-	 * @param data the data to be written on underlying channel.
-	 * @param key the key which is writable.
-	 */
-	public void write(Object data, SelectionKey key) {
-		Queue pendingWrites = this.channelToPendingWrites.get(key.channel());
-		if (pendingWrites == null) {
-			synchronized (this.channelToPendingWrites) {
-				pendingWrites = this.channelToPendingWrites.get(key.channel());
-				if (pendingWrites == null) {
-					pendingWrites = new ConcurrentLinkedQueue<>();
-					this.channelToPendingWrites.put(key.channel(), pendingWrites);
-				}
-			}
-		}
-		pendingWrites.add(data);
-		reactor.changeOps(key, SelectionKey.OP_WRITE);
-	}
+  /**
+   * Binds the channel on provided port.
+   * 
+   * @throws IOException if any I/O error occurs.
+   */
+  public abstract void bind() throws IOException;
+
+  /**
+   * Reads the data using the key and returns the read data. The underlying channel should be
+   * fetched using {@link SelectionKey#channel()}.
+   * 
+   * @param key the key on which read event occurred.
+   * @return data read.
+   * @throws IOException if any I/O error occurs.
+   */
+  public abstract Object read(SelectionKey key) throws IOException;
+
+  /**
+   * @return the handler associated with this channel.
+   */
+  public ChannelHandler getHandler() {
+    return handler;
+  }
+
+  /*
+   * Called from the context of reactor thread when the key becomes writable. The channel writes the
+   * whole pending block of data at once.
+   */
+  void flush(SelectionKey key) throws IOException {
+    Queue pendingWrites = channelToPendingWrites.get(key.channel());
+    while (true) {
+      Object pendingWrite = pendingWrites.poll();
+      if (pendingWrite == null) {
+        // We don't have anything more to write so channel is interested in reading more data
+        reactor.changeOps(key, SelectionKey.OP_READ);
+        break;
+      }
+
+      // ask the concrete channel to make sense of data and write it to java channel
+      doWrite(pendingWrite, key);
+    }
+  }
+
+  /**
+   * Writes the data to the channel.
+   * 
+   * @param pendingWrite the data to be written on channel.
+   * @param key the key which is writable.
+   * @throws IOException if any I/O error occurs.
+   */
+  protected abstract void doWrite(Object pendingWrite, SelectionKey key) throws IOException;
+
+  /**
+   * Queues the data for writing. The data is not guaranteed to be written on underlying channel
+   * when this method returns. It will be written when the channel is flushed.
+   * 
+   * 

+ * This method is used by the {@link ChannelHandler} to send reply back to the client.
+ * Example: + * + *

+   * 
+   * {@literal @}Override
+   * public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) {
+   *   byte[] data = ((ByteBuffer)readObject).array();
+   *   ByteBuffer buffer = ByteBuffer.wrap("Server reply".getBytes());
+   *   channel.write(buffer, key);
+   * }
+   * 
+   * 
+   * @param data the data to be written on underlying channel.
+   * @param key the key which is writable.
+   */
+  public void write(Object data, SelectionKey key) {
+    Queue pendingWrites = this.channelToPendingWrites.get(key.channel());
+    if (pendingWrites == null) {
+      synchronized (this.channelToPendingWrites) {
+        pendingWrites = this.channelToPendingWrites.get(key.channel());
+        if (pendingWrites == null) {
+          pendingWrites = new ConcurrentLinkedQueue<>();
+          this.channelToPendingWrites.put(key.channel(), pendingWrites);
+        }
+      }
+    }
+    pendingWrites.add(data);
+    reactor.changeOps(key, SelectionKey.OP_WRITE);
+  }
 }
diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/ChannelHandler.java b/reactor/src/main/java/com/iluwatar/reactor/framework/ChannelHandler.java
index 0aae9db75..a4a392a34 100644
--- a/reactor/src/main/java/com/iluwatar/reactor/framework/ChannelHandler.java
+++ b/reactor/src/main/java/com/iluwatar/reactor/framework/ChannelHandler.java
@@ -7,19 +7,19 @@ import java.nio.channels.SelectionKey;
  * to it by the {@link Dispatcher}. This is where the application logic resides.
  * 
  * 

- * A {@link ChannelHandler} can be associated with one or many {@link AbstractNioChannel}s, and whenever - * an event occurs on any of the associated channels, the handler is notified of the event. - * + * A {@link ChannelHandler} can be associated with one or many {@link AbstractNioChannel}s, and + * whenever an event occurs on any of the associated channels, the handler is notified of the event. + * * @author npathai */ public interface ChannelHandler { - /** - * Called when the {@code channel} receives some data from remote peer. - * - * @param channel the channel from which the data was received. - * @param readObject the data read. - * @param key the key on which read event occurred. - */ - void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key); + /** + * Called when the {@code channel} receives some data from remote peer. + * + * @param channel the channel from which the data was received. + * @param readObject the data read. + * @param key the key on which read event occurred. + */ + void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key); } diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/Dispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/framework/Dispatcher.java index c563ef9d3..0ed53f8fc 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/framework/Dispatcher.java +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/Dispatcher.java @@ -3,39 +3,41 @@ package com.iluwatar.reactor.framework; import java.nio.channels.SelectionKey; /** - * Represents the event dispatching strategy. When {@link NioReactor} senses any event on the - * registered {@link AbstractNioChannel}s then it de-multiplexes the event type, read or write - * or connect, and then calls the {@link Dispatcher} to dispatch the read events. This decouples the I/O - * processing from application specific processing. - *
- * Dispatcher should call the {@link ChannelHandler} associated with the channel on which event occurred. + * Represents the event dispatching strategy. When {@link NioReactor} senses any event on the + * registered {@link AbstractNioChannel}s then it de-multiplexes the event type, read or write or + * connect, and then calls the {@link Dispatcher} to dispatch the read events. This decouples the + * I/O processing from application specific processing.
+ * Dispatcher should call the {@link ChannelHandler} associated with the channel on which event + * occurred. * *

- * The application can customize the way in which event is dispatched such as using the reactor thread to - * dispatch event to channels or use a worker pool to do the non I/O processing. - * + * The application can customize the way in which event is dispatched such as using the reactor + * thread to dispatch event to channels or use a worker pool to do the non I/O processing. + * * @see SameThreadDispatcher * @see ThreadPoolDispatcher * * @author npathai */ public interface Dispatcher { - /** - * This hook method is called when read event occurs on particular channel. The data read - * is provided in readObject. The implementation should dispatch this read event - * to the associated {@link ChannelHandler} of channel. - * - *

- * The type of readObject depends on the channel on which data was received. - * - * @param channel on which read event occurred - * @param readObject object read by channel - * @param key on which event occurred - */ - void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key); - - /** - * Stops dispatching events and cleans up any acquired resources such as threads. - */ - void stop(); + /** + * This hook method is called when read event occurs on particular channel. The data read is + * provided in readObject. The implementation should dispatch this read event to the + * associated {@link ChannelHandler} of channel. + * + *

+ * The type of readObject depends on the channel on which data was received. + * + * @param channel on which read event occurred + * @param readObject object read by channel + * @param key on which event occurred + */ + void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key); + + /** + * Stops dispatching events and cleans up any acquired resources such as threads. + * + * @throws InterruptedException if interrupted while stopping dispatcher. + */ + void stop() throws InterruptedException; } diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java b/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java index f338ce4a3..089911d10 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java @@ -15,143 +15,147 @@ import java.nio.channels.SelectionKey; */ public class NioDatagramChannel extends AbstractNioChannel { - private int port; + private final int port; - /** - * Creates a {@link DatagramChannel} which will bind at provided port and use handler to handle - * incoming events on this channel. - *

- * Note the constructor does not bind the socket, {@link #bind()} method should be called for binding - * the socket. - * - * @param port the port to be bound to listen for incoming datagram requests. - * @param handler the handler to be used for handling incoming requests on this channel. - * @throws IOException if any I/O error occurs. - */ - public NioDatagramChannel(int port, ChannelHandler handler) throws IOException { - super(handler, DatagramChannel.open()); - this.port = port; - } + /** + * Creates a {@link DatagramChannel} which will bind at provided port and use handler + * to handle incoming events on this channel. + *

+ * Note the constructor does not bind the socket, {@link #bind()} method should be called for + * binding the socket. + * + * @param port the port to be bound to listen for incoming datagram requests. + * @param handler the handler to be used for handling incoming requests on this channel. + * @throws IOException if any I/O error occurs. + */ + public NioDatagramChannel(int port, ChannelHandler handler) throws IOException { + super(handler, DatagramChannel.open()); + this.port = port; + } - @Override - public int getInterestedOps() { - /* there is no need to accept connections in UDP, so the channel shows interest in - * reading data. - */ - return SelectionKey.OP_READ; - } + @Override + public int getInterestedOps() { + /* + * there is no need to accept connections in UDP, so the channel shows interest in reading data. + */ + return SelectionKey.OP_READ; + } - /** - * Reads and returns a {@link DatagramPacket} from the underlying channel. - * @return the datagram packet read having the sender address. - */ - @Override - public DatagramPacket read(SelectionKey key) throws IOException { - ByteBuffer buffer = ByteBuffer.allocate(1024); - SocketAddress sender = ((DatagramChannel)key.channel()).receive(buffer); - - /* - * It is required to create a DatagramPacket because we need to preserve which - * socket address acts as destination for sending reply packets. - */ - buffer.flip(); - DatagramPacket packet = new DatagramPacket(buffer); - packet.setSender(sender); - - return packet; - } - - /** - * @return the underlying datagram channel. - */ - @Override - public DatagramChannel getChannel() { - return (DatagramChannel) super.getChannel(); - } - - /** - * Binds UDP socket on the provided port. - * - * @throws IOException if any I/O error occurs. - */ - @Override - public void bind() throws IOException { - getChannel().socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port)); - getChannel().configureBlocking(false); - System.out.println("Bound UDP socket at port: " + port); - } + /** + * Reads and returns a {@link DatagramPacket} from the underlying channel. + * + * @return the datagram packet read having the sender address. + */ + @Override + public DatagramPacket read(SelectionKey key) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(1024); + SocketAddress sender = ((DatagramChannel) key.channel()).receive(buffer); - /** - * Writes the pending {@link DatagramPacket} to the underlying channel sending data to - * the intended receiver of the packet. - */ - @Override - protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException { - DatagramPacket pendingPacket = (DatagramPacket) pendingWrite; - getChannel().send(pendingPacket.getData(), pendingPacket.getReceiver()); - } + /* + * It is required to create a DatagramPacket because we need to preserve which socket address + * acts as destination for sending reply packets. + */ + buffer.flip(); + DatagramPacket packet = new DatagramPacket(buffer); + packet.setSender(sender); - /** - * Writes the outgoing {@link DatagramPacket} to the channel. The intended receiver of the - * datagram packet must be set in the data using {@link DatagramPacket#setReceiver(SocketAddress)}. - */ - @Override - public void write(Object data, SelectionKey key) { - super.write(data, key); - } - - /** - * Container of data used for {@link NioDatagramChannel} to communicate with remote peer. - */ - public static class DatagramPacket { - private SocketAddress sender; - private ByteBuffer data; - private SocketAddress receiver; + return packet; + } - /** - * Creates a container with underlying data. - * - * @param data the underlying message to be written on channel. - */ - public DatagramPacket(ByteBuffer data) { - this.data = data; - } + /** + * @return the underlying datagram channel. + */ + @Override + public DatagramChannel getChannel() { + return (DatagramChannel) super.getChannel(); + } - /** - * @return the sender address. - */ - public SocketAddress getSender() { - return sender; - } - - /** - * Sets the sender address of this packet. - * @param sender the sender address. - */ - public void setSender(SocketAddress sender) { - this.sender = sender; - } + /** + * Binds UDP socket on the provided port. + * + * @throws IOException if any I/O error occurs. + */ + @Override + public void bind() throws IOException { + getChannel().socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port)); + getChannel().configureBlocking(false); + System.out.println("Bound UDP socket at port: " + port); + } - /** - * @return the receiver address. - */ - public SocketAddress getReceiver() { - return receiver; - } - - /** - * Sets the intended receiver address. This must be set when writing to the channel. - * @param receiver the receiver address. - */ - public void setReceiver(SocketAddress receiver) { - this.receiver = receiver; - } + /** + * Writes the pending {@link DatagramPacket} to the underlying channel sending data to the + * intended receiver of the packet. + */ + @Override + protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException { + DatagramPacket pendingPacket = (DatagramPacket) pendingWrite; + getChannel().send(pendingPacket.getData(), pendingPacket.getReceiver()); + } - /** - * @return the underlying message that will be written on channel. - */ - public ByteBuffer getData() { - return data; - } - } -} \ No newline at end of file + /** + * Writes the outgoing {@link DatagramPacket} to the channel. The intended receiver of the + * datagram packet must be set in the data using + * {@link DatagramPacket#setReceiver(SocketAddress)}. + */ + @Override + public void write(Object data, SelectionKey key) { + super.write(data, key); + } + + /** + * Container of data used for {@link NioDatagramChannel} to communicate with remote peer. + */ + public static class DatagramPacket { + private SocketAddress sender; + private ByteBuffer data; + private SocketAddress receiver; + + /** + * Creates a container with underlying data. + * + * @param data the underlying message to be written on channel. + */ + public DatagramPacket(ByteBuffer data) { + this.data = data; + } + + /** + * @return the sender address. + */ + public SocketAddress getSender() { + return sender; + } + + /** + * Sets the sender address of this packet. + * + * @param sender the sender address. + */ + public void setSender(SocketAddress sender) { + this.sender = sender; + } + + /** + * @return the receiver address. + */ + public SocketAddress getReceiver() { + return receiver; + } + + /** + * Sets the intended receiver address. This must be set when writing to the channel. + * + * @param receiver the receiver address. + */ + public void setReceiver(SocketAddress receiver) { + this.receiver = receiver; + } + + /** + * @return the underlying message that will be written on channel. + */ + public ByteBuffer getData() { + return data; + } + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java b/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java index 273898ae3..89af20630 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java @@ -12,228 +12,225 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; + /** * This class acts as Synchronous Event De-multiplexer and Initiation Dispatcher of Reactor pattern. * Multiple handles i.e. {@link AbstractNioChannel}s can be registered to the reactor and it blocks - * for events from all these handles. Whenever an event occurs on any of the registered handles, - * it synchronously de-multiplexes the event which can be any of read, write or accept, and - * dispatches the event to the appropriate {@link ChannelHandler} using the {@link Dispatcher}. + * for events from all these handles. Whenever an event occurs on any of the registered handles, it + * synchronously de-multiplexes the event which can be any of read, write or accept, and dispatches + * the event to the appropriate {@link ChannelHandler} using the {@link Dispatcher}. * *

- * Implementation: - * A NIO reactor runs in its own thread when it is started using {@link #start()} method. - * {@link NioReactor} uses {@link Selector} for realizing Synchronous Event De-multiplexing. + * Implementation: A NIO reactor runs in its own thread when it is started using {@link #start()} + * method. {@link NioReactor} uses {@link Selector} for realizing Synchronous Event De-multiplexing. * *

- * NOTE: This is one of the ways to implement NIO reactor and it does not take care of all possible edge cases - * which are required in a real application. This implementation is meant to demonstrate the fundamental - * concepts that lie behind Reactor pattern. + * NOTE: This is one of the ways to implement NIO reactor and it does not take care of all possible + * edge cases which are required in a real application. This implementation is meant to demonstrate + * the fundamental concepts that lie behind Reactor pattern. * * @author npathai * */ public class NioReactor { - private Selector selector; - private Dispatcher dispatcher; - /** - * All the work of altering the SelectionKey operations and Selector operations are performed in - * the context of main event loop of reactor. So when any channel needs to change its readability - * or writability, a new command is added in the command queue and then the event loop picks up - * the command and executes it in next iteration. - */ - private Queue pendingCommands = new ConcurrentLinkedQueue<>(); - private ExecutorService reactorMain = Executors.newSingleThreadExecutor(); - - /** - * Creates a reactor which will use provided {@code dispatcher} to dispatch events. - * The application can provide various implementations of dispatcher which suits its - * needs. - * - * @param dispatcher a non-null dispatcher used to dispatch events on registered channels. - * @throws IOException if any I/O error occurs. - */ - public NioReactor(Dispatcher dispatcher) throws IOException { - this.dispatcher = dispatcher; - this.selector = Selector.open(); - } + private final Selector selector; + private final Dispatcher dispatcher; + /** + * All the work of altering the SelectionKey operations and Selector operations are performed in + * the context of main event loop of reactor. So when any channel needs to change its readability + * or writability, a new command is added in the command queue and then the event loop picks up + * the command and executes it in next iteration. + */ + private final Queue pendingCommands = new ConcurrentLinkedQueue<>(); + private final ExecutorService reactorMain = Executors.newSingleThreadExecutor(); - /** - * Starts the reactor event loop in a new thread. - * - * @throws IOException if any I/O error occurs. - */ - public void start() throws IOException { - reactorMain.execute(() -> { - try { - System.out.println("Reactor started, waiting for events..."); - eventLoop(); - } catch (IOException e) { - e.printStackTrace(); - } - }); - } - - /** - * Stops the reactor and related resources such as dispatcher. - */ - public void stop() { - reactorMain.shutdownNow(); - selector.wakeup(); - try { - reactorMain.awaitTermination(4, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } - dispatcher.stop(); - } + /** + * Creates a reactor which will use provided {@code dispatcher} to dispatch events. The + * application can provide various implementations of dispatcher which suits its needs. + * + * @param dispatcher a non-null dispatcher used to dispatch events on registered channels. + * @throws IOException if any I/O error occurs. + */ + public NioReactor(Dispatcher dispatcher) throws IOException { + this.dispatcher = dispatcher; + this.selector = Selector.open(); + } - /** - * Registers a new channel (handle) with this reactor. Reactor will start waiting for events on this channel - * and notify of any events. While registering the channel the reactor uses {@link AbstractNioChannel#getInterestedOps()} - * to know about the interested operation of this channel. - * - * @param channel a new channel on which reactor will wait for events. The channel must be bound - * prior to being registered. - * @return this - * @throws IOException if any I/O error occurs. - */ - public NioReactor registerChannel(AbstractNioChannel channel) throws IOException { - SelectionKey key = channel.getChannel().register(selector, channel.getInterestedOps()); - key.attach(channel); - channel.setReactor(this); - return this; - } - - private void eventLoop() throws IOException { - while (true) { - - // honor interrupt request - if (Thread.interrupted()) { - break; - } - - // honor any pending commands first - processPendingCommands(); - - /* - * Synchronous event de-multiplexing happens here, this is blocking call which - * returns when it is possible to initiate non-blocking operation on any of the - * registered channels. - */ - selector.select(); - - /* - * Represents the events that have occurred on registered handles. - */ - Set keys = selector.selectedKeys(); + /** + * Starts the reactor event loop in a new thread. + * + * @throws IOException if any I/O error occurs. + */ + public void start() throws IOException { + reactorMain.execute(() -> { + try { + System.out.println("Reactor started, waiting for events..."); + eventLoop(); + } catch (IOException e) { + e.printStackTrace(); + } + }); + } - Iterator iterator = keys.iterator(); - - while (iterator.hasNext()) { - SelectionKey key = iterator.next(); - if (!key.isValid()) { - iterator.remove(); - continue; - } - processKey(key); - } - keys.clear(); - } - } - - private void processPendingCommands() { - Iterator iterator = pendingCommands.iterator(); - while (iterator.hasNext()) { - Runnable command = iterator.next(); - command.run(); - iterator.remove(); - } - } + /** + * Stops the reactor and related resources such as dispatcher. + * + * @throws InterruptedException if interrupted while stopping the reactor. + */ + public void stop() throws InterruptedException { + reactorMain.shutdownNow(); + selector.wakeup(); + reactorMain.awaitTermination(4, TimeUnit.SECONDS); + dispatcher.stop(); + } - /* - * Initiation dispatcher logic, it checks the type of event and notifier application - * specific event handler to handle the event. - */ - private void processKey(SelectionKey key) throws IOException { - if (key.isAcceptable()) { - onChannelAcceptable(key); - } else if (key.isReadable()) { - onChannelReadable(key); - } else if (key.isWritable()) { - onChannelWritable(key); - } - } + /** + * Registers a new channel (handle) with this reactor. Reactor will start waiting for events on + * this channel and notify of any events. While registering the channel the reactor uses + * {@link AbstractNioChannel#getInterestedOps()} to know about the interested operation of this + * channel. + * + * @param channel a new channel on which reactor will wait for events. The channel must be bound + * prior to being registered. + * @return this + * @throws IOException if any I/O error occurs. + */ + public NioReactor registerChannel(AbstractNioChannel channel) throws IOException { + SelectionKey key = channel.getChannel().register(selector, channel.getInterestedOps()); + key.attach(channel); + channel.setReactor(this); + return this; + } - private void onChannelWritable(SelectionKey key) throws IOException { - AbstractNioChannel channel = (AbstractNioChannel) key.attachment(); - channel.flush(key); - } + private void eventLoop() throws IOException { + while (true) { - private void onChannelReadable(SelectionKey key) { - try { - // reads the incoming data in context of reactor main loop. Can this be improved? - Object readObject = ((AbstractNioChannel)key.attachment()).read(key); + // honor interrupt request + if (Thread.interrupted()) { + break; + } - dispatchReadEvent(key, readObject); - } catch (IOException e) { - try { - key.channel().close(); - } catch (IOException e1) { - e1.printStackTrace(); - } - } - } + // honor any pending commands first + processPendingCommands(); - /* - * Uses the application provided dispatcher to dispatch events to application handler. - */ - private void dispatchReadEvent(SelectionKey key, Object readObject) { - dispatcher.onChannelReadEvent((AbstractNioChannel)key.attachment(), readObject, key); - } + /* + * Synchronous event de-multiplexing happens here, this is blocking call which returns when it + * is possible to initiate non-blocking operation on any of the registered channels. + */ + selector.select(); - private void onChannelAcceptable(SelectionKey key) throws IOException { - ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); - SocketChannel socketChannel = serverSocketChannel.accept(); - socketChannel.configureBlocking(false); - SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); - readKey.attach(key.attachment()); - } + /* + * Represents the events that have occurred on registered handles. + */ + Set keys = selector.selectedKeys(); - /** - * Queues the change of operations request of a channel, which will change the interested - * operations of the channel sometime in future. - *

- * This is a non-blocking method and does not guarantee that the operations have changed when - * this method returns. - * - * @param key the key for which operations have to be changed. - * @param interestedOps the new interest operations. - */ - public void changeOps(SelectionKey key, int interestedOps) { - pendingCommands.add(new ChangeKeyOpsCommand(key, interestedOps)); - selector.wakeup(); - } - - /** - * A command that changes the interested operations of the key provided. - */ - class ChangeKeyOpsCommand implements Runnable { - private SelectionKey key; - private int interestedOps; - - public ChangeKeyOpsCommand(SelectionKey key, int interestedOps) { - this.key = key; - this.interestedOps = interestedOps; - } - - public void run() { - key.interestOps(interestedOps); - } - - @Override - public String toString() { - return "Change of ops to: " + interestedOps; - } - } -} \ No newline at end of file + Iterator iterator = keys.iterator(); + + while (iterator.hasNext()) { + SelectionKey key = iterator.next(); + if (!key.isValid()) { + iterator.remove(); + continue; + } + processKey(key); + } + keys.clear(); + } + } + + private void processPendingCommands() { + Iterator iterator = pendingCommands.iterator(); + while (iterator.hasNext()) { + Runnable command = iterator.next(); + command.run(); + iterator.remove(); + } + } + + /* + * Initiation dispatcher logic, it checks the type of event and notifier application specific + * event handler to handle the event. + */ + private void processKey(SelectionKey key) throws IOException { + if (key.isAcceptable()) { + onChannelAcceptable(key); + } else if (key.isReadable()) { + onChannelReadable(key); + } else if (key.isWritable()) { + onChannelWritable(key); + } + } + + private void onChannelWritable(SelectionKey key) throws IOException { + AbstractNioChannel channel = (AbstractNioChannel) key.attachment(); + channel.flush(key); + } + + private void onChannelReadable(SelectionKey key) { + try { + // reads the incoming data in context of reactor main loop. Can this be improved? + Object readObject = ((AbstractNioChannel) key.attachment()).read(key); + + dispatchReadEvent(key, readObject); + } catch (IOException e) { + try { + key.channel().close(); + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } + + /* + * Uses the application provided dispatcher to dispatch events to application handler. + */ + private void dispatchReadEvent(SelectionKey key, Object readObject) { + dispatcher.onChannelReadEvent((AbstractNioChannel) key.attachment(), readObject, key); + } + + private void onChannelAcceptable(SelectionKey key) throws IOException { + ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); + SocketChannel socketChannel = serverSocketChannel.accept(); + socketChannel.configureBlocking(false); + SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); + readKey.attach(key.attachment()); + } + + /** + * Queues the change of operations request of a channel, which will change the interested + * operations of the channel sometime in future. + *

+ * This is a non-blocking method and does not guarantee that the operations have changed when this + * method returns. + * + * @param key the key for which operations have to be changed. + * @param interestedOps the new interest operations. + */ + public void changeOps(SelectionKey key, int interestedOps) { + pendingCommands.add(new ChangeKeyOpsCommand(key, interestedOps)); + selector.wakeup(); + } + + /** + * A command that changes the interested operations of the key provided. + */ + class ChangeKeyOpsCommand implements Runnable { + private SelectionKey key; + private int interestedOps; + + public ChangeKeyOpsCommand(SelectionKey key, int interestedOps) { + this.key = key; + this.interestedOps = interestedOps; + } + + public void run() { + key.interestOps(interestedOps); + } + + @Override + public String toString() { + return "Change of ops to: " + interestedOps; + } + } +} diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java b/reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java index ae54af643..17f47a394 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java @@ -9,81 +9,82 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; /** - * A wrapper over {@link NioServerSocketChannel} which can read and write data on a {@link SocketChannel}. + * A wrapper over {@link NioServerSocketChannel} which can read and write data on a + * {@link SocketChannel}. * * @author npathai */ public class NioServerSocketChannel extends AbstractNioChannel { - private int port; + private final int port; - /** - * Creates a {@link ServerSocketChannel} which will bind at provided port and use - * handler to handle incoming events on this channel. - *

- * Note the constructor does not bind the socket, {@link #bind()} method should be called for binding - * the socket. - * - * @param port the port on which channel will be bound to accept incoming connection requests. - * @param handler the handler that will handle incoming requests on this channel. - * @throws IOException if any I/O error occurs. - */ - public NioServerSocketChannel(int port, ChannelHandler handler) throws IOException { - super(handler, ServerSocketChannel.open()); - this.port = port; - } + /** + * Creates a {@link ServerSocketChannel} which will bind at provided port and use + * handler to handle incoming events on this channel. + *

+ * Note the constructor does not bind the socket, {@link #bind()} method should be called for + * binding the socket. + * + * @param port the port on which channel will be bound to accept incoming connection requests. + * @param handler the handler that will handle incoming requests on this channel. + * @throws IOException if any I/O error occurs. + */ + public NioServerSocketChannel(int port, ChannelHandler handler) throws IOException { + super(handler, ServerSocketChannel.open()); + this.port = port; + } - - @Override - public int getInterestedOps() { - // being a server socket channel it is interested in accepting connection from remote peers. - return SelectionKey.OP_ACCEPT; - } - /** - * @return the underlying {@link ServerSocketChannel}. - */ - @Override - public ServerSocketChannel getChannel() { - return (ServerSocketChannel) super.getChannel(); - } - - /** - * Reads and returns {@link ByteBuffer} from the underlying {@link SocketChannel} represented by - * the key. Due to the fact that there is a dedicated channel for each client connection - * we don't need to store the sender. - */ - @Override - public ByteBuffer read(SelectionKey key) throws IOException { - SocketChannel socketChannel = (SocketChannel) key.channel(); - ByteBuffer buffer = ByteBuffer.allocate(1024); - int read = socketChannel.read(buffer); - buffer.flip(); - if (read == -1) { - throw new IOException("Socket closed"); - } - return buffer; - } + @Override + public int getInterestedOps() { + // being a server socket channel it is interested in accepting connection from remote peers. + return SelectionKey.OP_ACCEPT; + } - /** - * Binds TCP socket on the provided port. - * - * @throws IOException if any I/O error occurs. - */ - @Override - public void bind() throws IOException { - ((ServerSocketChannel)getChannel()).socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port)); - ((ServerSocketChannel)getChannel()).configureBlocking(false); - System.out.println("Bound TCP socket at port: " + port); - } + /** + * @return the underlying {@link ServerSocketChannel}. + */ + @Override + public ServerSocketChannel getChannel() { + return (ServerSocketChannel) super.getChannel(); + } - /** - * Writes the pending {@link ByteBuffer} to the underlying channel sending data to - * the intended receiver of the packet. - */ - @Override - protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException { - ByteBuffer pendingBuffer = (ByteBuffer) pendingWrite; - ((SocketChannel)key.channel()).write(pendingBuffer); - } + /** + * Reads and returns {@link ByteBuffer} from the underlying {@link SocketChannel} represented by + * the key. Due to the fact that there is a dedicated channel for each client + * connection we don't need to store the sender. + */ + @Override + public ByteBuffer read(SelectionKey key) throws IOException { + SocketChannel socketChannel = (SocketChannel) key.channel(); + ByteBuffer buffer = ByteBuffer.allocate(1024); + int read = socketChannel.read(buffer); + buffer.flip(); + if (read == -1) { + throw new IOException("Socket closed"); + } + return buffer; + } + + /** + * Binds TCP socket on the provided port. + * + * @throws IOException if any I/O error occurs. + */ + @Override + public void bind() throws IOException { + ((ServerSocketChannel) getChannel()).socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port)); + ((ServerSocketChannel) getChannel()).configureBlocking(false); + System.out.println("Bound TCP socket at port: " + port); + } + + /** + * Writes the pending {@link ByteBuffer} to the underlying channel sending data to the intended + * receiver of the packet. + */ + @Override + protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException { + ByteBuffer pendingBuffer = (ByteBuffer) pendingWrite; + ((SocketChannel) key.channel()).write(pendingBuffer); + } } diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/SameThreadDispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/framework/SameThreadDispatcher.java index b5392ac8f..baacda9f3 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/framework/SameThreadDispatcher.java +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/SameThreadDispatcher.java @@ -4,8 +4,8 @@ import java.nio.channels.SelectionKey; /** * Dispatches the events in the context of caller thread. This implementation is a good fit for - * small applications where there are limited clients. Using this implementation limits the scalability - * because the I/O thread performs the application specific processing. + * small applications where there are limited clients. Using this implementation limits the + * scalability because the I/O thread performs the application specific processing. * *

* For better performance use {@link ThreadPoolDispatcher}. @@ -16,28 +16,25 @@ import java.nio.channels.SelectionKey; */ public class SameThreadDispatcher implements Dispatcher { - /** - * Dispatches the read event in the context of caller thread. - *
- * Note this is a blocking call. It returns only after the associated handler has handled the - * read event. - */ - @Override - public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) { - if (channel.getHandler() != null) { - /* - * Calls the associated handler to notify the read event where application specific code - * resides. - */ - channel.getHandler().handleChannelRead(channel, readObject, key); - } - } + /** + * Dispatches the read event in the context of caller thread.
+ * Note this is a blocking call. It returns only after the associated handler has handled the read + * event. + */ + @Override + public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) { + /* + * Calls the associated handler to notify the read event where application specific code + * resides. + */ + channel.getHandler().handleChannelRead(channel, readObject, key); + } - /** - * No resources to free. - */ - @Override - public void stop() { - // no-op - } + /** + * No resources to free. + */ + @Override + public void stop() { + // no-op + } } diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/ThreadPoolDispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/framework/ThreadPoolDispatcher.java index 8624b878e..9fd539adb 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/framework/ThreadPoolDispatcher.java +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/ThreadPoolDispatcher.java @@ -6,50 +6,45 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** - * An implementation that uses a pool of worker threads to dispatch the events. This provides - * better scalability as the application specific processing is not performed in the context - * of I/O (reactor) thread. + * An implementation that uses a pool of worker threads to dispatch the events. This provides better + * scalability as the application specific processing is not performed in the context of I/O + * (reactor) thread. * * @author npathai * */ -public class ThreadPoolDispatcher extends SameThreadDispatcher { +public class ThreadPoolDispatcher implements Dispatcher { - private ExecutorService executorService; + private final ExecutorService executorService; - /** - * Creates a pooled dispatcher with tunable pool size. - * - * @param poolSize number of pooled threads - */ - public ThreadPoolDispatcher(int poolSize) { - this.executorService = Executors.newFixedThreadPool(poolSize); - } + /** + * Creates a pooled dispatcher with tunable pool size. + * + * @param poolSize number of pooled threads + */ + public ThreadPoolDispatcher(int poolSize) { + this.executorService = Executors.newFixedThreadPool(poolSize); + } - /** - * Submits the work of dispatching the read event to worker pool, where it gets picked - * up by worker threads. - *
- * Note that this is a non-blocking call and returns immediately. It is not guaranteed - * that the event has been handled by associated handler. - */ - @Override - public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) { - executorService.execute(() -> - ThreadPoolDispatcher.super.onChannelReadEvent(channel, readObject, key)); - } - - /** - * Stops the pool of workers. - */ - @Override - public void stop() { - executorService.shutdownNow(); - try { - executorService.awaitTermination(4, TimeUnit.SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } + /** + * Submits the work of dispatching the read event to worker pool, where it gets picked up by + * worker threads.
+ * Note that this is a non-blocking call and returns immediately. It is not guaranteed that the + * event has been handled by associated handler. + */ + @Override + public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) { + executorService.execute(() -> channel.getHandler().handleChannelRead(channel, readObject, key)); + } + /** + * Stops the pool of workers. + * + * @throws InterruptedException if interrupted while stopping pool of workers. + */ + @Override + public void stop() throws InterruptedException { + executorService.shutdown(); + executorService.awaitTermination(4, TimeUnit.SECONDS); + } } diff --git a/reactor/src/test/java/com/iluwatar/reactor/app/AppTest.java b/reactor/src/test/java/com/iluwatar/reactor/app/AppTest.java index 9447aac01..bc51e26de 100644 --- a/reactor/src/test/java/com/iluwatar/reactor/app/AppTest.java +++ b/reactor/src/test/java/com/iluwatar/reactor/app/AppTest.java @@ -4,24 +4,38 @@ import java.io.IOException; import org.junit.Test; +/** + * + * This class tests the Distributed Logging service by starting a Reactor and then sending it + * concurrent logging requests using multiple clients. + * + * @author npathai + */ public class AppTest { - @Test - public void testApp() throws IOException { - App app = new App(); - app.start(); - - AppClient client = new AppClient(); - client.start(); - - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - client.stop(); - - app.stop(); - } + /** + * Test the application. + * + * @throws IOException if any I/O error occurs. + * @throws InterruptedException if interrupted while stopping the application. + */ + @Test + public void testApp() throws IOException, InterruptedException { + App app = new App(); + app.start(); + + AppClient client = new AppClient(); + client.start(); + + // allow clients to send requests. Artificial delay. + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + client.stop(); + + app.stop(); + } }