diff --git a/reactor/src/main/java/com/iluwatar/reactor/app/App.java b/reactor/src/main/java/com/iluwatar/reactor/app/App.java index d7b280465..947173494 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/app/App.java +++ b/reactor/src/main/java/com/iluwatar/reactor/app/App.java @@ -10,8 +10,8 @@ import com.iluwatar.reactor.framework.NioServerSocketChannel; import com.iluwatar.reactor.framework.ThreadPoolDispatcher; /** - * This application demonstrates Reactor pattern. It represents a Distributed Logging Service - * where it can listen on multiple TCP or UDP sockets for incoming log requests. + * This application demonstrates Reactor pattern. The example demonstrated is a Distributed Logging Service + * where it listens on multiple TCP or UDP sockets for incoming log requests. * *
* INTENT @@ -49,13 +49,10 @@ public class App { /** * App entry. + * @throws IOException */ - public static void main(String[] args) { - try { - new App().start(); - } catch (IOException e) { - e.printStackTrace(); - } + public static void main(String[] args) throws IOException { + new App().start(); } /** @@ -70,12 +67,12 @@ public class App { /* * This represents application specific business logic that dispatcher will call - * on appropriate events. These events are read and write event in our example. + * on appropriate events. These events are read events in our example. */ LoggingHandler loggingHandler = new LoggingHandler(); /* - * Our application binds to multiple I/O channels and uses same logging handler to handle + * Our application binds to multiple channels and uses same logging handler to handle * incoming log requests. */ reactor diff --git a/reactor/src/main/java/com/iluwatar/reactor/app/AppClient.java b/reactor/src/main/java/com/iluwatar/reactor/app/AppClient.java index e5a7dd145..033711569 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/app/AppClient.java +++ b/reactor/src/main/java/com/iluwatar/reactor/app/AppClient.java @@ -9,24 +9,43 @@ import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; -import java.net.SocketException; +import java.net.UnknownHostException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +/** + * Represents the clients of Reactor pattern. Multiple clients are run concurrently and send logging + * requests to Reactor. + * + * @author npathai + */ public class AppClient { - private ExecutorService service = Executors.newFixedThreadPool(3); - - public static void main(String[] args) { - new AppClient().start(); + private ExecutorService service = Executors.newFixedThreadPool(4); + + /** + * App client entry. + * @throws IOException if any I/O error occurs. + */ + public static void main(String[] args) throws IOException { + AppClient appClient = new AppClient(); + appClient.start(); } - public void start() { - service.execute(new LoggingClient("Client 1", 6666)); - service.execute(new LoggingClient("Client 2", 6667)); - service.execute(new UDPLoggingClient(6668)); + /** + * Starts the logging clients. + * @throws IOException if any I/O error occurs. + */ + public void start() throws IOException { + service.execute(new TCPLoggingClient("Client 1", 6666)); + service.execute(new TCPLoggingClient("Client 2", 6667)); + service.execute(new UDPLoggingClient("Client 3", 6668)); + service.execute(new UDPLoggingClient("Client 4", 6668)); } - + + /** + * Stops logging clients. This is a blocking call. + */ public void stop() { service.shutdown(); if (!service.isTerminated()) { @@ -39,49 +58,49 @@ public class AppClient { } } - /* - * A logging client that sends logging requests to logging server + private static void artificialDelayOf(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + /** + * A logging client that sends requests to Reactor on TCP socket. */ - static class LoggingClient implements Runnable { + static class TCPLoggingClient implements Runnable { private int serverPort; private String clientName; - public LoggingClient(String clientName, int serverPort) { + /** + * Creates a new TCP logging client. + * + * @param clientName the name of the client to be sent in logging requests. + * @param port the port on which client will send logging requests. + */ + public TCPLoggingClient(String clientName, int serverPort) { this.clientName = clientName; this.serverPort = serverPort; } public void run() { - Socket socket = null; - try { - socket = new Socket(InetAddress.getLocalHost(), serverPort); + try (Socket socket = new Socket(InetAddress.getLocalHost(), serverPort)) { OutputStream outputStream = socket.getOutputStream(); PrintWriter writer = new PrintWriter(outputStream); - writeLogs(writer, socket.getInputStream()); + sendLogRequests(writer, socket.getInputStream()); } catch (IOException e) { e.printStackTrace(); throw new RuntimeException(e); - } finally { - if (socket != null) { - try { - socket.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } } } - private void writeLogs(PrintWriter writer, InputStream inputStream) throws IOException { + private void sendLogRequests(PrintWriter writer, InputStream inputStream) throws IOException { for (int i = 0; i < 4; i++) { writer.println(clientName + " - Log request: " + i); - try { - Thread.sleep(100); - } catch (InterruptedException e) { - e.printStackTrace(); - } writer.flush(); + byte[] data = new byte[1024]; int read = inputStream.read(data, 0, data.length); if (read == 0) { @@ -89,46 +108,56 @@ public class AppClient { } else { System.out.println(new String(data, 0, read)); } + + artificialDelayOf(100); } } - } - - static class UDPLoggingClient implements Runnable { - private int port; - public UDPLoggingClient(int port) { - this.port = port; + } + + /** + * A logging client that sends requests to Reactor on UDP socket. + */ + static class UDPLoggingClient implements Runnable { + private String clientName; + private InetSocketAddress remoteAddress; + + /** + * Creates a new UDP logging client. + * + * @param clientName the name of the client to be sent in logging requests. + * @param port the port on which client will send logging requests. + * @throws UnknownHostException if localhost is unknown + */ + public UDPLoggingClient(String clientName, int port) throws UnknownHostException { + this.clientName = clientName; + this.remoteAddress = new InetSocketAddress(InetAddress.getLocalHost(), port); } - + @Override public void run() { - DatagramSocket socket = null; - try { - socket = new DatagramSocket(); + try (DatagramSocket socket = new DatagramSocket()) { for (int i = 0; i < 4; i++) { - String message = "UDP Client" + " - Log request: " + i; - try { - DatagramPacket packet = new DatagramPacket(message.getBytes(), message.getBytes().length, new InetSocketAddress(InetAddress.getLocalHost(), port)); - socket.send(packet); - - byte[] data = new byte[1024]; - DatagramPacket reply = new DatagramPacket(data, data.length); - socket.receive(reply); - if (reply.getLength() == 0) { - System.out.println("Read zero bytes"); - } else { - System.out.println(new String(reply.getData(), 0, reply.getLength())); - } - } catch (IOException e) { - e.printStackTrace(); + + String message = clientName + " - Log request: " + i; + DatagramPacket request = new DatagramPacket(message.getBytes(), + message.getBytes().length, remoteAddress); + + socket.send(request); + + byte[] data = new byte[1024]; + DatagramPacket reply = new DatagramPacket(data, data.length); + socket.receive(reply); + if (reply.getLength() == 0) { + System.out.println("Read zero bytes"); + } else { + System.out.println(new String(reply.getData(), 0, reply.getLength())); } + + artificialDelayOf(100); } - } catch (SocketException e1) { + } catch (IOException e1) { e1.printStackTrace(); - } finally { - if (socket != null) { - socket.close(); - } } } } diff --git a/reactor/src/main/java/com/iluwatar/reactor/app/LoggingHandler.java b/reactor/src/main/java/com/iluwatar/reactor/app/LoggingHandler.java index 6fa95de2d..eed26b078 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/app/LoggingHandler.java +++ b/reactor/src/main/java/com/iluwatar/reactor/app/LoggingHandler.java @@ -9,7 +9,7 @@ import com.iluwatar.reactor.framework.NioDatagramChannel.DatagramPacket; /** * Logging server application logic. It logs the incoming requests on standard console and returns - * a canned acknowledgement back to the remote peer. + * a canned acknowledgement back to the remote peer. * * @author npathai */ @@ -23,17 +23,15 @@ public class LoggingHandler implements ChannelHandler { @Override public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) { /* - * As this channel is attached to both TCP and UDP channels we need to check whether + * As this handler is attached with both TCP and UDP channels we need to check whether * the data received is a ByteBuffer (from TCP channel) or a DatagramPacket (from UDP channel). */ if (readObject instanceof ByteBuffer) { - byte[] data = ((ByteBuffer)readObject).array(); - doLogging(data); - sendReply(channel, data, key); + doLogging(((ByteBuffer)readObject)); + sendReply(channel, key); } else if (readObject instanceof DatagramPacket) { DatagramPacket datagram = (DatagramPacket)readObject; - byte[] data = datagram.getData().array(); - doLogging(data); + doLogging(datagram.getData()); sendReply(channel, datagram, key); } else { throw new IllegalStateException("Unknown data received"); @@ -50,13 +48,13 @@ public class LoggingHandler implements ChannelHandler { channel.write(replyPacket, key); } - private void sendReply(AbstractNioChannel channel, byte[] data, SelectionKey key) { + private void sendReply(AbstractNioChannel channel, SelectionKey key) { ByteBuffer buffer = ByteBuffer.wrap(ACK); channel.write(buffer, key); } - private void doLogging(byte[] data) { + private void doLogging(ByteBuffer data) { // assuming UTF-8 :( - System.out.println(new String(data)); + System.out.println(new String(data.array(), 0, data.limit())); } } diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java b/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java index a4b18179a..24862644d 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java @@ -55,7 +55,7 @@ public abstract class AbstractNioChannel { } /** - * The operation in which the channel is interested, this operation is be provided to {@link Selector}. + * The operation in which the channel is interested, this operation is provided to {@link Selector}. * * @return interested operation. * @see SelectionKey @@ -63,15 +63,17 @@ public abstract class AbstractNioChannel { public abstract int getInterestedOps(); /** - * Requests the channel to bind. + * Binds the channel on provided port. * * @throws IOException if any I/O error occurs. */ public abstract void bind() throws IOException; /** - * Reads the data using the key and returns the read data. - * @param key the key which is readable. + * Reads the data using the key and returns the read data. The underlying channel should be fetched using + * {@link SelectionKey#channel()}. + * + * @param key the key on which read event occurred. * @return data read. * @throws IOException if any I/O error occurs. */ @@ -106,7 +108,7 @@ public abstract class AbstractNioChannel { /** * Writes the data to the channel. * - * @param pendingWrite data which was queued for writing in batch mode. + * @param pendingWrite the data to be written on channel. * @param key the key which is writable. * @throws IOException if any I/O error occurs. */ diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/ChannelHandler.java b/reactor/src/main/java/com/iluwatar/reactor/framework/ChannelHandler.java index e1df57020..0aae9db75 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/framework/ChannelHandler.java +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/ChannelHandler.java @@ -7,7 +7,7 @@ import java.nio.channels.SelectionKey; * to it by the {@link Dispatcher}. This is where the application logic resides. * *
- * A {@link ChannelHandler} is associated with one or many {@link AbstractNioChannel}s, and whenever
+ * A {@link ChannelHandler} can be associated with one or many {@link AbstractNioChannel}s, and whenever
* an event occurs on any of the associated channels, the handler is notified of the event.
*
* @author npathai
@@ -15,11 +15,11 @@ import java.nio.channels.SelectionKey;
public interface ChannelHandler {
/**
- * Called when the {@code channel} has received some data from remote peer.
+ * Called when the {@code channel} receives some data from remote peer.
*
- * @param channel the channel from which the data is received.
+ * @param channel the channel from which the data was received.
* @param readObject the data read.
- * @param key the key from which the data is received.
+ * @param key the key on which read event occurred.
*/
void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key);
}
diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/Dispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/framework/Dispatcher.java
index 120a11085..c563ef9d3 100644
--- a/reactor/src/main/java/com/iluwatar/reactor/framework/Dispatcher.java
+++ b/reactor/src/main/java/com/iluwatar/reactor/framework/Dispatcher.java
@@ -5,7 +5,7 @@ import java.nio.channels.SelectionKey;
/**
* Represents the event dispatching strategy. When {@link NioReactor} senses any event on the
* registered {@link AbstractNioChannel}s then it de-multiplexes the event type, read or write
- * or connect, and then calls the {@link Dispatcher} to dispatch the event. This decouples the I/O
+ * or connect, and then calls the {@link Dispatcher} to dispatch the read events. This decouples the I/O
* processing from application specific processing.
*
* Dispatcher should call the {@link ChannelHandler} associated with the channel on which event occurred.
@@ -24,6 +24,9 @@ public interface Dispatcher {
* This hook method is called when read event occurs on particular channel. The data read
* is provided in readObject
. The implementation should dispatch this read event
* to the associated {@link ChannelHandler} of channel
.
+ *
+ *
+ * The type of readObject
depends on the channel on which data was received.
*
* @param channel on which read event occurred
* @param readObject object read by channel
@@ -32,7 +35,7 @@ public interface Dispatcher {
void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key);
/**
- * Stops the dispatching events and cleans up any acquired resources such as threads.
+ * Stops dispatching events and cleans up any acquired resources such as threads.
*/
void stop();
}
diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java b/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java
index 2666f05b8..f338ce4a3 100644
--- a/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java
+++ b/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java
@@ -48,12 +48,13 @@ public class NioDatagramChannel extends AbstractNioChannel {
@Override
public DatagramPacket read(SelectionKey key) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
- SocketAddress sender = getChannel().receive(buffer);
+ SocketAddress sender = ((DatagramChannel)key.channel()).receive(buffer);
/*
* It is required to create a DatagramPacket because we need to preserve which
* socket address acts as destination for sending reply packets.
*/
+ buffer.flip();
DatagramPacket packet = new DatagramPacket(buffer);
packet.setSender(sender);
@@ -91,7 +92,7 @@ public class NioDatagramChannel extends AbstractNioChannel {
}
/**
- * Write the outgoing {@link DatagramPacket} to the channel. The intended receiver of the
+ * Writes the outgoing {@link DatagramPacket} to the channel. The intended receiver of the
* datagram packet must be set in the data
using {@link DatagramPacket#setReceiver(SocketAddress)}.
*/
@Override
diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java b/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java
index b92f4a9ba..273898ae3 100644
--- a/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java
+++ b/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java
@@ -22,11 +22,11 @@ import java.util.concurrent.TimeUnit;
*
* Implementation: * A NIO reactor runs in its own thread when it is started using {@link #start()} method. - * {@link NioReactor} uses {@link Selector} as a mechanism for achieving Synchronous Event De-multiplexing. + * {@link NioReactor} uses {@link Selector} for realizing Synchronous Event De-multiplexing. * *
- * NOTE: This is one of the way to implement NIO reactor and it does not take care of all possible edge cases - * which may be required in a real application. This implementation is meant to demonstrate the fundamental + * NOTE: This is one of the ways to implement NIO reactor and it does not take care of all possible edge cases + * which are required in a real application. This implementation is meant to demonstrate the fundamental * concepts that lie behind Reactor pattern. * * @author npathai @@ -64,16 +64,13 @@ public class NioReactor { * @throws IOException if any I/O error occurs. */ public void start() throws IOException { - reactorMain.execute(new Runnable() { - @Override - public void run() { + reactorMain.execute(() -> { try { System.out.println("Reactor started, waiting for events..."); eventLoop(); } catch (IOException e) { e.printStackTrace(); } - } }); } @@ -92,11 +89,11 @@ public class NioReactor { } /** - * Registers a new channel (handle) with this reactor after which the reactor will wait for events - * on this channel. While registering the channel the reactor uses {@link AbstractNioChannel#getInterestedOps()} + * Registers a new channel (handle) with this reactor. Reactor will start waiting for events on this channel + * and notify of any events. While registering the channel the reactor uses {@link AbstractNioChannel#getInterestedOps()} * to know about the interested operation of this channel. * - * @param channel a new handle on which reactor will wait for events. The channel must be bound + * @param channel a new channel on which reactor will wait for events. The channel must be bound * prior to being registered. * @return this * @throws IOException if any I/O error occurs. @@ -111,7 +108,7 @@ public class NioReactor { private void eventLoop() throws IOException { while (true) { - // Honor interrupt request + // honor interrupt request if (Thread.interrupted()) { break; } @@ -189,7 +186,7 @@ public class NioReactor { } /* - * Uses the application provided dispatcher to dispatch events to respective handlers. + * Uses the application provided dispatcher to dispatch events to application handler. */ private void dispatchReadEvent(SelectionKey key, Object readObject) { dispatcher.onChannelReadEvent((AbstractNioChannel)key.attachment(), readObject, key); @@ -207,10 +204,10 @@ public class NioReactor { * Queues the change of operations request of a channel, which will change the interested * operations of the channel sometime in future. *
- * This is a non-blocking method and does not guarantee that the operations are changed when + * This is a non-blocking method and does not guarantee that the operations have changed when * this method returns. * - * @param key the key for which operations are to be changed. + * @param key the key for which operations have to be changed. * @param interestedOps the new interest operations. */ public void changeOps(SelectionKey key, int interestedOps) { diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java b/reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java index 92fa9234f..ae54af643 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java @@ -24,8 +24,8 @@ public class NioServerSocketChannel extends AbstractNioChannel { * Note the constructor does not bind the socket, {@link #bind()} method should be called for binding * the socket. * - * @param port the port to be bound to listen for incoming requests. - * @param handler the handler to be used for handling incoming requests on this channel. + * @param port the port on which channel will be bound to accept incoming connection requests. + * @param handler the handler that will handle incoming requests on this channel. * @throws IOException if any I/O error occurs. */ public NioServerSocketChannel(int port, ChannelHandler handler) throws IOException { @@ -36,7 +36,7 @@ public class NioServerSocketChannel extends AbstractNioChannel { @Override public int getInterestedOps() { - // being a server socket channel it is interested in accepting connection from remote clients. + // being a server socket channel it is interested in accepting connection from remote peers. return SelectionKey.OP_ACCEPT; } @@ -58,6 +58,7 @@ public class NioServerSocketChannel extends AbstractNioChannel { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int read = socketChannel.read(buffer); + buffer.flip(); if (read == -1) { throw new IOException("Socket closed"); } @@ -83,7 +84,6 @@ public class NioServerSocketChannel extends AbstractNioChannel { @Override protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException { ByteBuffer pendingBuffer = (ByteBuffer) pendingWrite; - System.out.println("Writing on channel"); ((SocketChannel)key.channel()).write(pendingBuffer); } } diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/SameThreadDispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/framework/SameThreadDispatcher.java index 2300d7c74..b5392ac8f 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/framework/SameThreadDispatcher.java +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/SameThreadDispatcher.java @@ -8,7 +8,7 @@ import java.nio.channels.SelectionKey; * because the I/O thread performs the application specific processing. * *
- * For real applications use {@link ThreadPoolDispatcher}. + * For better performance use {@link ThreadPoolDispatcher}. * * @see ThreadPoolDispatcher * diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/ThreadPoolDispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/framework/ThreadPoolDispatcher.java index b514d1824..8624b878e 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/framework/ThreadPoolDispatcher.java +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/ThreadPoolDispatcher.java @@ -7,8 +7,8 @@ import java.util.concurrent.TimeUnit; /** * An implementation that uses a pool of worker threads to dispatch the events. This provides - * for better scalability as the application specific processing is not performed in the context - * of I/O thread. + * better scalability as the application specific processing is not performed in the context + * of I/O (reactor) thread. * * @author npathai * @@ -46,7 +46,7 @@ public class ThreadPoolDispatcher extends SameThreadDispatcher { public void stop() { executorService.shutdownNow(); try { - executorService.awaitTermination(1000, TimeUnit.SECONDS); + executorService.awaitTermination(4, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); }