Work on #74, repackaged and added javadocs

This commit is contained in:
Narendra Pathai
2015-09-04 17:43:01 +05:30
parent e5ea9f5c0d
commit 7ac262b880
22 changed files with 925 additions and 482 deletions

View File

@ -1,74 +0,0 @@
package com.iluwatar.reactor;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public abstract class AbstractNioChannel {
private SelectableChannel channel;
private ChannelHandler handler;
private Map<SelectableChannel, Queue<Object>> channelToPendingWrites = new ConcurrentHashMap<>();
private NioReactor reactor;
public AbstractNioChannel(ChannelHandler handler, SelectableChannel channel) {
this.handler = handler;
this.channel = channel;
}
public void setReactor(NioReactor reactor) {
this.reactor = reactor;
}
public SelectableChannel getChannel() {
return channel;
}
public abstract int getInterestedOps();
public abstract Object read(SelectionKey key) throws IOException;
public void setHandler(ChannelHandler handler) {
this.handler = handler;
}
public ChannelHandler getHandler() {
return handler;
}
// Called from the context of reactor thread
public void write(SelectionKey key) throws IOException {
Queue<Object> pendingWrites = channelToPendingWrites.get(key.channel());
while (true) {
Object pendingWrite = pendingWrites.poll();
if (pendingWrite == null) {
System.out.println("No more pending writes");
reactor.changeOps(key, SelectionKey.OP_READ);
break;
}
doWrite(pendingWrite, key);
}
}
protected abstract void doWrite(Object pendingWrite, SelectionKey key) throws IOException;
public void write(Object data, SelectionKey key) {
Queue<Object> pendingWrites = this.channelToPendingWrites.get(key.channel());
if (pendingWrites == null) {
synchronized (this.channelToPendingWrites) {
pendingWrites = this.channelToPendingWrites.get(key.channel());
if (pendingWrites == null) {
pendingWrites = new ConcurrentLinkedQueue<>();
this.channelToPendingWrites.put(key.channel(), pendingWrites);
}
}
}
pendingWrites.add(data);
reactor.changeOps(key, SelectionKey.OP_WRITE);
}
}

View File

@ -1,45 +0,0 @@
package com.iluwatar.reactor;
import java.io.IOException;
public class App {
private NioReactor reactor;
public static void main(String[] args) {
try {
new App().start();
} catch (IOException e) {
e.printStackTrace();
}
}
public void start() throws IOException {
reactor = new NioReactor(new ThreadPoolDispatcher(2));
LoggingHandler loggingHandler = new LoggingHandler();
reactor
.registerChannel(tcpChannel(6666, loggingHandler))
.registerChannel(tcpChannel(6667, loggingHandler))
.registerChannel(udpChannel(6668, loggingHandler))
.start();
}
public void stop() {
reactor.stop();
}
private static AbstractNioChannel tcpChannel(int port, ChannelHandler handler) throws IOException {
NioServerSocketChannel channel = new NioServerSocketChannel(port, handler);
channel.bind();
return channel;
}
private static AbstractNioChannel udpChannel(int port, ChannelHandler handler) throws IOException {
NioDatagramChannel channel = new NioDatagramChannel(port, handler);
channel.bind();
return channel;
}
}

View File

@ -1,8 +0,0 @@
package com.iluwatar.reactor;
import java.nio.channels.SelectionKey;
public interface ChannelHandler {
void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key);
}

View File

@ -1,8 +0,0 @@
package com.iluwatar.reactor;
import java.nio.channels.SelectionKey;
public interface Dispatcher {
void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key);
void stop();
}

View File

@ -1,39 +0,0 @@
package com.iluwatar.reactor;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import com.iluwatar.reactor.NioDatagramChannel.DatagramPacket;
public class LoggingHandler implements ChannelHandler {
@Override
public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) {
if (readObject instanceof ByteBuffer) {
byte[] data = ((ByteBuffer)readObject).array();
doLogging(data);
sendReply(channel, data, key);
} else if (readObject instanceof DatagramPacket) {
DatagramPacket datagram = (DatagramPacket)readObject;
byte[] data = datagram.getData().array();
doLogging(data);
sendReply(channel, datagram, key);
}
}
private void sendReply(AbstractNioChannel channel, DatagramPacket datagram, SelectionKey key) {
DatagramPacket replyPacket = new DatagramPacket(ByteBuffer.wrap("Data logged successfully".getBytes()));
replyPacket.setReceiver(datagram.getSender());
channel.write(replyPacket, key);
}
private void sendReply(AbstractNioChannel channel, byte[] data, SelectionKey key) {
ByteBuffer buffer = ByteBuffer.wrap("Data logged successfully".getBytes());
channel.write(buffer, key);
}
private void doLogging(byte[] data) {
// assuming UTF-8 :(
System.out.println(new String(data));
}
}

View File

@ -1,80 +0,0 @@
package com.iluwatar.reactor;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
public class NioDatagramChannel extends AbstractNioChannel {
private int port;
public NioDatagramChannel(int port, ChannelHandler handler) throws IOException {
super(handler, DatagramChannel.open());
this.port = port;
}
@Override
public int getInterestedOps() {
return SelectionKey.OP_READ;
}
@Override
public Object read(SelectionKey key) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketAddress sender = getChannel().receive(buffer);
DatagramPacket packet = new DatagramPacket(buffer);
packet.setSender(sender);
return packet;
}
@Override
public DatagramChannel getChannel() {
return (DatagramChannel) super.getChannel();
}
public void bind() throws IOException {
getChannel().socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port));
getChannel().configureBlocking(false);
System.out.println("Bound UDP socket at port: " + port);
}
@Override
protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException {
DatagramPacket pendingPacket = (DatagramPacket) pendingWrite;
getChannel().send(pendingPacket.getData(), pendingPacket.getReceiver());
}
static class DatagramPacket {
private SocketAddress sender;
private ByteBuffer data;
private SocketAddress receiver;
public DatagramPacket(ByteBuffer data) {
this.data = data;
}
public SocketAddress getSender() {
return sender;
}
public void setSender(SocketAddress sender) {
this.sender = sender;
}
public SocketAddress getReceiver() {
return receiver;
}
public void setReceiver(SocketAddress receiver) {
this.receiver = receiver;
}
public ByteBuffer getData() {
return data;
}
}
}

View File

@ -1,170 +0,0 @@
package com.iluwatar.reactor;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/*
* Abstractions
* ---------------
* 2 - Synchronous Event De-multiplexer
*/
public class NioReactor {
private Selector selector;
private Dispatcher dispatcher;
private Queue<Command> pendingChanges = new ConcurrentLinkedQueue<>();
private ExecutorService reactorService = Executors.newSingleThreadExecutor();
public NioReactor(Dispatcher dispatcher) throws IOException {
this.dispatcher = dispatcher;
this.selector = Selector.open();
}
public NioReactor registerChannel(AbstractNioChannel channel) throws IOException {
SelectionKey key = channel.getChannel().register(selector, channel.getInterestedOps());
key.attach(channel);
channel.setReactor(this);
return this;
}
public void start() throws IOException {
reactorService.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("Reactor started, waiting for events...");
eventLoop();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
public void stop() {
reactorService.shutdownNow();
selector.wakeup();
try {
reactorService.awaitTermination(4, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
dispatcher.stop();
}
private void eventLoop() throws IOException {
while (true) {
if (Thread.interrupted()) {
break;
}
// honor any pending requests first
processPendingChanges();
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (!key.isValid()) {
iterator.remove();
continue;
}
processKey(key);
}
keys.clear();
}
}
private void processPendingChanges() {
Iterator<Command> iterator = pendingChanges.iterator();
while (iterator.hasNext()) {
Command command = iterator.next();
command.execute();
iterator.remove();
}
}
private void processKey(SelectionKey key) throws IOException {
if (key.isAcceptable()) {
acceptConnection(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
}
private void write(SelectionKey key) throws IOException {
AbstractNioChannel channel = (AbstractNioChannel) key.attachment();
channel.write(key);
}
private void read(SelectionKey key) {
Object readObject;
try {
readObject = ((AbstractNioChannel)key.attachment()).read(key);
dispatchReadEvent(key, readObject);
} catch (IOException e) {
try {
key.channel().close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
private void dispatchReadEvent(SelectionKey key, Object readObject) {
dispatcher.onChannelReadEvent((AbstractNioChannel)key.attachment(), readObject, key);
}
private void acceptConnection(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
readKey.attach(key.attachment());
}
interface Command {
void execute();
}
public void changeOps(SelectionKey key, int interestedOps) {
pendingChanges.add(new ChangeKeyOpsCommand(key, interestedOps));
selector.wakeup();
}
class ChangeKeyOpsCommand implements Command {
private SelectionKey key;
private int interestedOps;
public ChangeKeyOpsCommand(SelectionKey key, int interestedOps) {
this.key = key;
this.interestedOps = interestedOps;
}
public void execute() {
key.interestOps(interestedOps);
}
@Override
public String toString() {
return "Change of ops to: " + interestedOps;
}
}
}

View File

@ -1,18 +0,0 @@
package com.iluwatar.reactor;
import java.nio.channels.SelectionKey;
public class SameThreadDispatcher implements Dispatcher {
@Override
public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) {
if (channel.getHandler() != null) {
channel.getHandler().handleChannelRead(channel, readObject, key);
}
}
@Override
public void stop() {
// no-op
}
}

View File

@ -1,37 +0,0 @@
package com.iluwatar.reactor;
import java.nio.channels.SelectionKey;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadPoolDispatcher extends SameThreadDispatcher {
private ExecutorService executorService;
public ThreadPoolDispatcher(int poolSize) {
this.executorService = Executors.newFixedThreadPool(poolSize);
}
@Override
public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) {
executorService.execute(new Runnable() {
@Override
public void run() {
ThreadPoolDispatcher.super.onChannelReadEvent(channel, readObject, key);
}
});
}
@Override
public void stop() {
executorService.shutdownNow();
try {
executorService.awaitTermination(1000, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,106 @@
package com.iluwatar.reactor.app;
import java.io.IOException;
import com.iluwatar.reactor.framework.AbstractNioChannel;
import com.iluwatar.reactor.framework.ChannelHandler;
import com.iluwatar.reactor.framework.NioDatagramChannel;
import com.iluwatar.reactor.framework.NioReactor;
import com.iluwatar.reactor.framework.NioServerSocketChannel;
import com.iluwatar.reactor.framework.ThreadPoolDispatcher;
/**
* This application demonstrates Reactor pattern. It represents a Distributed Logging Service
* where it can listen on multiple TCP or UDP sockets for incoming log requests.
*
* <p>
* <i>INTENT</i>
* <br/>
* The Reactor design pattern handles service requests that are delivered concurrently to an
* application by one or more clients. The application can register specific handlers for processing
* which are called by reactor on specific events.
*
* <p>
* <i>PROBLEM</i>
* <br/>
* Server applications in a distributed system must handle multiple clients that send them service
* requests. Following forces need to be resolved:
* <ul>
* <li>Availability</li>
* <li>Efficiency</li>
* <li>Programming Simplicity</li>
* <li>Adaptability</li>
* </ul>
*
* <p>
* The application utilizes single thread to listen for requests on all ports. It does not create
* a separate thread for each client, which provides better scalability under load (number of clients
* increase).
*
* <p>
* The example uses Java NIO framework to implement the Reactor.
*
* @author npathai
*
*/
public class App {
private NioReactor reactor;
/**
* App entry.
*/
public static void main(String[] args) {
try {
new App().start();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Starts the NIO reactor.
* @throws IOException if any channel fails to bind.
*/
public void start() throws IOException {
/*
* The application can customize its event dispatching mechanism.
*/
reactor = new NioReactor(new ThreadPoolDispatcher(2));
/*
* This represents application specific business logic that dispatcher will call
* on appropriate events. These events are read and write event in our example.
*/
LoggingHandler loggingHandler = new LoggingHandler();
/*
* Our application binds to multiple I/O channels and uses same logging handler to handle
* incoming log requests.
*/
reactor
.registerChannel(tcpChannel(6666, loggingHandler))
.registerChannel(tcpChannel(6667, loggingHandler))
.registerChannel(udpChannel(6668, loggingHandler))
.start();
}
/**
* Stops the NIO reactor. This is a blocking call.
*/
public void stop() {
reactor.stop();
}
private static AbstractNioChannel tcpChannel(int port, ChannelHandler handler) throws IOException {
NioServerSocketChannel channel = new NioServerSocketChannel(port, handler);
channel.bind();
return channel;
}
private static AbstractNioChannel udpChannel(int port, ChannelHandler handler) throws IOException {
NioDatagramChannel channel = new NioDatagramChannel(port, handler);
channel.bind();
return channel;
}
}

View File

@ -1,4 +1,4 @@
package com.iluwatar.reactor;
package com.iluwatar.reactor.app;
import java.io.IOException;
import java.io.InputStream;

View File

@ -0,0 +1,62 @@
package com.iluwatar.reactor.app;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import com.iluwatar.reactor.framework.AbstractNioChannel;
import com.iluwatar.reactor.framework.ChannelHandler;
import com.iluwatar.reactor.framework.NioDatagramChannel.DatagramPacket;
/**
* Logging server application logic. It logs the incoming requests on standard console and returns
* a canned acknowledgement back to the remote peer.
*
* @author npathai
*/
public class LoggingHandler implements ChannelHandler {
private static final byte[] ACK = "Data logged successfully".getBytes();
/**
* Decodes the received data and logs it on standard console.
*/
@Override
public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) {
/*
* As this channel is attached to both TCP and UDP channels we need to check whether
* the data received is a ByteBuffer (from TCP channel) or a DatagramPacket (from UDP channel).
*/
if (readObject instanceof ByteBuffer) {
byte[] data = ((ByteBuffer)readObject).array();
doLogging(data);
sendReply(channel, data, key);
} else if (readObject instanceof DatagramPacket) {
DatagramPacket datagram = (DatagramPacket)readObject;
byte[] data = datagram.getData().array();
doLogging(data);
sendReply(channel, datagram, key);
} else {
throw new IllegalStateException("Unknown data received");
}
}
private void sendReply(AbstractNioChannel channel, DatagramPacket incomingPacket, SelectionKey key) {
/*
* Create a reply acknowledgement datagram packet setting the receiver to the sender of incoming message.
*/
DatagramPacket replyPacket = new DatagramPacket(ByteBuffer.wrap(ACK));
replyPacket.setReceiver(incomingPacket.getSender());
channel.write(replyPacket, key);
}
private void sendReply(AbstractNioChannel channel, byte[] data, SelectionKey key) {
ByteBuffer buffer = ByteBuffer.wrap(ACK);
channel.write(buffer, key);
}
private void doLogging(byte[] data) {
// assuming UTF-8 :(
System.out.println(new String(data));
}
}

View File

@ -0,0 +1,150 @@
package com.iluwatar.reactor.framework;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* This represents the <i>Handle</i> of Reactor pattern. These are resources managed by OS
* which can be submitted to {@link NioReactor}.
*
* <p>
* This class serves has the responsibility of reading the data when a read event occurs and
* writing the data back when the channel is writable. It leaves the reading and writing of
* data on the concrete implementation. It provides a block writing mechanism wherein when
* any {@link ChannelHandler} wants to write data back, it queues the data in pending write queue
* and clears it in block manner. This provides better throughput.
*
* @author npathai
*
*/
public abstract class AbstractNioChannel {
private SelectableChannel channel;
private ChannelHandler handler;
private Map<SelectableChannel, Queue<Object>> channelToPendingWrites = new ConcurrentHashMap<>();
private NioReactor reactor;
/**
* Creates a new channel.
* @param handler which will handle events occurring on this channel.
* @param channel a NIO channel to be wrapped.
*/
public AbstractNioChannel(ChannelHandler handler, SelectableChannel channel) {
this.handler = handler;
this.channel = channel;
}
/**
* Injects the reactor in this channel.
*/
void setReactor(NioReactor reactor) {
this.reactor = reactor;
}
/**
* @return the wrapped NIO channel.
*/
public SelectableChannel getChannel() {
return channel;
}
/**
* The operation in which the channel is interested, this operation is be provided to {@link Selector}.
*
* @return interested operation.
* @see SelectionKey
*/
public abstract int getInterestedOps();
/**
* Requests the channel to bind.
*
* @throws IOException if any I/O error occurs.
*/
public abstract void bind() throws IOException;
/**
* Reads the data using the key and returns the read data.
* @param key the key which is readable.
* @return data read.
* @throws IOException if any I/O error occurs.
*/
public abstract Object read(SelectionKey key) throws IOException;
/**
* @return the handler associated with this channel.
*/
public ChannelHandler getHandler() {
return handler;
}
/*
* Called from the context of reactor thread when the key becomes writable.
* The channel writes the whole pending block of data at once.
*/
void flush(SelectionKey key) throws IOException {
Queue<Object> pendingWrites = channelToPendingWrites.get(key.channel());
while (true) {
Object pendingWrite = pendingWrites.poll();
if (pendingWrite == null) {
// We don't have anything more to write so channel is interested in reading more data
reactor.changeOps(key, SelectionKey.OP_READ);
break;
}
// ask the concrete channel to make sense of data and write it to java channel
doWrite(pendingWrite, key);
}
}
/**
* Writes the data to the channel.
*
* @param pendingWrite data which was queued for writing in batch mode.
* @param key the key which is writable.
* @throws IOException if any I/O error occurs.
*/
protected abstract void doWrite(Object pendingWrite, SelectionKey key) throws IOException;
/**
* Queues the data for writing. The data is not guaranteed to be written on underlying channel
* when this method returns. It will be written when the channel is flushed.
*
* <p>
* This method is used by the {@link ChannelHandler} to send reply back to the client.
* <br/>
* Example:
* <pre>
* <code>
* {@literal @}Override
* public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) {
* byte[] data = ((ByteBuffer)readObject).array();
* ByteBuffer buffer = ByteBuffer.wrap("Server reply".getBytes());
* channel.write(buffer, key);
* }
* </code>
*
* @param data the data to be written on underlying channel.
* @param key the key which is writable.
*/
public void write(Object data, SelectionKey key) {
Queue<Object> pendingWrites = this.channelToPendingWrites.get(key.channel());
if (pendingWrites == null) {
synchronized (this.channelToPendingWrites) {
pendingWrites = this.channelToPendingWrites.get(key.channel());
if (pendingWrites == null) {
pendingWrites = new ConcurrentLinkedQueue<>();
this.channelToPendingWrites.put(key.channel(), pendingWrites);
}
}
}
pendingWrites.add(data);
reactor.changeOps(key, SelectionKey.OP_WRITE);
}
}

View File

@ -0,0 +1,25 @@
package com.iluwatar.reactor.framework;
import java.nio.channels.SelectionKey;
/**
* Represents the <i>EventHandler</i> of Reactor pattern. It handles the incoming events dispatched
* to it by the {@link Dispatcher}. This is where the application logic resides.
*
* <p>
* A {@link ChannelHandler} is associated with one or many {@link AbstractNioChannel}s, and whenever
* an event occurs on any of the associated channels, the handler is notified of the event.
*
* @author npathai
*/
public interface ChannelHandler {
/**
* Called when the {@code channel} has received some data from remote peer.
*
* @param channel the channel from which the data is received.
* @param readObject the data read.
* @param key the key from which the data is received.
*/
void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key);
}

View File

@ -0,0 +1,38 @@
package com.iluwatar.reactor.framework;
import java.nio.channels.SelectionKey;
/**
* Represents the event dispatching strategy. When {@link NioReactor} senses any event on the
* registered {@link AbstractNioChannel}s then it de-multiplexes the event type, read or write
* or connect, and then calls the {@link Dispatcher} to dispatch the event. This decouples the I/O
* processing from application specific processing.
* <br/>
* Dispatcher should call the {@link ChannelHandler} associated with the channel on which event occurred.
*
* <p>
* The application can customize the way in which event is dispatched such as using the reactor thread to
* dispatch event to channels or use a worker pool to do the non I/O processing.
*
* @see SameThreadDispatcher
* @see ThreadPoolDispatcher
*
* @author npathai
*/
public interface Dispatcher {
/**
* This hook method is called when read event occurs on particular channel. The data read
* is provided in <code>readObject</code>. The implementation should dispatch this read event
* to the associated {@link ChannelHandler} of <code>channel</code>.
*
* @param channel on which read event occurred
* @param readObject object read by channel
* @param key on which event occurred
*/
void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key);
/**
* Stops the dispatching events and cleans up any acquired resources such as threads.
*/
void stop();
}

View File

@ -0,0 +1,156 @@
package com.iluwatar.reactor.framework;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
/**
* A wrapper over {@link DatagramChannel} which can read and write data on a DatagramChannel.
*
* @author npathai
*/
public class NioDatagramChannel extends AbstractNioChannel {
private int port;
/**
* Creates a {@link DatagramChannel} which will bind at provided port and use <code>handler</code> to handle
* incoming events on this channel.
* <p>
* Note the constructor does not bind the socket, {@link #bind()} method should be called for binding
* the socket.
*
* @param port the port to be bound to listen for incoming datagram requests.
* @param handler the handler to be used for handling incoming requests on this channel.
* @throws IOException if any I/O error occurs.
*/
public NioDatagramChannel(int port, ChannelHandler handler) throws IOException {
super(handler, DatagramChannel.open());
this.port = port;
}
@Override
public int getInterestedOps() {
/* there is no need to accept connections in UDP, so the channel shows interest in
* reading data.
*/
return SelectionKey.OP_READ;
}
/**
* Reads and returns a {@link DatagramPacket} from the underlying channel.
* @return the datagram packet read having the sender address.
*/
@Override
public DatagramPacket read(SelectionKey key) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketAddress sender = getChannel().receive(buffer);
/*
* It is required to create a DatagramPacket because we need to preserve which
* socket address acts as destination for sending reply packets.
*/
DatagramPacket packet = new DatagramPacket(buffer);
packet.setSender(sender);
return packet;
}
/**
* @return the underlying datagram channel.
*/
@Override
public DatagramChannel getChannel() {
return (DatagramChannel) super.getChannel();
}
/**
* Binds UDP socket on the provided <code>port</code>.
*
* @throws IOException if any I/O error occurs.
*/
@Override
public void bind() throws IOException {
getChannel().socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port));
getChannel().configureBlocking(false);
System.out.println("Bound UDP socket at port: " + port);
}
/**
* Writes the pending {@link DatagramPacket} to the underlying channel sending data to
* the intended receiver of the packet.
*/
@Override
protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException {
DatagramPacket pendingPacket = (DatagramPacket) pendingWrite;
getChannel().send(pendingPacket.getData(), pendingPacket.getReceiver());
}
/**
* Write the outgoing {@link DatagramPacket} to the channel. The intended receiver of the
* datagram packet must be set in the <code>data</code> using {@link DatagramPacket#setReceiver(SocketAddress)}.
*/
@Override
public void write(Object data, SelectionKey key) {
super.write(data, key);
}
/**
* Container of data used for {@link NioDatagramChannel} to communicate with remote peer.
*/
public static class DatagramPacket {
private SocketAddress sender;
private ByteBuffer data;
private SocketAddress receiver;
/**
* Creates a container with underlying data.
*
* @param data the underlying message to be written on channel.
*/
public DatagramPacket(ByteBuffer data) {
this.data = data;
}
/**
* @return the sender address.
*/
public SocketAddress getSender() {
return sender;
}
/**
* Sets the sender address of this packet.
* @param sender the sender address.
*/
public void setSender(SocketAddress sender) {
this.sender = sender;
}
/**
* @return the receiver address.
*/
public SocketAddress getReceiver() {
return receiver;
}
/**
* Sets the intended receiver address. This must be set when writing to the channel.
* @param receiver the receiver address.
*/
public void setReceiver(SocketAddress receiver) {
this.receiver = receiver;
}
/**
* @return the underlying message that will be written on channel.
*/
public ByteBuffer getData() {
return data;
}
}
}

View File

@ -0,0 +1,242 @@
package com.iluwatar.reactor.framework;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* This class acts as Synchronous Event De-multiplexer and Initiation Dispatcher of Reactor pattern.
* Multiple handles i.e. {@link AbstractNioChannel}s can be registered to the reactor and it blocks
* for events from all these handles. Whenever an event occurs on any of the registered handles,
* it synchronously de-multiplexes the event which can be any of read, write or accept, and
* dispatches the event to the appropriate {@link ChannelHandler} using the {@link Dispatcher}.
*
* <p>
* Implementation:
* A NIO reactor runs in its own thread when it is started using {@link #start()} method.
* {@link NioReactor} uses {@link Selector} as a mechanism for achieving Synchronous Event De-multiplexing.
*
* <p>
* NOTE: This is one of the way to implement NIO reactor and it does not take care of all possible edge cases
* which may be required in a real application. This implementation is meant to demonstrate the fundamental
* concepts that lie behind Reactor pattern.
*
* @author npathai
*
*/
public class NioReactor {
private Selector selector;
private Dispatcher dispatcher;
/**
* All the work of altering the SelectionKey operations and Selector operations are performed in
* the context of main event loop of reactor. So when any channel needs to change its readability
* or writability, a new command is added in the command queue and then the event loop picks up
* the command and executes it in next iteration.
*/
private Queue<Runnable> pendingCommands = new ConcurrentLinkedQueue<>();
private ExecutorService reactorMain = Executors.newSingleThreadExecutor();
/**
* Creates a reactor which will use provided {@code dispatcher} to dispatch events.
* The application can provide various implementations of dispatcher which suits its
* needs.
*
* @param dispatcher a non-null dispatcher used to dispatch events on registered channels.
* @throws IOException if any I/O error occurs.
*/
public NioReactor(Dispatcher dispatcher) throws IOException {
this.dispatcher = dispatcher;
this.selector = Selector.open();
}
/**
* Starts the reactor event loop in a new thread.
*
* @throws IOException if any I/O error occurs.
*/
public void start() throws IOException {
reactorMain.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("Reactor started, waiting for events...");
eventLoop();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
/**
* Stops the reactor and related resources such as dispatcher.
*/
public void stop() {
reactorMain.shutdownNow();
selector.wakeup();
try {
reactorMain.awaitTermination(4, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
dispatcher.stop();
}
/**
* Registers a new channel (handle) with this reactor after which the reactor will wait for events
* on this channel. While registering the channel the reactor uses {@link AbstractNioChannel#getInterestedOps()}
* to know about the interested operation of this channel.
*
* @param channel a new handle on which reactor will wait for events. The channel must be bound
* prior to being registered.
* @return this
* @throws IOException if any I/O error occurs.
*/
public NioReactor registerChannel(AbstractNioChannel channel) throws IOException {
SelectionKey key = channel.getChannel().register(selector, channel.getInterestedOps());
key.attach(channel);
channel.setReactor(this);
return this;
}
private void eventLoop() throws IOException {
while (true) {
// Honor interrupt request
if (Thread.interrupted()) {
break;
}
// honor any pending commands first
processPendingCommands();
/*
* Synchronous event de-multiplexing happens here, this is blocking call which
* returns when it is possible to initiate non-blocking operation on any of the
* registered channels.
*/
selector.select();
/*
* Represents the events that have occurred on registered handles.
*/
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (!key.isValid()) {
iterator.remove();
continue;
}
processKey(key);
}
keys.clear();
}
}
private void processPendingCommands() {
Iterator<Runnable> iterator = pendingCommands.iterator();
while (iterator.hasNext()) {
Runnable command = iterator.next();
command.run();
iterator.remove();
}
}
/*
* Initiation dispatcher logic, it checks the type of event and notifier application
* specific event handler to handle the event.
*/
private void processKey(SelectionKey key) throws IOException {
if (key.isAcceptable()) {
onChannelAcceptable(key);
} else if (key.isReadable()) {
onChannelReadable(key);
} else if (key.isWritable()) {
onChannelWritable(key);
}
}
private void onChannelWritable(SelectionKey key) throws IOException {
AbstractNioChannel channel = (AbstractNioChannel) key.attachment();
channel.flush(key);
}
private void onChannelReadable(SelectionKey key) {
try {
// reads the incoming data in context of reactor main loop. Can this be improved?
Object readObject = ((AbstractNioChannel)key.attachment()).read(key);
dispatchReadEvent(key, readObject);
} catch (IOException e) {
try {
key.channel().close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
/*
* Uses the application provided dispatcher to dispatch events to respective handlers.
*/
private void dispatchReadEvent(SelectionKey key, Object readObject) {
dispatcher.onChannelReadEvent((AbstractNioChannel)key.attachment(), readObject, key);
}
private void onChannelAcceptable(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
readKey.attach(key.attachment());
}
/**
* Queues the change of operations request of a channel, which will change the interested
* operations of the channel sometime in future.
* <p>
* This is a non-blocking method and does not guarantee that the operations are changed when
* this method returns.
*
* @param key the key for which operations are to be changed.
* @param interestedOps the new interest operations.
*/
public void changeOps(SelectionKey key, int interestedOps) {
pendingCommands.add(new ChangeKeyOpsCommand(key, interestedOps));
selector.wakeup();
}
/**
* A command that changes the interested operations of the key provided.
*/
class ChangeKeyOpsCommand implements Runnable {
private SelectionKey key;
private int interestedOps;
public ChangeKeyOpsCommand(SelectionKey key, int interestedOps) {
this.key = key;
this.interestedOps = interestedOps;
}
public void run() {
key.interestOps(interestedOps);
}
@Override
public String toString() {
return "Change of ops to: " + interestedOps;
}
}
}

View File

@ -1,4 +1,4 @@
package com.iluwatar.reactor;
package com.iluwatar.reactor.framework;
import java.io.IOException;
import java.net.InetAddress;
@ -8,25 +8,51 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
* A wrapper over {@link NioServerSocketChannel} which can read and write data on a {@link SocketChannel}.
*
* @author npathai
*/
public class NioServerSocketChannel extends AbstractNioChannel {
private int port;
/**
* Creates a {@link ServerSocketChannel} which will bind at provided port and use
* <code>handler</code> to handle incoming events on this channel.
* <p>
* Note the constructor does not bind the socket, {@link #bind()} method should be called for binding
* the socket.
*
* @param port the port to be bound to listen for incoming requests.
* @param handler the handler to be used for handling incoming requests on this channel.
* @throws IOException if any I/O error occurs.
*/
public NioServerSocketChannel(int port, ChannelHandler handler) throws IOException {
super(handler, ServerSocketChannel.open());
this.port = port;
}
@Override
public int getInterestedOps() {
// being a server socket channel it is interested in accepting connection from remote clients.
return SelectionKey.OP_ACCEPT;
}
/**
* @return the underlying {@link ServerSocketChannel}.
*/
@Override
public ServerSocketChannel getChannel() {
return (ServerSocketChannel) super.getChannel();
}
/**
* Reads and returns {@link ByteBuffer} from the underlying {@link SocketChannel} represented by
* the <code>key</code>. Due to the fact that there is a dedicated channel for each client connection
* we don't need to store the sender.
*/
@Override
public ByteBuffer read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
@ -38,12 +64,22 @@ public class NioServerSocketChannel extends AbstractNioChannel {
return buffer;
}
/**
* Binds TCP socket on the provided <code>port</code>.
*
* @throws IOException if any I/O error occurs.
*/
@Override
public void bind() throws IOException {
((ServerSocketChannel)getChannel()).socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port));
((ServerSocketChannel)getChannel()).configureBlocking(false);
System.out.println("Bound TCP socket at port: " + port);
}
/**
* Writes the pending {@link ByteBuffer} to the underlying channel sending data to
* the intended receiver of the packet.
*/
@Override
protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException {
ByteBuffer pendingBuffer = (ByteBuffer) pendingWrite;

View File

@ -0,0 +1,43 @@
package com.iluwatar.reactor.framework;
import java.nio.channels.SelectionKey;
/**
* Dispatches the events in the context of caller thread. This implementation is a good fit for
* small applications where there are limited clients. Using this implementation limits the scalability
* because the I/O thread performs the application specific processing.
*
* <p>
* For real applications use {@link ThreadPoolDispatcher}.
*
* @see ThreadPoolDispatcher
*
* @author npathai
*/
public class SameThreadDispatcher implements Dispatcher {
/**
* Dispatches the read event in the context of caller thread.
* <br/>
* Note this is a blocking call. It returns only after the associated handler has handled the
* read event.
*/
@Override
public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) {
if (channel.getHandler() != null) {
/*
* Calls the associated handler to notify the read event where application specific code
* resides.
*/
channel.getHandler().handleChannelRead(channel, readObject, key);
}
}
/**
* No resources to free.
*/
@Override
public void stop() {
// no-op
}
}

View File

@ -0,0 +1,55 @@
package com.iluwatar.reactor.framework;
import java.nio.channels.SelectionKey;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* An implementation that uses a pool of worker threads to dispatch the events. This provides
* for better scalability as the application specific processing is not performed in the context
* of I/O thread.
*
* @author npathai
*
*/
public class ThreadPoolDispatcher extends SameThreadDispatcher {
private ExecutorService executorService;
/**
* Creates a pooled dispatcher with tunable pool size.
*
* @param poolSize number of pooled threads
*/
public ThreadPoolDispatcher(int poolSize) {
this.executorService = Executors.newFixedThreadPool(poolSize);
}
/**
* Submits the work of dispatching the read event to worker pool, where it gets picked
* up by worker threads.
* <br/>
* Note that this is a non-blocking call and returns immediately. It is not guaranteed
* that the event has been handled by associated handler.
*/
@Override
public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) {
executorService.execute(() ->
ThreadPoolDispatcher.super.onChannelReadEvent(channel, readObject, key));
}
/**
* Stops the pool of workers.
*/
@Override
public void stop() {
executorService.shutdownNow();
try {
executorService.awaitTermination(1000, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@ -1,4 +1,4 @@
package com.iluwatar.reactor;
package com.iluwatar.reactor.app;
import java.io.IOException;

View File

@ -2,3 +2,12 @@
* Cleanup
* Document - Javadoc
* Better design?? Get review of @iluwatar
Design view:
Handles ---> AbstractNioChannel
Selector ---> Synchronous Event Demultiplexer
NioReactor ---> Initiation Dispatcher