Work on #74, updated javadocs, reformatted code to google style guide, added missing final modifiers

This commit is contained in:
Narendra Pathai 2015-09-12 17:46:24 +05:30
parent aebd69efb4
commit 8d429525dc
12 changed files with 908 additions and 875 deletions

View File

@ -10,20 +10,18 @@ import com.iluwatar.reactor.framework.NioServerSocketChannel;
import com.iluwatar.reactor.framework.ThreadPoolDispatcher; import com.iluwatar.reactor.framework.ThreadPoolDispatcher;
/** /**
* This application demonstrates Reactor pattern. The example demonstrated is a Distributed Logging Service * This application demonstrates Reactor pattern. The example demonstrated is a Distributed Logging
* where it listens on multiple TCP or UDP sockets for incoming log requests. * Service where it listens on multiple TCP or UDP sockets for incoming log requests.
* *
* <p> * <p>
* <i>INTENT</i> * <i>INTENT</i> <br/>
* <br/> * The Reactor design pattern handles service requests that are delivered concurrently to an
* The Reactor design pattern handles service requests that are delivered concurrently to an
* application by one or more clients. The application can register specific handlers for processing * application by one or more clients. The application can register specific handlers for processing
* which are called by reactor on specific events. * which are called by reactor on specific events.
* *
* <p> * <p>
* <i>PROBLEM</i> * <i>PROBLEM</i> <br/>
* <br/> * Server applications in a distributed system must handle multiple clients that send them service
* Server applications in a distributed system must handle multiple clients that send them service
* requests. Following forces need to be resolved: * requests. Following forces need to be resolved:
* <ul> * <ul>
* <li>Availability</li> * <li>Availability</li>
@ -33,8 +31,28 @@ import com.iluwatar.reactor.framework.ThreadPoolDispatcher;
* </ul> * </ul>
* *
* <p> * <p>
* The application utilizes single thread to listen for requests on all ports. It does not create * <i>PARTICIPANTS</i> <br/>
* a separate thread for each client, which provides better scalability under load (number of clients * <ul>
* <li>Synchronous Event De-multiplexer</li> {@link NioReactor} plays the role of synchronous event
* de-multiplexer. It waits for events on multiple channels registered to it in an event loop.
*
* <p>
* <li>Initiation Dispatcher</li> {@link NioReactor} plays this role as the application specific
* {@link ChannelHandler}s are registered to the reactor.
*
* <p>
* <li>Handle</li> {@link AbstractNioChannel} acts as a handle that is registered to the reactor.
* When any events occur on a handle, reactor calls the appropriate handler.
*
* <p>
* <li>Event Handler</li> {@link ChannelHandler} acts as an event handler, which is bound to a
* channel and is called back when any event occurs on any of its associated handles. Application
* logic resides in event handlers.
* </ul>
*
* <p>
* The application utilizes single thread to listen for requests on all ports. It does not create a
* separate thread for each client, which provides better scalability under load (number of clients
* increase). * increase).
* *
* <p> * <p>
@ -45,59 +63,60 @@ import com.iluwatar.reactor.framework.ThreadPoolDispatcher;
*/ */
public class App { public class App {
private NioReactor reactor; private NioReactor reactor;
/** /**
* App entry. * App entry.
* @throws IOException *
*/ * @throws IOException
public static void main(String[] args) throws IOException { */
new App().start(); public static void main(String[] args) throws IOException {
} new App().start();
}
/**
* Starts the NIO reactor.
* @throws IOException if any channel fails to bind.
*/
public void start() throws IOException {
/*
* The application can customize its event dispatching mechanism.
*/
reactor = new NioReactor(new ThreadPoolDispatcher(2));
/*
* This represents application specific business logic that dispatcher will call
* on appropriate events. These events are read events in our example.
*/
LoggingHandler loggingHandler = new LoggingHandler();
/*
* Our application binds to multiple channels and uses same logging handler to handle
* incoming log requests.
*/
reactor
.registerChannel(tcpChannel(6666, loggingHandler))
.registerChannel(tcpChannel(6667, loggingHandler))
.registerChannel(udpChannel(6668, loggingHandler))
.start();
}
/**
* Stops the NIO reactor. This is a blocking call.
*/
public void stop() {
reactor.stop();
}
private static AbstractNioChannel tcpChannel(int port, ChannelHandler handler) throws IOException { /**
NioServerSocketChannel channel = new NioServerSocketChannel(port, handler); * Starts the NIO reactor.
channel.bind(); *
return channel; * @throws IOException if any channel fails to bind.
} */
public void start() throws IOException {
private static AbstractNioChannel udpChannel(int port, ChannelHandler handler) throws IOException { /*
NioDatagramChannel channel = new NioDatagramChannel(port, handler); * The application can customize its event dispatching mechanism.
channel.bind(); */
return channel; reactor = new NioReactor(new ThreadPoolDispatcher(2));
}
/*
* This represents application specific business logic that dispatcher will call on appropriate
* events. These events are read events in our example.
*/
LoggingHandler loggingHandler = new LoggingHandler();
/*
* Our application binds to multiple channels and uses same logging handler to handle incoming
* log requests.
*/
reactor.registerChannel(tcpChannel(6666, loggingHandler)).registerChannel(tcpChannel(6667, loggingHandler))
.registerChannel(udpChannel(6668, loggingHandler)).start();
}
/**
* Stops the NIO reactor. This is a blocking call.
*
* @throws InterruptedException if interrupted while stopping the reactor.
*/
public void stop() throws InterruptedException {
reactor.stop();
}
private static AbstractNioChannel tcpChannel(int port, ChannelHandler handler) throws IOException {
NioServerSocketChannel channel = new NioServerSocketChannel(port, handler);
channel.bind();
return channel;
}
private static AbstractNioChannel udpChannel(int port, ChannelHandler handler) throws IOException {
NioDatagramChannel channel = new NioDatagramChannel(port, handler);
channel.bind();
return channel;
}
} }

View File

@ -17,148 +17,149 @@ import java.util.concurrent.TimeUnit;
/** /**
* Represents the clients of Reactor pattern. Multiple clients are run concurrently and send logging * Represents the clients of Reactor pattern. Multiple clients are run concurrently and send logging
* requests to Reactor. * requests to Reactor.
* *
* @author npathai * @author npathai
*/ */
public class AppClient { public class AppClient {
private ExecutorService service = Executors.newFixedThreadPool(4); private final ExecutorService service = Executors.newFixedThreadPool(4);
/** /**
* App client entry. * App client entry.
* @throws IOException if any I/O error occurs. *
*/ * @throws IOException if any I/O error occurs.
public static void main(String[] args) throws IOException { */
AppClient appClient = new AppClient(); public static void main(String[] args) throws IOException {
appClient.start(); AppClient appClient = new AppClient();
} appClient.start();
}
/** /**
* Starts the logging clients. * Starts the logging clients.
* @throws IOException if any I/O error occurs. *
*/ * @throws IOException if any I/O error occurs.
public void start() throws IOException { */
service.execute(new TCPLoggingClient("Client 1", 6666)); public void start() throws IOException {
service.execute(new TCPLoggingClient("Client 2", 6667)); service.execute(new TCPLoggingClient("Client 1", 6666));
service.execute(new UDPLoggingClient("Client 3", 6668)); service.execute(new TCPLoggingClient("Client 2", 6667));
service.execute(new UDPLoggingClient("Client 4", 6668)); service.execute(new UDPLoggingClient("Client 3", 6668));
} service.execute(new UDPLoggingClient("Client 4", 6668));
}
/** /**
* Stops logging clients. This is a blocking call. * Stops logging clients. This is a blocking call.
*/ */
public void stop() { public void stop() {
service.shutdown(); service.shutdown();
if (!service.isTerminated()) { if (!service.isTerminated()) {
service.shutdownNow(); service.shutdownNow();
try { try {
service.awaitTermination(1000, TimeUnit.SECONDS); service.awaitTermination(1000, TimeUnit.SECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
} }
private static void artificialDelayOf(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/** private static void artificialDelayOf(long millis) {
* A logging client that sends requests to Reactor on TCP socket. try {
*/ Thread.sleep(millis);
static class TCPLoggingClient implements Runnable { } catch (InterruptedException e) {
e.printStackTrace();
}
}
private int serverPort; /**
private String clientName; * A logging client that sends requests to Reactor on TCP socket.
*/
static class TCPLoggingClient implements Runnable {
/** private final int serverPort;
* Creates a new TCP logging client. private final String clientName;
*
* @param clientName the name of the client to be sent in logging requests.
* @param port the port on which client will send logging requests.
*/
public TCPLoggingClient(String clientName, int serverPort) {
this.clientName = clientName;
this.serverPort = serverPort;
}
public void run() { /**
try (Socket socket = new Socket(InetAddress.getLocalHost(), serverPort)) { * Creates a new TCP logging client.
OutputStream outputStream = socket.getOutputStream(); *
PrintWriter writer = new PrintWriter(outputStream); * @param clientName the name of the client to be sent in logging requests.
sendLogRequests(writer, socket.getInputStream()); * @param port the port on which client will send logging requests.
} catch (IOException e) { */
e.printStackTrace(); public TCPLoggingClient(String clientName, int serverPort) {
throw new RuntimeException(e); this.clientName = clientName;
} this.serverPort = serverPort;
} }
private void sendLogRequests(PrintWriter writer, InputStream inputStream) throws IOException { public void run() {
for (int i = 0; i < 4; i++) { try (Socket socket = new Socket(InetAddress.getLocalHost(), serverPort)) {
writer.println(clientName + " - Log request: " + i); OutputStream outputStream = socket.getOutputStream();
writer.flush(); PrintWriter writer = new PrintWriter(outputStream);
sendLogRequests(writer, socket.getInputStream());
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
byte[] data = new byte[1024]; private void sendLogRequests(PrintWriter writer, InputStream inputStream) throws IOException {
int read = inputStream.read(data, 0, data.length); for (int i = 0; i < 4; i++) {
if (read == 0) { writer.println(clientName + " - Log request: " + i);
System.out.println("Read zero bytes"); writer.flush();
} else {
System.out.println(new String(data, 0, read));
}
artificialDelayOf(100); byte[] data = new byte[1024];
} int read = inputStream.read(data, 0, data.length);
} if (read == 0) {
System.out.println("Read zero bytes");
} else {
System.out.println(new String(data, 0, read));
}
} artificialDelayOf(100);
}
}
/** }
* A logging client that sends requests to Reactor on UDP socket.
*/
static class UDPLoggingClient implements Runnable {
private String clientName;
private InetSocketAddress remoteAddress;
/** /**
* Creates a new UDP logging client. * A logging client that sends requests to Reactor on UDP socket.
* */
* @param clientName the name of the client to be sent in logging requests. static class UDPLoggingClient implements Runnable {
* @param port the port on which client will send logging requests. private final String clientName;
* @throws UnknownHostException if localhost is unknown private final InetSocketAddress remoteAddress;
*/
public UDPLoggingClient(String clientName, int port) throws UnknownHostException {
this.clientName = clientName;
this.remoteAddress = new InetSocketAddress(InetAddress.getLocalHost(), port);
}
@Override /**
public void run() { * Creates a new UDP logging client.
try (DatagramSocket socket = new DatagramSocket()) { *
for (int i = 0; i < 4; i++) { * @param clientName the name of the client to be sent in logging requests.
* @param port the port on which client will send logging requests.
String message = clientName + " - Log request: " + i; * @throws UnknownHostException if localhost is unknown
DatagramPacket request = new DatagramPacket(message.getBytes(), */
message.getBytes().length, remoteAddress); public UDPLoggingClient(String clientName, int port) throws UnknownHostException {
this.clientName = clientName;
socket.send(request); this.remoteAddress = new InetSocketAddress(InetAddress.getLocalHost(), port);
}
byte[] data = new byte[1024]; @Override
DatagramPacket reply = new DatagramPacket(data, data.length); public void run() {
socket.receive(reply); try (DatagramSocket socket = new DatagramSocket()) {
if (reply.getLength() == 0) { for (int i = 0; i < 4; i++) {
System.out.println("Read zero bytes");
} else { String message = clientName + " - Log request: " + i;
System.out.println(new String(reply.getData(), 0, reply.getLength())); DatagramPacket request = new DatagramPacket(message.getBytes(), message.getBytes().length, remoteAddress);
}
socket.send(request);
artificialDelayOf(100);
} byte[] data = new byte[1024];
} catch (IOException e1) { DatagramPacket reply = new DatagramPacket(data, data.length);
e1.printStackTrace(); socket.receive(reply);
} if (reply.getLength() == 0) {
} System.out.println("Read zero bytes");
} } else {
System.out.println(new String(reply.getData(), 0, reply.getLength()));
}
artificialDelayOf(100);
}
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
} }

View File

@ -8,53 +8,54 @@ import com.iluwatar.reactor.framework.ChannelHandler;
import com.iluwatar.reactor.framework.NioDatagramChannel.DatagramPacket; import com.iluwatar.reactor.framework.NioDatagramChannel.DatagramPacket;
/** /**
* Logging server application logic. It logs the incoming requests on standard console and returns * Logging server application logic. It logs the incoming requests on standard console and returns a
* a canned acknowledgement back to the remote peer. * canned acknowledgement back to the remote peer.
* *
* @author npathai * @author npathai
*/ */
public class LoggingHandler implements ChannelHandler { public class LoggingHandler implements ChannelHandler {
private static final byte[] ACK = "Data logged successfully".getBytes(); private static final byte[] ACK = "Data logged successfully".getBytes();
/** /**
* Decodes the received data and logs it on standard console. * Decodes the received data and logs it on standard console.
*/ */
@Override @Override
public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) { public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) {
/* /*
* As this handler is attached with both TCP and UDP channels we need to check whether * As this handler is attached with both TCP and UDP channels we need to check whether the data
* the data received is a ByteBuffer (from TCP channel) or a DatagramPacket (from UDP channel). * received is a ByteBuffer (from TCP channel) or a DatagramPacket (from UDP channel).
*/ */
if (readObject instanceof ByteBuffer) { if (readObject instanceof ByteBuffer) {
doLogging(((ByteBuffer)readObject)); doLogging(((ByteBuffer) readObject));
sendReply(channel, key); sendReply(channel, key);
} else if (readObject instanceof DatagramPacket) { } else if (readObject instanceof DatagramPacket) {
DatagramPacket datagram = (DatagramPacket)readObject; DatagramPacket datagram = (DatagramPacket) readObject;
doLogging(datagram.getData()); doLogging(datagram.getData());
sendReply(channel, datagram, key); sendReply(channel, datagram, key);
} else { } else {
throw new IllegalStateException("Unknown data received"); throw new IllegalStateException("Unknown data received");
} }
} }
private void sendReply(AbstractNioChannel channel, DatagramPacket incomingPacket, SelectionKey key) { private void sendReply(AbstractNioChannel channel, DatagramPacket incomingPacket, SelectionKey key) {
/* /*
* Create a reply acknowledgement datagram packet setting the receiver to the sender of incoming message. * Create a reply acknowledgement datagram packet setting the receiver to the sender of incoming
*/ * message.
DatagramPacket replyPacket = new DatagramPacket(ByteBuffer.wrap(ACK)); */
replyPacket.setReceiver(incomingPacket.getSender()); DatagramPacket replyPacket = new DatagramPacket(ByteBuffer.wrap(ACK));
replyPacket.setReceiver(incomingPacket.getSender());
channel.write(replyPacket, key);
}
private void sendReply(AbstractNioChannel channel, SelectionKey key) { channel.write(replyPacket, key);
ByteBuffer buffer = ByteBuffer.wrap(ACK); }
channel.write(buffer, key);
}
private void doLogging(ByteBuffer data) { private void sendReply(AbstractNioChannel channel, SelectionKey key) {
// assuming UTF-8 :( ByteBuffer buffer = ByteBuffer.wrap(ACK);
System.out.println(new String(data.array(), 0, data.limit())); channel.write(buffer, key);
} }
private void doLogging(ByteBuffer data) {
// assuming UTF-8 :(
System.out.println(new String(data.array(), 0, data.limit()));
}
} }

View File

@ -10,143 +10,145 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
/** /**
* This represents the <i>Handle</i> of Reactor pattern. These are resources managed by OS * This represents the <i>Handle</i> of Reactor pattern. These are resources managed by OS which can
* which can be submitted to {@link NioReactor}. * be submitted to {@link NioReactor}.
* *
* <p> * <p>
* This class serves has the responsibility of reading the data when a read event occurs and * This class serves has the responsibility of reading the data when a read event occurs and writing
* writing the data back when the channel is writable. It leaves the reading and writing of * the data back when the channel is writable. It leaves the reading and writing of data on the
* data on the concrete implementation. It provides a block writing mechanism wherein when * concrete implementation. It provides a block writing mechanism wherein when any
* any {@link ChannelHandler} wants to write data back, it queues the data in pending write queue * {@link ChannelHandler} wants to write data back, it queues the data in pending write queue and
* and clears it in block manner. This provides better throughput. * clears it in block manner. This provides better throughput.
* *
* @author npathai * @author npathai
* *
*/ */
public abstract class AbstractNioChannel { public abstract class AbstractNioChannel {
private SelectableChannel channel;
private ChannelHandler handler;
private Map<SelectableChannel, Queue<Object>> channelToPendingWrites = new ConcurrentHashMap<>();
private NioReactor reactor;
/**
* Creates a new channel.
* @param handler which will handle events occurring on this channel.
* @param channel a NIO channel to be wrapped.
*/
public AbstractNioChannel(ChannelHandler handler, SelectableChannel channel) {
this.handler = handler;
this.channel = channel;
}
/**
* Injects the reactor in this channel.
*/
void setReactor(NioReactor reactor) {
this.reactor = reactor;
}
/** private final SelectableChannel channel;
* @return the wrapped NIO channel. private final ChannelHandler handler;
*/ private final Map<SelectableChannel, Queue<Object>> channelToPendingWrites = new ConcurrentHashMap<>();
public SelectableChannel getChannel() { private NioReactor reactor;
return channel;
}
/** /**
* The operation in which the channel is interested, this operation is provided to {@link Selector}. * Creates a new channel.
* *
* @return interested operation. * @param handler which will handle events occurring on this channel.
* @see SelectionKey * @param channel a NIO channel to be wrapped.
*/ */
public abstract int getInterestedOps(); public AbstractNioChannel(ChannelHandler handler, SelectableChannel channel) {
this.handler = handler;
/** this.channel = channel;
* Binds the channel on provided port. }
*
* @throws IOException if any I/O error occurs.
*/
public abstract void bind() throws IOException;
/**
* Reads the data using the key and returns the read data. The underlying channel should be fetched using
* {@link SelectionKey#channel()}.
*
* @param key the key on which read event occurred.
* @return data read.
* @throws IOException if any I/O error occurs.
*/
public abstract Object read(SelectionKey key) throws IOException;
/** /**
* @return the handler associated with this channel. * Injects the reactor in this channel.
*/ */
public ChannelHandler getHandler() { void setReactor(NioReactor reactor) {
return handler; this.reactor = reactor;
} }
/* /**
* Called from the context of reactor thread when the key becomes writable. * @return the wrapped NIO channel.
* The channel writes the whole pending block of data at once. */
*/ public SelectableChannel getChannel() {
void flush(SelectionKey key) throws IOException { return channel;
Queue<Object> pendingWrites = channelToPendingWrites.get(key.channel()); }
while (true) {
Object pendingWrite = pendingWrites.poll();
if (pendingWrite == null) {
// We don't have anything more to write so channel is interested in reading more data
reactor.changeOps(key, SelectionKey.OP_READ);
break;
}
// ask the concrete channel to make sense of data and write it to java channel
doWrite(pendingWrite, key);
}
}
/** /**
* Writes the data to the channel. * The operation in which the channel is interested, this operation is provided to
* * {@link Selector}.
* @param pendingWrite the data to be written on channel. *
* @param key the key which is writable. * @return interested operation.
* @throws IOException if any I/O error occurs. * @see SelectionKey
*/ */
protected abstract void doWrite(Object pendingWrite, SelectionKey key) throws IOException; public abstract int getInterestedOps();
/** /**
* Queues the data for writing. The data is not guaranteed to be written on underlying channel * Binds the channel on provided port.
* when this method returns. It will be written when the channel is flushed. *
* * @throws IOException if any I/O error occurs.
* <p> */
* This method is used by the {@link ChannelHandler} to send reply back to the client. public abstract void bind() throws IOException;
* <br/>
* Example: /**
* <pre> * Reads the data using the key and returns the read data. The underlying channel should be
* <code> * fetched using {@link SelectionKey#channel()}.
* {@literal @}Override *
* public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) { * @param key the key on which read event occurred.
* byte[] data = ((ByteBuffer)readObject).array(); * @return data read.
* ByteBuffer buffer = ByteBuffer.wrap("Server reply".getBytes()); * @throws IOException if any I/O error occurs.
* channel.write(buffer, key); */
* } public abstract Object read(SelectionKey key) throws IOException;
* </code>
* /**
* @param data the data to be written on underlying channel. * @return the handler associated with this channel.
* @param key the key which is writable. */
*/ public ChannelHandler getHandler() {
public void write(Object data, SelectionKey key) { return handler;
Queue<Object> pendingWrites = this.channelToPendingWrites.get(key.channel()); }
if (pendingWrites == null) {
synchronized (this.channelToPendingWrites) { /*
pendingWrites = this.channelToPendingWrites.get(key.channel()); * Called from the context of reactor thread when the key becomes writable. The channel writes the
if (pendingWrites == null) { * whole pending block of data at once.
pendingWrites = new ConcurrentLinkedQueue<>(); */
this.channelToPendingWrites.put(key.channel(), pendingWrites); void flush(SelectionKey key) throws IOException {
} Queue<Object> pendingWrites = channelToPendingWrites.get(key.channel());
} while (true) {
} Object pendingWrite = pendingWrites.poll();
pendingWrites.add(data); if (pendingWrite == null) {
reactor.changeOps(key, SelectionKey.OP_WRITE); // We don't have anything more to write so channel is interested in reading more data
} reactor.changeOps(key, SelectionKey.OP_READ);
break;
}
// ask the concrete channel to make sense of data and write it to java channel
doWrite(pendingWrite, key);
}
}
/**
* Writes the data to the channel.
*
* @param pendingWrite the data to be written on channel.
* @param key the key which is writable.
* @throws IOException if any I/O error occurs.
*/
protected abstract void doWrite(Object pendingWrite, SelectionKey key) throws IOException;
/**
* Queues the data for writing. The data is not guaranteed to be written on underlying channel
* when this method returns. It will be written when the channel is flushed.
*
* <p>
* This method is used by the {@link ChannelHandler} to send reply back to the client. <br/>
* Example:
*
* <pre>
* <code>
* {@literal @}Override
* public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) {
* byte[] data = ((ByteBuffer)readObject).array();
* ByteBuffer buffer = ByteBuffer.wrap("Server reply".getBytes());
* channel.write(buffer, key);
* }
* </code>
*
* @param data the data to be written on underlying channel.
* @param key the key which is writable.
*/
public void write(Object data, SelectionKey key) {
Queue<Object> pendingWrites = this.channelToPendingWrites.get(key.channel());
if (pendingWrites == null) {
synchronized (this.channelToPendingWrites) {
pendingWrites = this.channelToPendingWrites.get(key.channel());
if (pendingWrites == null) {
pendingWrites = new ConcurrentLinkedQueue<>();
this.channelToPendingWrites.put(key.channel(), pendingWrites);
}
}
}
pendingWrites.add(data);
reactor.changeOps(key, SelectionKey.OP_WRITE);
}
} }

View File

@ -7,19 +7,19 @@ import java.nio.channels.SelectionKey;
* to it by the {@link Dispatcher}. This is where the application logic resides. * to it by the {@link Dispatcher}. This is where the application logic resides.
* *
* <p> * <p>
* A {@link ChannelHandler} can be associated with one or many {@link AbstractNioChannel}s, and whenever * A {@link ChannelHandler} can be associated with one or many {@link AbstractNioChannel}s, and
* an event occurs on any of the associated channels, the handler is notified of the event. * whenever an event occurs on any of the associated channels, the handler is notified of the event.
* *
* @author npathai * @author npathai
*/ */
public interface ChannelHandler { public interface ChannelHandler {
/** /**
* Called when the {@code channel} receives some data from remote peer. * Called when the {@code channel} receives some data from remote peer.
* *
* @param channel the channel from which the data was received. * @param channel the channel from which the data was received.
* @param readObject the data read. * @param readObject the data read.
* @param key the key on which read event occurred. * @param key the key on which read event occurred.
*/ */
void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key); void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key);
} }

View File

@ -3,39 +3,41 @@ package com.iluwatar.reactor.framework;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
/** /**
* Represents the event dispatching strategy. When {@link NioReactor} senses any event on the * Represents the event dispatching strategy. When {@link NioReactor} senses any event on the
* registered {@link AbstractNioChannel}s then it de-multiplexes the event type, read or write * registered {@link AbstractNioChannel}s then it de-multiplexes the event type, read or write or
* or connect, and then calls the {@link Dispatcher} to dispatch the read events. This decouples the I/O * connect, and then calls the {@link Dispatcher} to dispatch the read events. This decouples the
* processing from application specific processing. * I/O processing from application specific processing. <br/>
* <br/> * Dispatcher should call the {@link ChannelHandler} associated with the channel on which event
* Dispatcher should call the {@link ChannelHandler} associated with the channel on which event occurred. * occurred.
* *
* <p> * <p>
* The application can customize the way in which event is dispatched such as using the reactor thread to * The application can customize the way in which event is dispatched such as using the reactor
* dispatch event to channels or use a worker pool to do the non I/O processing. * thread to dispatch event to channels or use a worker pool to do the non I/O processing.
* *
* @see SameThreadDispatcher * @see SameThreadDispatcher
* @see ThreadPoolDispatcher * @see ThreadPoolDispatcher
* *
* @author npathai * @author npathai
*/ */
public interface Dispatcher { public interface Dispatcher {
/** /**
* This hook method is called when read event occurs on particular channel. The data read * This hook method is called when read event occurs on particular channel. The data read is
* is provided in <code>readObject</code>. The implementation should dispatch this read event * provided in <code>readObject</code>. The implementation should dispatch this read event to the
* to the associated {@link ChannelHandler} of <code>channel</code>. * associated {@link ChannelHandler} of <code>channel</code>.
* *
* <p> * <p>
* The type of <code>readObject</code> depends on the channel on which data was received. * The type of <code>readObject</code> depends on the channel on which data was received.
* *
* @param channel on which read event occurred * @param channel on which read event occurred
* @param readObject object read by channel * @param readObject object read by channel
* @param key on which event occurred * @param key on which event occurred
*/ */
void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key); void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key);
/** /**
* Stops dispatching events and cleans up any acquired resources such as threads. * Stops dispatching events and cleans up any acquired resources such as threads.
*/ *
void stop(); * @throws InterruptedException if interrupted while stopping dispatcher.
*/
void stop() throws InterruptedException;
} }

View File

@ -15,143 +15,147 @@ import java.nio.channels.SelectionKey;
*/ */
public class NioDatagramChannel extends AbstractNioChannel { public class NioDatagramChannel extends AbstractNioChannel {
private int port; private final int port;
/** /**
* Creates a {@link DatagramChannel} which will bind at provided port and use <code>handler</code> to handle * Creates a {@link DatagramChannel} which will bind at provided port and use <code>handler</code>
* incoming events on this channel. * to handle incoming events on this channel.
* <p> * <p>
* Note the constructor does not bind the socket, {@link #bind()} method should be called for binding * Note the constructor does not bind the socket, {@link #bind()} method should be called for
* the socket. * binding the socket.
* *
* @param port the port to be bound to listen for incoming datagram requests. * @param port the port to be bound to listen for incoming datagram requests.
* @param handler the handler to be used for handling incoming requests on this channel. * @param handler the handler to be used for handling incoming requests on this channel.
* @throws IOException if any I/O error occurs. * @throws IOException if any I/O error occurs.
*/ */
public NioDatagramChannel(int port, ChannelHandler handler) throws IOException { public NioDatagramChannel(int port, ChannelHandler handler) throws IOException {
super(handler, DatagramChannel.open()); super(handler, DatagramChannel.open());
this.port = port; this.port = port;
} }
@Override @Override
public int getInterestedOps() { public int getInterestedOps() {
/* there is no need to accept connections in UDP, so the channel shows interest in /*
* reading data. * there is no need to accept connections in UDP, so the channel shows interest in reading data.
*/ */
return SelectionKey.OP_READ; return SelectionKey.OP_READ;
} }
/** /**
* Reads and returns a {@link DatagramPacket} from the underlying channel. * Reads and returns a {@link DatagramPacket} from the underlying channel.
* @return the datagram packet read having the sender address. *
*/ * @return the datagram packet read having the sender address.
@Override */
public DatagramPacket read(SelectionKey key) throws IOException { @Override
ByteBuffer buffer = ByteBuffer.allocate(1024); public DatagramPacket read(SelectionKey key) throws IOException {
SocketAddress sender = ((DatagramChannel)key.channel()).receive(buffer); ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketAddress sender = ((DatagramChannel) key.channel()).receive(buffer);
/*
* It is required to create a DatagramPacket because we need to preserve which
* socket address acts as destination for sending reply packets.
*/
buffer.flip();
DatagramPacket packet = new DatagramPacket(buffer);
packet.setSender(sender);
return packet;
}
/**
* @return the underlying datagram channel.
*/
@Override
public DatagramChannel getChannel() {
return (DatagramChannel) super.getChannel();
}
/**
* Binds UDP socket on the provided <code>port</code>.
*
* @throws IOException if any I/O error occurs.
*/
@Override
public void bind() throws IOException {
getChannel().socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port));
getChannel().configureBlocking(false);
System.out.println("Bound UDP socket at port: " + port);
}
/** /*
* Writes the pending {@link DatagramPacket} to the underlying channel sending data to * It is required to create a DatagramPacket because we need to preserve which socket address
* the intended receiver of the packet. * acts as destination for sending reply packets.
*/ */
@Override buffer.flip();
protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException { DatagramPacket packet = new DatagramPacket(buffer);
DatagramPacket pendingPacket = (DatagramPacket) pendingWrite; packet.setSender(sender);
getChannel().send(pendingPacket.getData(), pendingPacket.getReceiver());
}
/** return packet;
* Writes the outgoing {@link DatagramPacket} to the channel. The intended receiver of the }
* datagram packet must be set in the <code>data</code> using {@link DatagramPacket#setReceiver(SocketAddress)}.
*/
@Override
public void write(Object data, SelectionKey key) {
super.write(data, key);
}
/**
* Container of data used for {@link NioDatagramChannel} to communicate with remote peer.
*/
public static class DatagramPacket {
private SocketAddress sender;
private ByteBuffer data;
private SocketAddress receiver;
/** /**
* Creates a container with underlying data. * @return the underlying datagram channel.
* */
* @param data the underlying message to be written on channel. @Override
*/ public DatagramChannel getChannel() {
public DatagramPacket(ByteBuffer data) { return (DatagramChannel) super.getChannel();
this.data = data; }
}
/** /**
* @return the sender address. * Binds UDP socket on the provided <code>port</code>.
*/ *
public SocketAddress getSender() { * @throws IOException if any I/O error occurs.
return sender; */
} @Override
public void bind() throws IOException {
/** getChannel().socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port));
* Sets the sender address of this packet. getChannel().configureBlocking(false);
* @param sender the sender address. System.out.println("Bound UDP socket at port: " + port);
*/ }
public void setSender(SocketAddress sender) {
this.sender = sender;
}
/** /**
* @return the receiver address. * Writes the pending {@link DatagramPacket} to the underlying channel sending data to the
*/ * intended receiver of the packet.
public SocketAddress getReceiver() { */
return receiver; @Override
} protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException {
DatagramPacket pendingPacket = (DatagramPacket) pendingWrite;
/** getChannel().send(pendingPacket.getData(), pendingPacket.getReceiver());
* Sets the intended receiver address. This must be set when writing to the channel. }
* @param receiver the receiver address.
*/
public void setReceiver(SocketAddress receiver) {
this.receiver = receiver;
}
/** /**
* @return the underlying message that will be written on channel. * Writes the outgoing {@link DatagramPacket} to the channel. The intended receiver of the
*/ * datagram packet must be set in the <code>data</code> using
public ByteBuffer getData() { * {@link DatagramPacket#setReceiver(SocketAddress)}.
return data; */
} @Override
} public void write(Object data, SelectionKey key) {
} super.write(data, key);
}
/**
* Container of data used for {@link NioDatagramChannel} to communicate with remote peer.
*/
public static class DatagramPacket {
private SocketAddress sender;
private ByteBuffer data;
private SocketAddress receiver;
/**
* Creates a container with underlying data.
*
* @param data the underlying message to be written on channel.
*/
public DatagramPacket(ByteBuffer data) {
this.data = data;
}
/**
* @return the sender address.
*/
public SocketAddress getSender() {
return sender;
}
/**
* Sets the sender address of this packet.
*
* @param sender the sender address.
*/
public void setSender(SocketAddress sender) {
this.sender = sender;
}
/**
* @return the receiver address.
*/
public SocketAddress getReceiver() {
return receiver;
}
/**
* Sets the intended receiver address. This must be set when writing to the channel.
*
* @param receiver the receiver address.
*/
public void setReceiver(SocketAddress receiver) {
this.receiver = receiver;
}
/**
* @return the underlying message that will be written on channel.
*/
public ByteBuffer getData() {
return data;
}
}
}

View File

@ -12,228 +12,225 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* This class acts as Synchronous Event De-multiplexer and Initiation Dispatcher of Reactor pattern. * This class acts as Synchronous Event De-multiplexer and Initiation Dispatcher of Reactor pattern.
* Multiple handles i.e. {@link AbstractNioChannel}s can be registered to the reactor and it blocks * Multiple handles i.e. {@link AbstractNioChannel}s can be registered to the reactor and it blocks
* for events from all these handles. Whenever an event occurs on any of the registered handles, * for events from all these handles. Whenever an event occurs on any of the registered handles, it
* it synchronously de-multiplexes the event which can be any of read, write or accept, and * synchronously de-multiplexes the event which can be any of read, write or accept, and dispatches
* dispatches the event to the appropriate {@link ChannelHandler} using the {@link Dispatcher}. * the event to the appropriate {@link ChannelHandler} using the {@link Dispatcher}.
* *
* <p> * <p>
* Implementation: * Implementation: A NIO reactor runs in its own thread when it is started using {@link #start()}
* A NIO reactor runs in its own thread when it is started using {@link #start()} method. * method. {@link NioReactor} uses {@link Selector} for realizing Synchronous Event De-multiplexing.
* {@link NioReactor} uses {@link Selector} for realizing Synchronous Event De-multiplexing.
* *
* <p> * <p>
* NOTE: This is one of the ways to implement NIO reactor and it does not take care of all possible edge cases * NOTE: This is one of the ways to implement NIO reactor and it does not take care of all possible
* which are required in a real application. This implementation is meant to demonstrate the fundamental * edge cases which are required in a real application. This implementation is meant to demonstrate
* concepts that lie behind Reactor pattern. * the fundamental concepts that lie behind Reactor pattern.
* *
* @author npathai * @author npathai
* *
*/ */
public class NioReactor { public class NioReactor {
private Selector selector; private final Selector selector;
private Dispatcher dispatcher; private final Dispatcher dispatcher;
/** /**
* All the work of altering the SelectionKey operations and Selector operations are performed in * All the work of altering the SelectionKey operations and Selector operations are performed in
* the context of main event loop of reactor. So when any channel needs to change its readability * the context of main event loop of reactor. So when any channel needs to change its readability
* or writability, a new command is added in the command queue and then the event loop picks up * or writability, a new command is added in the command queue and then the event loop picks up
* the command and executes it in next iteration. * the command and executes it in next iteration.
*/ */
private Queue<Runnable> pendingCommands = new ConcurrentLinkedQueue<>(); private final Queue<Runnable> pendingCommands = new ConcurrentLinkedQueue<>();
private ExecutorService reactorMain = Executors.newSingleThreadExecutor(); private final ExecutorService reactorMain = Executors.newSingleThreadExecutor();
/**
* Creates a reactor which will use provided {@code dispatcher} to dispatch events.
* The application can provide various implementations of dispatcher which suits its
* needs.
*
* @param dispatcher a non-null dispatcher used to dispatch events on registered channels.
* @throws IOException if any I/O error occurs.
*/
public NioReactor(Dispatcher dispatcher) throws IOException {
this.dispatcher = dispatcher;
this.selector = Selector.open();
}
/** /**
* Starts the reactor event loop in a new thread. * Creates a reactor which will use provided {@code dispatcher} to dispatch events. The
* * application can provide various implementations of dispatcher which suits its needs.
* @throws IOException if any I/O error occurs. *
*/ * @param dispatcher a non-null dispatcher used to dispatch events on registered channels.
public void start() throws IOException { * @throws IOException if any I/O error occurs.
reactorMain.execute(() -> { */
try { public NioReactor(Dispatcher dispatcher) throws IOException {
System.out.println("Reactor started, waiting for events..."); this.dispatcher = dispatcher;
eventLoop(); this.selector = Selector.open();
} catch (IOException e) { }
e.printStackTrace();
}
});
}
/**
* Stops the reactor and related resources such as dispatcher.
*/
public void stop() {
reactorMain.shutdownNow();
selector.wakeup();
try {
reactorMain.awaitTermination(4, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
dispatcher.stop();
}
/** /**
* Registers a new channel (handle) with this reactor. Reactor will start waiting for events on this channel * Starts the reactor event loop in a new thread.
* and notify of any events. While registering the channel the reactor uses {@link AbstractNioChannel#getInterestedOps()} *
* to know about the interested operation of this channel. * @throws IOException if any I/O error occurs.
* */
* @param channel a new channel on which reactor will wait for events. The channel must be bound public void start() throws IOException {
* prior to being registered. reactorMain.execute(() -> {
* @return this try {
* @throws IOException if any I/O error occurs. System.out.println("Reactor started, waiting for events...");
*/ eventLoop();
public NioReactor registerChannel(AbstractNioChannel channel) throws IOException { } catch (IOException e) {
SelectionKey key = channel.getChannel().register(selector, channel.getInterestedOps()); e.printStackTrace();
key.attach(channel); }
channel.setReactor(this); });
return this; }
}
private void eventLoop() throws IOException {
while (true) {
// honor interrupt request
if (Thread.interrupted()) {
break;
}
// honor any pending commands first
processPendingCommands();
/*
* Synchronous event de-multiplexing happens here, this is blocking call which
* returns when it is possible to initiate non-blocking operation on any of the
* registered channels.
*/
selector.select();
/*
* Represents the events that have occurred on registered handles.
*/
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator(); /**
* Stops the reactor and related resources such as dispatcher.
while (iterator.hasNext()) { *
SelectionKey key = iterator.next(); * @throws InterruptedException if interrupted while stopping the reactor.
if (!key.isValid()) { */
iterator.remove(); public void stop() throws InterruptedException {
continue; reactorMain.shutdownNow();
} selector.wakeup();
processKey(key); reactorMain.awaitTermination(4, TimeUnit.SECONDS);
} dispatcher.stop();
keys.clear(); }
}
}
private void processPendingCommands() {
Iterator<Runnable> iterator = pendingCommands.iterator();
while (iterator.hasNext()) {
Runnable command = iterator.next();
command.run();
iterator.remove();
}
}
/* /**
* Initiation dispatcher logic, it checks the type of event and notifier application * Registers a new channel (handle) with this reactor. Reactor will start waiting for events on
* specific event handler to handle the event. * this channel and notify of any events. While registering the channel the reactor uses
*/ * {@link AbstractNioChannel#getInterestedOps()} to know about the interested operation of this
private void processKey(SelectionKey key) throws IOException { * channel.
if (key.isAcceptable()) { *
onChannelAcceptable(key); * @param channel a new channel on which reactor will wait for events. The channel must be bound
} else if (key.isReadable()) { * prior to being registered.
onChannelReadable(key); * @return this
} else if (key.isWritable()) { * @throws IOException if any I/O error occurs.
onChannelWritable(key); */
} public NioReactor registerChannel(AbstractNioChannel channel) throws IOException {
} SelectionKey key = channel.getChannel().register(selector, channel.getInterestedOps());
key.attach(channel);
channel.setReactor(this);
return this;
}
private void onChannelWritable(SelectionKey key) throws IOException { private void eventLoop() throws IOException {
AbstractNioChannel channel = (AbstractNioChannel) key.attachment(); while (true) {
channel.flush(key);
}
private void onChannelReadable(SelectionKey key) { // honor interrupt request
try { if (Thread.interrupted()) {
// reads the incoming data in context of reactor main loop. Can this be improved? break;
Object readObject = ((AbstractNioChannel)key.attachment()).read(key); }
dispatchReadEvent(key, readObject); // honor any pending commands first
} catch (IOException e) { processPendingCommands();
try {
key.channel().close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
/* /*
* Uses the application provided dispatcher to dispatch events to application handler. * Synchronous event de-multiplexing happens here, this is blocking call which returns when it
*/ * is possible to initiate non-blocking operation on any of the registered channels.
private void dispatchReadEvent(SelectionKey key, Object readObject) { */
dispatcher.onChannelReadEvent((AbstractNioChannel)key.attachment(), readObject, key); selector.select();
}
private void onChannelAcceptable(SelectionKey key) throws IOException { /*
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); * Represents the events that have occurred on registered handles.
SocketChannel socketChannel = serverSocketChannel.accept(); */
socketChannel.configureBlocking(false); Set<SelectionKey> keys = selector.selectedKeys();
SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
readKey.attach(key.attachment());
}
/** Iterator<SelectionKey> iterator = keys.iterator();
* Queues the change of operations request of a channel, which will change the interested
* operations of the channel sometime in future. while (iterator.hasNext()) {
* <p> SelectionKey key = iterator.next();
* This is a non-blocking method and does not guarantee that the operations have changed when if (!key.isValid()) {
* this method returns. iterator.remove();
* continue;
* @param key the key for which operations have to be changed. }
* @param interestedOps the new interest operations. processKey(key);
*/ }
public void changeOps(SelectionKey key, int interestedOps) { keys.clear();
pendingCommands.add(new ChangeKeyOpsCommand(key, interestedOps)); }
selector.wakeup(); }
}
private void processPendingCommands() {
/** Iterator<Runnable> iterator = pendingCommands.iterator();
* A command that changes the interested operations of the key provided. while (iterator.hasNext()) {
*/ Runnable command = iterator.next();
class ChangeKeyOpsCommand implements Runnable { command.run();
private SelectionKey key; iterator.remove();
private int interestedOps; }
}
public ChangeKeyOpsCommand(SelectionKey key, int interestedOps) {
this.key = key; /*
this.interestedOps = interestedOps; * Initiation dispatcher logic, it checks the type of event and notifier application specific
} * event handler to handle the event.
*/
public void run() { private void processKey(SelectionKey key) throws IOException {
key.interestOps(interestedOps); if (key.isAcceptable()) {
} onChannelAcceptable(key);
} else if (key.isReadable()) {
@Override onChannelReadable(key);
public String toString() { } else if (key.isWritable()) {
return "Change of ops to: " + interestedOps; onChannelWritable(key);
} }
} }
}
private void onChannelWritable(SelectionKey key) throws IOException {
AbstractNioChannel channel = (AbstractNioChannel) key.attachment();
channel.flush(key);
}
private void onChannelReadable(SelectionKey key) {
try {
// reads the incoming data in context of reactor main loop. Can this be improved?
Object readObject = ((AbstractNioChannel) key.attachment()).read(key);
dispatchReadEvent(key, readObject);
} catch (IOException e) {
try {
key.channel().close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
/*
* Uses the application provided dispatcher to dispatch events to application handler.
*/
private void dispatchReadEvent(SelectionKey key, Object readObject) {
dispatcher.onChannelReadEvent((AbstractNioChannel) key.attachment(), readObject, key);
}
private void onChannelAcceptable(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
readKey.attach(key.attachment());
}
/**
* Queues the change of operations request of a channel, which will change the interested
* operations of the channel sometime in future.
* <p>
* This is a non-blocking method and does not guarantee that the operations have changed when this
* method returns.
*
* @param key the key for which operations have to be changed.
* @param interestedOps the new interest operations.
*/
public void changeOps(SelectionKey key, int interestedOps) {
pendingCommands.add(new ChangeKeyOpsCommand(key, interestedOps));
selector.wakeup();
}
/**
* A command that changes the interested operations of the key provided.
*/
class ChangeKeyOpsCommand implements Runnable {
private SelectionKey key;
private int interestedOps;
public ChangeKeyOpsCommand(SelectionKey key, int interestedOps) {
this.key = key;
this.interestedOps = interestedOps;
}
public void run() {
key.interestOps(interestedOps);
}
@Override
public String toString() {
return "Change of ops to: " + interestedOps;
}
}
}

View File

@ -9,81 +9,82 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
/** /**
* A wrapper over {@link NioServerSocketChannel} which can read and write data on a {@link SocketChannel}. * A wrapper over {@link NioServerSocketChannel} which can read and write data on a
* {@link SocketChannel}.
* *
* @author npathai * @author npathai
*/ */
public class NioServerSocketChannel extends AbstractNioChannel { public class NioServerSocketChannel extends AbstractNioChannel {
private int port; private final int port;
/** /**
* Creates a {@link ServerSocketChannel} which will bind at provided port and use * Creates a {@link ServerSocketChannel} which will bind at provided port and use
* <code>handler</code> to handle incoming events on this channel. * <code>handler</code> to handle incoming events on this channel.
* <p> * <p>
* Note the constructor does not bind the socket, {@link #bind()} method should be called for binding * Note the constructor does not bind the socket, {@link #bind()} method should be called for
* the socket. * binding the socket.
* *
* @param port the port on which channel will be bound to accept incoming connection requests. * @param port the port on which channel will be bound to accept incoming connection requests.
* @param handler the handler that will handle incoming requests on this channel. * @param handler the handler that will handle incoming requests on this channel.
* @throws IOException if any I/O error occurs. * @throws IOException if any I/O error occurs.
*/ */
public NioServerSocketChannel(int port, ChannelHandler handler) throws IOException { public NioServerSocketChannel(int port, ChannelHandler handler) throws IOException {
super(handler, ServerSocketChannel.open()); super(handler, ServerSocketChannel.open());
this.port = port; this.port = port;
} }
@Override
public int getInterestedOps() {
// being a server socket channel it is interested in accepting connection from remote peers.
return SelectionKey.OP_ACCEPT;
}
/** @Override
* @return the underlying {@link ServerSocketChannel}. public int getInterestedOps() {
*/ // being a server socket channel it is interested in accepting connection from remote peers.
@Override return SelectionKey.OP_ACCEPT;
public ServerSocketChannel getChannel() { }
return (ServerSocketChannel) super.getChannel();
}
/**
* Reads and returns {@link ByteBuffer} from the underlying {@link SocketChannel} represented by
* the <code>key</code>. Due to the fact that there is a dedicated channel for each client connection
* we don't need to store the sender.
*/
@Override
public ByteBuffer read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(buffer);
buffer.flip();
if (read == -1) {
throw new IOException("Socket closed");
}
return buffer;
}
/** /**
* Binds TCP socket on the provided <code>port</code>. * @return the underlying {@link ServerSocketChannel}.
* */
* @throws IOException if any I/O error occurs. @Override
*/ public ServerSocketChannel getChannel() {
@Override return (ServerSocketChannel) super.getChannel();
public void bind() throws IOException { }
((ServerSocketChannel)getChannel()).socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port));
((ServerSocketChannel)getChannel()).configureBlocking(false);
System.out.println("Bound TCP socket at port: " + port);
}
/** /**
* Writes the pending {@link ByteBuffer} to the underlying channel sending data to * Reads and returns {@link ByteBuffer} from the underlying {@link SocketChannel} represented by
* the intended receiver of the packet. * the <code>key</code>. Due to the fact that there is a dedicated channel for each client
*/ * connection we don't need to store the sender.
@Override */
protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException { @Override
ByteBuffer pendingBuffer = (ByteBuffer) pendingWrite; public ByteBuffer read(SelectionKey key) throws IOException {
((SocketChannel)key.channel()).write(pendingBuffer); SocketChannel socketChannel = (SocketChannel) key.channel();
} ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(buffer);
buffer.flip();
if (read == -1) {
throw new IOException("Socket closed");
}
return buffer;
}
/**
* Binds TCP socket on the provided <code>port</code>.
*
* @throws IOException if any I/O error occurs.
*/
@Override
public void bind() throws IOException {
((ServerSocketChannel) getChannel()).socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port));
((ServerSocketChannel) getChannel()).configureBlocking(false);
System.out.println("Bound TCP socket at port: " + port);
}
/**
* Writes the pending {@link ByteBuffer} to the underlying channel sending data to the intended
* receiver of the packet.
*/
@Override
protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException {
ByteBuffer pendingBuffer = (ByteBuffer) pendingWrite;
((SocketChannel) key.channel()).write(pendingBuffer);
}
} }

View File

@ -4,8 +4,8 @@ import java.nio.channels.SelectionKey;
/** /**
* Dispatches the events in the context of caller thread. This implementation is a good fit for * Dispatches the events in the context of caller thread. This implementation is a good fit for
* small applications where there are limited clients. Using this implementation limits the scalability * small applications where there are limited clients. Using this implementation limits the
* because the I/O thread performs the application specific processing. * scalability because the I/O thread performs the application specific processing.
* *
* <p> * <p>
* For better performance use {@link ThreadPoolDispatcher}. * For better performance use {@link ThreadPoolDispatcher}.
@ -16,28 +16,25 @@ import java.nio.channels.SelectionKey;
*/ */
public class SameThreadDispatcher implements Dispatcher { public class SameThreadDispatcher implements Dispatcher {
/** /**
* Dispatches the read event in the context of caller thread. * Dispatches the read event in the context of caller thread. <br/>
* <br/> * Note this is a blocking call. It returns only after the associated handler has handled the read
* Note this is a blocking call. It returns only after the associated handler has handled the * event.
* read event. */
*/ @Override
@Override public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) {
public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) { /*
if (channel.getHandler() != null) { * Calls the associated handler to notify the read event where application specific code
/* * resides.
* Calls the associated handler to notify the read event where application specific code */
* resides. channel.getHandler().handleChannelRead(channel, readObject, key);
*/ }
channel.getHandler().handleChannelRead(channel, readObject, key);
}
}
/** /**
* No resources to free. * No resources to free.
*/ */
@Override @Override
public void stop() { public void stop() {
// no-op // no-op
} }
} }

View File

@ -6,50 +6,45 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* An implementation that uses a pool of worker threads to dispatch the events. This provides * An implementation that uses a pool of worker threads to dispatch the events. This provides better
* better scalability as the application specific processing is not performed in the context * scalability as the application specific processing is not performed in the context of I/O
* of I/O (reactor) thread. * (reactor) thread.
* *
* @author npathai * @author npathai
* *
*/ */
public class ThreadPoolDispatcher extends SameThreadDispatcher { public class ThreadPoolDispatcher implements Dispatcher {
private ExecutorService executorService; private final ExecutorService executorService;
/** /**
* Creates a pooled dispatcher with tunable pool size. * Creates a pooled dispatcher with tunable pool size.
* *
* @param poolSize number of pooled threads * @param poolSize number of pooled threads
*/ */
public ThreadPoolDispatcher(int poolSize) { public ThreadPoolDispatcher(int poolSize) {
this.executorService = Executors.newFixedThreadPool(poolSize); this.executorService = Executors.newFixedThreadPool(poolSize);
} }
/** /**
* Submits the work of dispatching the read event to worker pool, where it gets picked * Submits the work of dispatching the read event to worker pool, where it gets picked up by
* up by worker threads. * worker threads. <br/>
* <br/> * Note that this is a non-blocking call and returns immediately. It is not guaranteed that the
* Note that this is a non-blocking call and returns immediately. It is not guaranteed * event has been handled by associated handler.
* that the event has been handled by associated handler. */
*/ @Override
@Override public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) {
public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) { executorService.execute(() -> channel.getHandler().handleChannelRead(channel, readObject, key));
executorService.execute(() -> }
ThreadPoolDispatcher.super.onChannelReadEvent(channel, readObject, key));
}
/**
* Stops the pool of workers.
*/
@Override
public void stop() {
executorService.shutdownNow();
try {
executorService.awaitTermination(4, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* Stops the pool of workers.
*
* @throws InterruptedException if interrupted while stopping pool of workers.
*/
@Override
public void stop() throws InterruptedException {
executorService.shutdown();
executorService.awaitTermination(4, TimeUnit.SECONDS);
}
} }

View File

@ -4,24 +4,38 @@ import java.io.IOException;
import org.junit.Test; import org.junit.Test;
/**
*
* This class tests the Distributed Logging service by starting a Reactor and then sending it
* concurrent logging requests using multiple clients.
*
* @author npathai
*/
public class AppTest { public class AppTest {
@Test /**
public void testApp() throws IOException { * Test the application.
App app = new App(); *
app.start(); * @throws IOException if any I/O error occurs.
* @throws InterruptedException if interrupted while stopping the application.
AppClient client = new AppClient(); */
client.start(); @Test
public void testApp() throws IOException, InterruptedException {
try { App app = new App();
Thread.sleep(2000); app.start();
} catch (InterruptedException e) {
e.printStackTrace(); AppClient client = new AppClient();
} client.start();
client.stop(); // allow clients to send requests. Artificial delay.
try {
app.stop(); Thread.sleep(2000);
} } catch (InterruptedException e) {
e.printStackTrace();
}
client.stop();
app.stop();
}
} }