Resolves checkstyle errors for patterns starting with letter r (#1072)
* Reduces checkstyle errors in reactor * Reduces checkstyle errors in reader-writer-lock * Reduces checkstyle errors in repository * Reduces checkstyle errors in resource-acquisition-is-initialization * Reduces checkstyle errors in retry
This commit is contained in:
committed by
Ilkka Seppälä
parent
4dae1fae57
commit
9c8ad4485b
@ -23,10 +23,6 @@
|
||||
|
||||
package com.iluwatar.reactor.app;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import com.iluwatar.reactor.framework.AbstractNioChannel;
|
||||
import com.iluwatar.reactor.framework.ChannelHandler;
|
||||
import com.iluwatar.reactor.framework.Dispatcher;
|
||||
@ -34,19 +30,20 @@ import com.iluwatar.reactor.framework.NioDatagramChannel;
|
||||
import com.iluwatar.reactor.framework.NioReactor;
|
||||
import com.iluwatar.reactor.framework.NioServerSocketChannel;
|
||||
import com.iluwatar.reactor.framework.ThreadPoolDispatcher;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* This application demonstrates Reactor pattern. The example demonstrated is a Distributed Logging
|
||||
* Service where it listens on multiple TCP or UDP sockets for incoming log requests.
|
||||
*
|
||||
* <p>
|
||||
* <i>INTENT</i> <br>
|
||||
*
|
||||
* <p><i>INTENT</i> <br>
|
||||
* The Reactor design pattern handles service requests that are delivered concurrently to an
|
||||
* application by one or more clients. The application can register specific handlers for processing
|
||||
* which are called by reactor on specific events.
|
||||
*
|
||||
* <p>
|
||||
* <i>PROBLEM</i> <br>
|
||||
*
|
||||
* <p><i>PROBLEM</i> <br>
|
||||
* Server applications in a distributed system must handle multiple clients that send them service
|
||||
* requests. Following forces need to be resolved:
|
||||
* <ul>
|
||||
@ -55,9 +52,8 @@ import com.iluwatar.reactor.framework.ThreadPoolDispatcher;
|
||||
* <li>Programming Simplicity</li>
|
||||
* <li>Adaptability</li>
|
||||
* </ul>
|
||||
*
|
||||
* <p>
|
||||
* <i>PARTICIPANTS</i> <br>
|
||||
*
|
||||
* <p><i>PARTICIPANTS</i> <br>
|
||||
* <ul>
|
||||
* <li>Synchronous Event De-multiplexer
|
||||
* <p>
|
||||
@ -89,7 +85,6 @@ import com.iluwatar.reactor.framework.ThreadPoolDispatcher;
|
||||
* separate thread for each client, which provides better scalability under load (number of clients
|
||||
* increase).
|
||||
* The example uses Java NIO framework to implement the Reactor.
|
||||
*
|
||||
*/
|
||||
public class App {
|
||||
|
||||
@ -100,7 +95,7 @@ public class App {
|
||||
/**
|
||||
* Creates an instance of App which will use provided dispatcher for dispatching events on
|
||||
* reactor.
|
||||
*
|
||||
*
|
||||
* @param dispatcher the dispatcher that will be used to dispatch events.
|
||||
*/
|
||||
public App(Dispatcher dispatcher) {
|
||||
@ -142,9 +137,9 @@ public class App {
|
||||
|
||||
/**
|
||||
* Stops the NIO reactor. This is a blocking call.
|
||||
*
|
||||
*
|
||||
* @throws InterruptedException if interrupted while stopping the reactor.
|
||||
* @throws IOException if any I/O error occurs
|
||||
* @throws IOException if any I/O error occurs
|
||||
*/
|
||||
public void stop() throws InterruptedException, IOException {
|
||||
reactor.stop();
|
||||
|
@ -23,9 +23,6 @@
|
||||
|
||||
package com.iluwatar.reactor.app;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
@ -39,6 +36,8 @@ import java.net.UnknownHostException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Represents the clients of Reactor pattern. Multiple clients are run concurrently and send logging
|
||||
@ -158,7 +157,7 @@ public class AppClient {
|
||||
* Creates a new UDP logging client.
|
||||
*
|
||||
* @param clientName the name of the client to be sent in logging requests.
|
||||
* @param port the port on which client will send logging requests.
|
||||
* @param port the port on which client will send logging requests.
|
||||
* @throws UnknownHostException if localhost is unknown
|
||||
*/
|
||||
public UdpLoggingClient(String clientName, int port) throws UnknownHostException {
|
||||
|
@ -23,12 +23,11 @@
|
||||
|
||||
package com.iluwatar.reactor.app;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectionKey;
|
||||
|
||||
import com.iluwatar.reactor.framework.AbstractNioChannel;
|
||||
import com.iluwatar.reactor.framework.ChannelHandler;
|
||||
import com.iluwatar.reactor.framework.NioDatagramChannel.DatagramPacket;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -63,7 +62,11 @@ public class LoggingHandler implements ChannelHandler {
|
||||
}
|
||||
}
|
||||
|
||||
private static void sendReply(AbstractNioChannel channel, DatagramPacket incomingPacket, SelectionKey key) {
|
||||
private static void sendReply(
|
||||
AbstractNioChannel channel,
|
||||
DatagramPacket incomingPacket,
|
||||
SelectionKey key
|
||||
) {
|
||||
/*
|
||||
* Create a reply acknowledgement datagram packet setting the receiver to the sender of incoming
|
||||
* message.
|
||||
|
@ -35,13 +35,12 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
/**
|
||||
* This represents the <i>Handle</i> of Reactor pattern. These are resources managed by OS which can
|
||||
* be submitted to {@link NioReactor}.
|
||||
*
|
||||
* <p>
|
||||
* This class serves has the responsibility of reading the data when a read event occurs and writing
|
||||
* the data back when the channel is writable. It leaves the reading and writing of data on the
|
||||
* concrete implementation. It provides a block writing mechanism wherein when any
|
||||
* {@link ChannelHandler} wants to write data back, it queues the data in pending write queue and
|
||||
* clears it in block manner. This provides better throughput.
|
||||
*
|
||||
* <p>This class serves has the responsibility of reading the data when a read event occurs and
|
||||
* writing the data back when the channel is writable. It leaves the reading and writing of data on
|
||||
* the concrete implementation. It provides a block writing mechanism wherein when any {@link
|
||||
* ChannelHandler} wants to write data back, it queues the data in pending write queue and clears it
|
||||
* in block manner. This provides better throughput.
|
||||
*/
|
||||
public abstract class AbstractNioChannel {
|
||||
|
||||
@ -53,7 +52,7 @@ public abstract class AbstractNioChannel {
|
||||
|
||||
/**
|
||||
* Creates a new channel.
|
||||
*
|
||||
*
|
||||
* @param handler which will handle events occurring on this channel.
|
||||
* @param channel a NIO channel to be wrapped.
|
||||
*/
|
||||
@ -70,6 +69,8 @@ public abstract class AbstractNioChannel {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get channel.
|
||||
*
|
||||
* @return the wrapped NIO channel.
|
||||
*/
|
||||
public SelectableChannel getJavaChannel() {
|
||||
@ -77,9 +78,9 @@ public abstract class AbstractNioChannel {
|
||||
}
|
||||
|
||||
/**
|
||||
* The operation in which the channel is interested, this operation is provided to
|
||||
* {@link Selector}.
|
||||
*
|
||||
* The operation in which the channel is interested, this operation is provided to {@link
|
||||
* Selector}.
|
||||
*
|
||||
* @return interested operation.
|
||||
* @see SelectionKey
|
||||
*/
|
||||
@ -87,7 +88,7 @@ public abstract class AbstractNioChannel {
|
||||
|
||||
/**
|
||||
* Binds the channel on provided port.
|
||||
*
|
||||
*
|
||||
* @throws IOException if any I/O error occurs.
|
||||
*/
|
||||
public abstract void bind() throws IOException;
|
||||
@ -95,7 +96,7 @@ public abstract class AbstractNioChannel {
|
||||
/**
|
||||
* Reads the data using the key and returns the read data. The underlying channel should be
|
||||
* fetched using {@link SelectionKey#channel()}.
|
||||
*
|
||||
*
|
||||
* @param key the key on which read event occurred.
|
||||
* @return data read.
|
||||
* @throws IOException if any I/O error occurs.
|
||||
@ -103,6 +104,8 @@ public abstract class AbstractNioChannel {
|
||||
public abstract Object read(SelectionKey key) throws IOException;
|
||||
|
||||
/**
|
||||
* Get handler.
|
||||
*
|
||||
* @return the handler associated with this channel.
|
||||
*/
|
||||
public ChannelHandler getHandler() {
|
||||
@ -130,9 +133,9 @@ public abstract class AbstractNioChannel {
|
||||
|
||||
/**
|
||||
* Writes the data to the channel.
|
||||
*
|
||||
*
|
||||
* @param pendingWrite the data to be written on channel.
|
||||
* @param key the key which is writable.
|
||||
* @param key the key which is writable.
|
||||
* @throws IOException if any I/O error occurs.
|
||||
*/
|
||||
protected abstract void doWrite(Object pendingWrite, SelectionKey key) throws IOException;
|
||||
@ -140,24 +143,23 @@ public abstract class AbstractNioChannel {
|
||||
/**
|
||||
* Queues the data for writing. The data is not guaranteed to be written on underlying channel
|
||||
* when this method returns. It will be written when the channel is flushed.
|
||||
*
|
||||
* <p>
|
||||
* This method is used by the {@link ChannelHandler} to send reply back to the client. <br>
|
||||
*
|
||||
* <p>This method is used by the {@link ChannelHandler} to send reply back to the client. <br>
|
||||
* Example:
|
||||
*
|
||||
*
|
||||
* <pre>
|
||||
* <code>
|
||||
* {@literal @}Override
|
||||
* public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) {
|
||||
* byte[] data = ((ByteBuffer)readObject).array();
|
||||
* public void handleChannelRead(AbstractNioChannel channel, Object readObj, SelectionKey key) {
|
||||
* byte[] data = ((ByteBuffer)readObj).array();
|
||||
* ByteBuffer buffer = ByteBuffer.wrap("Server reply".getBytes());
|
||||
* channel.write(buffer, key);
|
||||
* }
|
||||
* </code>
|
||||
* </pre>
|
||||
*
|
||||
*
|
||||
* @param data the data to be written on underlying channel.
|
||||
* @param key the key which is writable.
|
||||
* @param key the key which is writable.
|
||||
*/
|
||||
public void write(Object data, SelectionKey key) {
|
||||
Queue<Object> pendingWrites = this.channelToPendingWrites.get(key.channel());
|
||||
|
@ -28,19 +28,19 @@ import java.nio.channels.SelectionKey;
|
||||
/**
|
||||
* Represents the <i>EventHandler</i> of Reactor pattern. It handles the incoming events dispatched
|
||||
* to it by the {@link Dispatcher}. This is where the application logic resides.
|
||||
*
|
||||
* <p>
|
||||
* A {@link ChannelHandler} can be associated with one or many {@link AbstractNioChannel}s, and
|
||||
* whenever an event occurs on any of the associated channels, the handler is notified of the event.
|
||||
*
|
||||
* <p>A {@link ChannelHandler} can be associated with one or many {@link AbstractNioChannel}s, and
|
||||
* whenever an event occurs on any of the associated channels, the handler is notified of the
|
||||
* event.
|
||||
*/
|
||||
public interface ChannelHandler {
|
||||
|
||||
/**
|
||||
* Called when the {@code channel} receives some data from remote peer.
|
||||
*
|
||||
* @param channel the channel from which the data was received.
|
||||
*
|
||||
* @param channel the channel from which the data was received.
|
||||
* @param readObject the data read.
|
||||
* @param key the key on which read event occurred.
|
||||
* @param key the key on which read event occurred.
|
||||
*/
|
||||
void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key);
|
||||
}
|
||||
|
@ -29,14 +29,12 @@ import java.nio.channels.SelectionKey;
|
||||
* Represents the event dispatching strategy. When {@link NioReactor} senses any event on the
|
||||
* registered {@link AbstractNioChannel}s then it de-multiplexes the event type, read or write or
|
||||
* connect, and then calls the {@link Dispatcher} to dispatch the read events. This decouples the
|
||||
* I/O processing from application specific processing. <br>
|
||||
* Dispatcher should call the {@link ChannelHandler} associated with the channel on which event
|
||||
* occurred.
|
||||
*
|
||||
* <p>
|
||||
* The application can customize the way in which event is dispatched such as using the reactor
|
||||
* I/O processing from application specific processing. <br> Dispatcher should call the {@link
|
||||
* ChannelHandler} associated with the channel on which event occurred.
|
||||
*
|
||||
* <p>The application can customize the way in which event is dispatched such as using the reactor
|
||||
* thread to dispatch event to channels or use a worker pool to do the non I/O processing.
|
||||
*
|
||||
*
|
||||
* @see SameThreadDispatcher
|
||||
* @see ThreadPoolDispatcher
|
||||
*/
|
||||
@ -45,19 +43,18 @@ public interface Dispatcher {
|
||||
* This hook method is called when read event occurs on particular channel. The data read is
|
||||
* provided in <code>readObject</code>. The implementation should dispatch this read event to the
|
||||
* associated {@link ChannelHandler} of <code>channel</code>.
|
||||
*
|
||||
* <p>
|
||||
* The type of <code>readObject</code> depends on the channel on which data was received.
|
||||
*
|
||||
* @param channel on which read event occurred
|
||||
*
|
||||
* <p>The type of <code>readObject</code> depends on the channel on which data was received.
|
||||
*
|
||||
* @param channel on which read event occurred
|
||||
* @param readObject object read by channel
|
||||
* @param key on which event occurred
|
||||
* @param key on which event occurred
|
||||
*/
|
||||
void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key);
|
||||
|
||||
/**
|
||||
* Stops dispatching events and cleans up any acquired resources such as threads.
|
||||
*
|
||||
*
|
||||
* @throws InterruptedException if interrupted while stopping dispatcher.
|
||||
*/
|
||||
void stop() throws InterruptedException;
|
||||
|
@ -23,9 +23,6 @@
|
||||
|
||||
package com.iluwatar.reactor.framework;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
@ -33,6 +30,8 @@ import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.DatagramChannel;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* A wrapper over {@link DatagramChannel} which can read and write data on a DatagramChannel.
|
||||
@ -46,11 +45,11 @@ public class NioDatagramChannel extends AbstractNioChannel {
|
||||
/**
|
||||
* Creates a {@link DatagramChannel} which will bind at provided port and use <code>handler</code>
|
||||
* to handle incoming events on this channel.
|
||||
* <p>
|
||||
* Note the constructor does not bind the socket, {@link #bind()} method should be called for
|
||||
*
|
||||
* <p>Note the constructor does not bind the socket, {@link #bind()} method should be called for
|
||||
* binding the socket.
|
||||
*
|
||||
* @param port the port to be bound to listen for incoming datagram requests.
|
||||
*
|
||||
* @param port the port to be bound to listen for incoming datagram requests.
|
||||
* @param handler the handler to be used for handling incoming requests on this channel.
|
||||
* @throws IOException if any I/O error occurs.
|
||||
*/
|
||||
@ -69,7 +68,7 @@ public class NioDatagramChannel extends AbstractNioChannel {
|
||||
|
||||
/**
|
||||
* Reads and returns a {@link DatagramPacket} from the underlying channel.
|
||||
*
|
||||
*
|
||||
* @return the datagram packet read having the sender address.
|
||||
*/
|
||||
@Override
|
||||
@ -89,6 +88,8 @@ public class NioDatagramChannel extends AbstractNioChannel {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get datagram channel.
|
||||
*
|
||||
* @return the underlying datagram channel.
|
||||
*/
|
||||
@Override
|
||||
@ -98,7 +99,7 @@ public class NioDatagramChannel extends AbstractNioChannel {
|
||||
|
||||
/**
|
||||
* Binds UDP socket on the provided <code>port</code>.
|
||||
*
|
||||
*
|
||||
* @throws IOException if any I/O error occurs.
|
||||
*/
|
||||
@Override
|
||||
@ -120,8 +121,8 @@ public class NioDatagramChannel extends AbstractNioChannel {
|
||||
|
||||
/**
|
||||
* Writes the outgoing {@link DatagramPacket} to the channel. The intended receiver of the
|
||||
* datagram packet must be set in the <code>data</code> using
|
||||
* {@link DatagramPacket#setReceiver(SocketAddress)}.
|
||||
* datagram packet must be set in the <code>data</code> using {@link
|
||||
* DatagramPacket#setReceiver(SocketAddress)}.
|
||||
*/
|
||||
@Override
|
||||
public void write(Object data, SelectionKey key) {
|
||||
@ -138,7 +139,7 @@ public class NioDatagramChannel extends AbstractNioChannel {
|
||||
|
||||
/**
|
||||
* Creates a container with underlying data.
|
||||
*
|
||||
*
|
||||
* @param data the underlying message to be written on channel.
|
||||
*/
|
||||
public DatagramPacket(ByteBuffer data) {
|
||||
@ -146,6 +147,8 @@ public class NioDatagramChannel extends AbstractNioChannel {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get sender address.
|
||||
*
|
||||
* @return the sender address.
|
||||
*/
|
||||
public SocketAddress getSender() {
|
||||
@ -154,7 +157,7 @@ public class NioDatagramChannel extends AbstractNioChannel {
|
||||
|
||||
/**
|
||||
* Sets the sender address of this packet.
|
||||
*
|
||||
*
|
||||
* @param sender the sender address.
|
||||
*/
|
||||
public void setSender(SocketAddress sender) {
|
||||
@ -162,6 +165,8 @@ public class NioDatagramChannel extends AbstractNioChannel {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get receiver address.
|
||||
*
|
||||
* @return the receiver address.
|
||||
*/
|
||||
public SocketAddress getReceiver() {
|
||||
@ -170,7 +175,7 @@ public class NioDatagramChannel extends AbstractNioChannel {
|
||||
|
||||
/**
|
||||
* Sets the intended receiver address. This must be set when writing to the channel.
|
||||
*
|
||||
*
|
||||
* @param receiver the receiver address.
|
||||
*/
|
||||
public void setReceiver(SocketAddress receiver) {
|
||||
@ -178,6 +183,8 @@ public class NioDatagramChannel extends AbstractNioChannel {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get data.
|
||||
*
|
||||
* @return the underlying message that will be written on channel.
|
||||
*/
|
||||
public ByteBuffer getData() {
|
||||
|
@ -23,9 +23,6 @@
|
||||
|
||||
package com.iluwatar.reactor.framework;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
@ -38,22 +35,23 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* This class acts as Synchronous Event De-multiplexer and Initiation Dispatcher of Reactor pattern. Multiple handles
|
||||
* i.e. {@link AbstractNioChannel}s can be registered to the reactor and it blocks for events from all these handles.
|
||||
* Whenever an event occurs on any of the registered handles, it synchronously de-multiplexes the event which can be any
|
||||
* of read, write or accept, and dispatches the event to the appropriate {@link ChannelHandler} using the
|
||||
* {@link Dispatcher}.
|
||||
*
|
||||
* <p>
|
||||
* Implementation: A NIO reactor runs in its own thread when it is started using {@link #start()} method.
|
||||
* {@link NioReactor} uses {@link Selector} for realizing Synchronous Event De-multiplexing.
|
||||
*
|
||||
* <p>
|
||||
* NOTE: This is one of the ways to implement NIO reactor and it does not take care of all possible edge cases which are
|
||||
* required in a real application. This implementation is meant to demonstrate the fundamental concepts that lie behind
|
||||
* Reactor pattern.
|
||||
* This class acts as Synchronous Event De-multiplexer and Initiation Dispatcher of Reactor pattern.
|
||||
* Multiple handles i.e. {@link AbstractNioChannel}s can be registered to the reactor and it blocks
|
||||
* for events from all these handles. Whenever an event occurs on any of the registered handles, it
|
||||
* synchronously de-multiplexes the event which can be any of read, write or accept, and dispatches
|
||||
* the event to the appropriate {@link ChannelHandler} using the {@link Dispatcher}.
|
||||
*
|
||||
* <p>Implementation: A NIO reactor runs in its own thread when it is started using {@link
|
||||
* #start()} method. {@link NioReactor} uses {@link Selector} for realizing Synchronous Event
|
||||
* De-multiplexing.
|
||||
*
|
||||
* <p>NOTE: This is one of the ways to implement NIO reactor and it does not take care of all
|
||||
* possible edge cases which are required in a real application. This implementation is meant to
|
||||
* demonstrate the fundamental concepts that lie behind Reactor pattern.
|
||||
*/
|
||||
public class NioReactor {
|
||||
|
||||
@ -62,21 +60,20 @@ public class NioReactor {
|
||||
private final Selector selector;
|
||||
private final Dispatcher dispatcher;
|
||||
/**
|
||||
* All the work of altering the SelectionKey operations and Selector operations are performed in the context of main
|
||||
* event loop of reactor. So when any channel needs to change its readability or writability, a new command is added
|
||||
* in the command queue and then the event loop picks up the command and executes it in next iteration.
|
||||
* All the work of altering the SelectionKey operations and Selector operations are performed in
|
||||
* the context of main event loop of reactor. So when any channel needs to change its readability
|
||||
* or writability, a new command is added in the command queue and then the event loop picks up
|
||||
* the command and executes it in next iteration.
|
||||
*/
|
||||
private final Queue<Runnable> pendingCommands = new ConcurrentLinkedQueue<>();
|
||||
private final ExecutorService reactorMain = Executors.newSingleThreadExecutor();
|
||||
|
||||
/**
|
||||
* Creates a reactor which will use provided {@code dispatcher} to dispatch events. The application can provide
|
||||
* various implementations of dispatcher which suits its needs.
|
||||
*
|
||||
* @param dispatcher
|
||||
* a non-null dispatcher used to dispatch events on registered channels.
|
||||
* @throws IOException
|
||||
* if any I/O error occurs.
|
||||
* Creates a reactor which will use provided {@code dispatcher} to dispatch events. The
|
||||
* application can provide various implementations of dispatcher which suits its needs.
|
||||
*
|
||||
* @param dispatcher a non-null dispatcher used to dispatch events on registered channels.
|
||||
* @throws IOException if any I/O error occurs.
|
||||
*/
|
||||
public NioReactor(Dispatcher dispatcher) throws IOException {
|
||||
this.dispatcher = dispatcher;
|
||||
@ -99,11 +96,9 @@ public class NioReactor {
|
||||
|
||||
/**
|
||||
* Stops the reactor and related resources such as dispatcher.
|
||||
*
|
||||
* @throws InterruptedException
|
||||
* if interrupted while stopping the reactor.
|
||||
* @throws IOException
|
||||
* if any I/O error occurs.
|
||||
*
|
||||
* @throws InterruptedException if interrupted while stopping the reactor.
|
||||
* @throws IOException if any I/O error occurs.
|
||||
*/
|
||||
public void stop() throws InterruptedException, IOException {
|
||||
reactorMain.shutdownNow();
|
||||
@ -114,15 +109,14 @@ public class NioReactor {
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a new channel (handle) with this reactor. Reactor will start waiting for events on this channel and
|
||||
* notify of any events. While registering the channel the reactor uses {@link AbstractNioChannel#getInterestedOps()}
|
||||
* to know about the interested operation of this channel.
|
||||
*
|
||||
* @param channel
|
||||
* a new channel on which reactor will wait for events. The channel must be bound prior to being registered.
|
||||
* Registers a new channel (handle) with this reactor. Reactor will start waiting for events on
|
||||
* this channel and notify of any events. While registering the channel the reactor uses {@link
|
||||
* AbstractNioChannel#getInterestedOps()} to know about the interested operation of this channel.
|
||||
*
|
||||
* @param channel a new channel on which reactor will wait for events. The channel must be bound
|
||||
* prior to being registered.
|
||||
* @return this
|
||||
* @throws IOException
|
||||
* if any I/O error occurs.
|
||||
* @throws IOException if any I/O error occurs.
|
||||
*/
|
||||
public NioReactor registerChannel(AbstractNioChannel channel) throws IOException {
|
||||
SelectionKey key = channel.getJavaChannel().register(selector, channel.getInterestedOps());
|
||||
@ -143,8 +137,8 @@ public class NioReactor {
|
||||
processPendingCommands();
|
||||
|
||||
/*
|
||||
* Synchronous event de-multiplexing happens here, this is blocking call which returns when it is possible to
|
||||
* initiate non-blocking operation on any of the registered channels.
|
||||
* Synchronous event de-multiplexing happens here, this is blocking call which returns when it
|
||||
* is possible to initiate non-blocking operation on any of the registered channels.
|
||||
*/
|
||||
selector.select();
|
||||
|
||||
@ -177,8 +171,8 @@ public class NioReactor {
|
||||
}
|
||||
|
||||
/*
|
||||
* Initiation dispatcher logic, it checks the type of event and notifier application specific event handler to handle
|
||||
* the event.
|
||||
* Initiation dispatcher logic, it checks the type of event and notifier application specific
|
||||
* event handler to handle the event.
|
||||
*/
|
||||
private void processKey(SelectionKey key) throws IOException {
|
||||
if (key.isAcceptable()) {
|
||||
@ -226,15 +220,14 @@ public class NioReactor {
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues the change of operations request of a channel, which will change the interested operations of the channel
|
||||
* sometime in future.
|
||||
* <p>
|
||||
* This is a non-blocking method and does not guarantee that the operations have changed when this method returns.
|
||||
*
|
||||
* @param key
|
||||
* the key for which operations have to be changed.
|
||||
* @param interestedOps
|
||||
* the new interest operations.
|
||||
* Queues the change of operations request of a channel, which will change the interested
|
||||
* operations of the channel sometime in future.
|
||||
*
|
||||
* <p>This is a non-blocking method and does not guarantee that the operations have changed when
|
||||
* this method returns.
|
||||
*
|
||||
* @param key the key for which operations have to be changed.
|
||||
* @param interestedOps the new interest operations.
|
||||
*/
|
||||
public void changeOps(SelectionKey key, int interestedOps) {
|
||||
pendingCommands.add(new ChangeKeyOpsCommand(key, interestedOps));
|
||||
|
@ -23,9 +23,6 @@
|
||||
|
||||
package com.iluwatar.reactor.framework;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
@ -33,10 +30,12 @@ import java.nio.ByteBuffer;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.ServerSocketChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* A wrapper over {@link NioServerSocketChannel} which can read and write data on a
|
||||
* {@link SocketChannel}.
|
||||
* A wrapper over {@link NioServerSocketChannel} which can read and write data on a {@link
|
||||
* SocketChannel}.
|
||||
*/
|
||||
public class NioServerSocketChannel extends AbstractNioChannel {
|
||||
|
||||
@ -47,11 +46,11 @@ public class NioServerSocketChannel extends AbstractNioChannel {
|
||||
/**
|
||||
* Creates a {@link ServerSocketChannel} which will bind at provided port and use
|
||||
* <code>handler</code> to handle incoming events on this channel.
|
||||
* <p>
|
||||
* Note the constructor does not bind the socket, {@link #bind()} method should be called for
|
||||
*
|
||||
* <p>Note the constructor does not bind the socket, {@link #bind()} method should be called for
|
||||
* binding the socket.
|
||||
*
|
||||
* @param port the port on which channel will be bound to accept incoming connection requests.
|
||||
*
|
||||
* @param port the port on which channel will be bound to accept incoming connection requests.
|
||||
* @param handler the handler that will handle incoming requests on this channel.
|
||||
* @throws IOException if any I/O error occurs.
|
||||
*/
|
||||
@ -68,6 +67,8 @@ public class NioServerSocketChannel extends AbstractNioChannel {
|
||||
}
|
||||
|
||||
/**
|
||||
* Get server socket channel.
|
||||
*
|
||||
* @return the underlying {@link ServerSocketChannel}.
|
||||
*/
|
||||
@Override
|
||||
@ -94,7 +95,7 @@ public class NioServerSocketChannel extends AbstractNioChannel {
|
||||
|
||||
/**
|
||||
* Binds TCP socket on the provided <code>port</code>.
|
||||
*
|
||||
*
|
||||
* @throws IOException if any I/O error occurs.
|
||||
*/
|
||||
@Override
|
||||
|
@ -29,18 +29,16 @@ import java.nio.channels.SelectionKey;
|
||||
* Dispatches the events in the context of caller thread. This implementation is a good fit for
|
||||
* small applications where there are limited clients. Using this implementation limits the
|
||||
* scalability because the I/O thread performs the application specific processing.
|
||||
*
|
||||
* <p>
|
||||
* For better performance use {@link ThreadPoolDispatcher}.
|
||||
*
|
||||
*
|
||||
* <p>For better performance use {@link ThreadPoolDispatcher}.
|
||||
*
|
||||
* @see ThreadPoolDispatcher
|
||||
*/
|
||||
public class SameThreadDispatcher implements Dispatcher {
|
||||
|
||||
/**
|
||||
* Dispatches the read event in the context of caller thread. <br>
|
||||
* Note this is a blocking call. It returns only after the associated handler has handled the read
|
||||
* event.
|
||||
* Dispatches the read event in the context of caller thread. <br> Note this is a blocking call.
|
||||
* It returns only after the associated handler has handled the read event.
|
||||
*/
|
||||
@Override
|
||||
public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) {
|
||||
|
@ -39,7 +39,7 @@ public class ThreadPoolDispatcher implements Dispatcher {
|
||||
|
||||
/**
|
||||
* Creates a pooled dispatcher with tunable pool size.
|
||||
*
|
||||
*
|
||||
* @param poolSize number of pooled threads
|
||||
*/
|
||||
public ThreadPoolDispatcher(int poolSize) {
|
||||
@ -48,9 +48,8 @@ public class ThreadPoolDispatcher implements Dispatcher {
|
||||
|
||||
/**
|
||||
* Submits the work of dispatching the read event to worker pool, where it gets picked up by
|
||||
* worker threads. <br>
|
||||
* Note that this is a non-blocking call and returns immediately. It is not guaranteed that the
|
||||
* event has been handled by associated handler.
|
||||
* worker threads. <br> Note that this is a non-blocking call and returns immediately. It is not
|
||||
* guaranteed that the event has been handled by associated handler.
|
||||
*/
|
||||
@Override
|
||||
public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) {
|
||||
@ -59,7 +58,7 @@ public class ThreadPoolDispatcher implements Dispatcher {
|
||||
|
||||
/**
|
||||
* Stops the pool of workers.
|
||||
*
|
||||
*
|
||||
* @throws InterruptedException if interrupted while stopping pool of workers.
|
||||
*/
|
||||
@Override
|
||||
|
Reference in New Issue
Block a user