|
|
|
@ -14,40 +14,41 @@ import java.util.concurrent.Executors;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* This class acts as Synchronous Event De-multiplexer and Initiation Dispatcher of Reactor pattern.
|
|
|
|
|
* Multiple handles i.e. {@link AbstractNioChannel}s can be registered to the reactor and it blocks
|
|
|
|
|
* for events from all these handles. Whenever an event occurs on any of the registered handles, it
|
|
|
|
|
* synchronously de-multiplexes the event which can be any of read, write or accept, and dispatches
|
|
|
|
|
* the event to the appropriate {@link ChannelHandler} using the {@link Dispatcher}.
|
|
|
|
|
* This class acts as Synchronous Event De-multiplexer and Initiation Dispatcher of Reactor pattern. Multiple handles
|
|
|
|
|
* i.e. {@link AbstractNioChannel}s can be registered to the reactor and it blocks for events from all these handles.
|
|
|
|
|
* Whenever an event occurs on any of the registered handles, it synchronously de-multiplexes the event which can be any
|
|
|
|
|
* of read, write or accept, and dispatches the event to the appropriate {@link ChannelHandler} using the
|
|
|
|
|
* {@link Dispatcher}.
|
|
|
|
|
*
|
|
|
|
|
* <p>
|
|
|
|
|
* Implementation: A NIO reactor runs in its own thread when it is started using {@link #start()}
|
|
|
|
|
* method. {@link NioReactor} uses {@link Selector} for realizing Synchronous Event De-multiplexing.
|
|
|
|
|
* Implementation: A NIO reactor runs in its own thread when it is started using {@link #start()} method.
|
|
|
|
|
* {@link NioReactor} uses {@link Selector} for realizing Synchronous Event De-multiplexing.
|
|
|
|
|
*
|
|
|
|
|
* <p>
|
|
|
|
|
* NOTE: This is one of the ways to implement NIO reactor and it does not take care of all possible
|
|
|
|
|
* edge cases which are required in a real application. This implementation is meant to demonstrate
|
|
|
|
|
* the fundamental concepts that lie behind Reactor pattern.
|
|
|
|
|
* NOTE: This is one of the ways to implement NIO reactor and it does not take care of all possible edge cases which are
|
|
|
|
|
* required in a real application. This implementation is meant to demonstrate the fundamental concepts that lie behind
|
|
|
|
|
* Reactor pattern.
|
|
|
|
|
*/
|
|
|
|
|
public class NioReactor {
|
|
|
|
|
|
|
|
|
|
private final Selector selector;
|
|
|
|
|
private final Dispatcher dispatcher;
|
|
|
|
|
/**
|
|
|
|
|
* All the work of altering the SelectionKey operations and Selector operations are performed in
|
|
|
|
|
* the context of main event loop of reactor. So when any channel needs to change its readability
|
|
|
|
|
* or writability, a new command is added in the command queue and then the event loop picks up
|
|
|
|
|
* the command and executes it in next iteration.
|
|
|
|
|
* All the work of altering the SelectionKey operations and Selector operations are performed in the context of main
|
|
|
|
|
* event loop of reactor. So when any channel needs to change its readability or writability, a new command is added
|
|
|
|
|
* in the command queue and then the event loop picks up the command and executes it in next iteration.
|
|
|
|
|
*/
|
|
|
|
|
private final Queue<Runnable> pendingCommands = new ConcurrentLinkedQueue<>();
|
|
|
|
|
private final ExecutorService reactorMain = Executors.newSingleThreadExecutor();
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Creates a reactor which will use provided {@code dispatcher} to dispatch events. The
|
|
|
|
|
* application can provide various implementations of dispatcher which suits its needs.
|
|
|
|
|
* Creates a reactor which will use provided {@code dispatcher} to dispatch events. The application can provide
|
|
|
|
|
* various implementations of dispatcher which suits its needs.
|
|
|
|
|
*
|
|
|
|
|
* @param dispatcher a non-null dispatcher used to dispatch events on registered channels.
|
|
|
|
|
* @throws IOException if any I/O error occurs.
|
|
|
|
|
* @param dispatcher
|
|
|
|
|
* a non-null dispatcher used to dispatch events on registered channels.
|
|
|
|
|
* @throws IOException
|
|
|
|
|
* if any I/O error occurs.
|
|
|
|
|
*/
|
|
|
|
|
public NioReactor(Dispatcher dispatcher) throws IOException {
|
|
|
|
|
this.dispatcher = dispatcher;
|
|
|
|
@ -57,7 +58,8 @@ public class NioReactor {
|
|
|
|
|
/**
|
|
|
|
|
* Starts the reactor event loop in a new thread.
|
|
|
|
|
*
|
|
|
|
|
* @throws IOException if any I/O error occurs.
|
|
|
|
|
* @throws IOException
|
|
|
|
|
* if any I/O error occurs.
|
|
|
|
|
*/
|
|
|
|
|
public void start() throws IOException {
|
|
|
|
|
reactorMain.execute(() -> {
|
|
|
|
@ -73,8 +75,10 @@ public class NioReactor {
|
|
|
|
|
/**
|
|
|
|
|
* Stops the reactor and related resources such as dispatcher.
|
|
|
|
|
*
|
|
|
|
|
* @throws InterruptedException if interrupted while stopping the reactor.
|
|
|
|
|
* @throws IOException if any I/O error occurs.
|
|
|
|
|
* @throws InterruptedException
|
|
|
|
|
* if interrupted while stopping the reactor.
|
|
|
|
|
* @throws IOException
|
|
|
|
|
* if any I/O error occurs.
|
|
|
|
|
*/
|
|
|
|
|
public void stop() throws InterruptedException, IOException {
|
|
|
|
|
reactorMain.shutdownNow();
|
|
|
|
@ -84,15 +88,15 @@ public class NioReactor {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Registers a new channel (handle) with this reactor. Reactor will start waiting for events on
|
|
|
|
|
* this channel and notify of any events. While registering the channel the reactor uses
|
|
|
|
|
* {@link AbstractNioChannel#getInterestedOps()} to know about the interested operation of this
|
|
|
|
|
* channel.
|
|
|
|
|
* Registers a new channel (handle) with this reactor. Reactor will start waiting for events on this channel and
|
|
|
|
|
* notify of any events. While registering the channel the reactor uses {@link AbstractNioChannel#getInterestedOps()}
|
|
|
|
|
* to know about the interested operation of this channel.
|
|
|
|
|
*
|
|
|
|
|
* @param channel a new channel on which reactor will wait for events. The channel must be bound
|
|
|
|
|
* prior to being registered.
|
|
|
|
|
* @param channel
|
|
|
|
|
* a new channel on which reactor will wait for events. The channel must be bound prior to being registered.
|
|
|
|
|
* @return this
|
|
|
|
|
* @throws IOException if any I/O error occurs.
|
|
|
|
|
* @throws IOException
|
|
|
|
|
* if any I/O error occurs.
|
|
|
|
|
*/
|
|
|
|
|
public NioReactor registerChannel(AbstractNioChannel channel) throws IOException {
|
|
|
|
|
SelectionKey key = channel.getJavaChannel().register(selector, channel.getInterestedOps());
|
|
|
|
@ -113,8 +117,8 @@ public class NioReactor {
|
|
|
|
|
processPendingCommands();
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Synchronous event de-multiplexing happens here, this is blocking call which returns when it
|
|
|
|
|
* is possible to initiate non-blocking operation on any of the registered channels.
|
|
|
|
|
* Synchronous event de-multiplexing happens here, this is blocking call which returns when it is possible to
|
|
|
|
|
* initiate non-blocking operation on any of the registered channels.
|
|
|
|
|
*/
|
|
|
|
|
selector.select();
|
|
|
|
|
|
|
|
|
@ -147,8 +151,8 @@ public class NioReactor {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Initiation dispatcher logic, it checks the type of event and notifier application specific
|
|
|
|
|
* event handler to handle the event.
|
|
|
|
|
* Initiation dispatcher logic, it checks the type of event and notifier application specific event handler to handle
|
|
|
|
|
* the event.
|
|
|
|
|
*/
|
|
|
|
|
private void processKey(SelectionKey key) throws IOException {
|
|
|
|
|
if (key.isAcceptable()) {
|
|
|
|
@ -196,14 +200,15 @@ public class NioReactor {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Queues the change of operations request of a channel, which will change the interested
|
|
|
|
|
* operations of the channel sometime in future.
|
|
|
|
|
* Queues the change of operations request of a channel, which will change the interested operations of the channel
|
|
|
|
|
* sometime in future.
|
|
|
|
|
* <p>
|
|
|
|
|
* This is a non-blocking method and does not guarantee that the operations have changed when this
|
|
|
|
|
* method returns.
|
|
|
|
|
* This is a non-blocking method and does not guarantee that the operations have changed when this method returns.
|
|
|
|
|
*
|
|
|
|
|
* @param key the key for which operations have to be changed.
|
|
|
|
|
* @param interestedOps the new interest operations.
|
|
|
|
|
* @param key
|
|
|
|
|
* the key for which operations have to be changed.
|
|
|
|
|
* @param interestedOps
|
|
|
|
|
* the new interest operations.
|
|
|
|
|
*/
|
|
|
|
|
public void changeOps(SelectionKey key, int interestedOps) {
|
|
|
|
|
pendingCommands.add(new ChangeKeyOpsCommand(key, interestedOps));
|
|
|
|
|