Work on #74, improved documentation and minor changes
This commit is contained in:
@ -10,8 +10,8 @@ import com.iluwatar.reactor.framework.NioServerSocketChannel;
|
|||||||
import com.iluwatar.reactor.framework.ThreadPoolDispatcher;
|
import com.iluwatar.reactor.framework.ThreadPoolDispatcher;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This application demonstrates Reactor pattern. It represents a Distributed Logging Service
|
* This application demonstrates Reactor pattern. The example demonstrated is a Distributed Logging Service
|
||||||
* where it can listen on multiple TCP or UDP sockets for incoming log requests.
|
* where it listens on multiple TCP or UDP sockets for incoming log requests.
|
||||||
*
|
*
|
||||||
* <p>
|
* <p>
|
||||||
* <i>INTENT</i>
|
* <i>INTENT</i>
|
||||||
@ -49,13 +49,10 @@ public class App {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* App entry.
|
* App entry.
|
||||||
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) throws IOException {
|
||||||
try {
|
new App().start();
|
||||||
new App().start();
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -70,12 +67,12 @@ public class App {
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* This represents application specific business logic that dispatcher will call
|
* This represents application specific business logic that dispatcher will call
|
||||||
* on appropriate events. These events are read and write event in our example.
|
* on appropriate events. These events are read events in our example.
|
||||||
*/
|
*/
|
||||||
LoggingHandler loggingHandler = new LoggingHandler();
|
LoggingHandler loggingHandler = new LoggingHandler();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Our application binds to multiple I/O channels and uses same logging handler to handle
|
* Our application binds to multiple channels and uses same logging handler to handle
|
||||||
* incoming log requests.
|
* incoming log requests.
|
||||||
*/
|
*/
|
||||||
reactor
|
reactor
|
||||||
|
@ -9,24 +9,43 @@ import java.net.DatagramSocket;
|
|||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
import java.net.SocketException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents the clients of Reactor pattern. Multiple clients are run concurrently and send logging
|
||||||
|
* requests to Reactor.
|
||||||
|
*
|
||||||
|
* @author npathai
|
||||||
|
*/
|
||||||
public class AppClient {
|
public class AppClient {
|
||||||
private ExecutorService service = Executors.newFixedThreadPool(3);
|
private ExecutorService service = Executors.newFixedThreadPool(4);
|
||||||
|
|
||||||
public static void main(String[] args) {
|
/**
|
||||||
new AppClient().start();
|
* App client entry.
|
||||||
|
* @throws IOException if any I/O error occurs.
|
||||||
|
*/
|
||||||
|
public static void main(String[] args) throws IOException {
|
||||||
|
AppClient appClient = new AppClient();
|
||||||
|
appClient.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() {
|
/**
|
||||||
service.execute(new LoggingClient("Client 1", 6666));
|
* Starts the logging clients.
|
||||||
service.execute(new LoggingClient("Client 2", 6667));
|
* @throws IOException if any I/O error occurs.
|
||||||
service.execute(new UDPLoggingClient(6668));
|
*/
|
||||||
|
public void start() throws IOException {
|
||||||
|
service.execute(new TCPLoggingClient("Client 1", 6666));
|
||||||
|
service.execute(new TCPLoggingClient("Client 2", 6667));
|
||||||
|
service.execute(new UDPLoggingClient("Client 3", 6668));
|
||||||
|
service.execute(new UDPLoggingClient("Client 4", 6668));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stops logging clients. This is a blocking call.
|
||||||
|
*/
|
||||||
public void stop() {
|
public void stop() {
|
||||||
service.shutdown();
|
service.shutdown();
|
||||||
if (!service.isTerminated()) {
|
if (!service.isTerminated()) {
|
||||||
@ -39,49 +58,49 @@ public class AppClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
private static void artificialDelayOf(long millis) {
|
||||||
* A logging client that sends logging requests to logging server
|
try {
|
||||||
|
Thread.sleep(millis);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A logging client that sends requests to Reactor on TCP socket.
|
||||||
*/
|
*/
|
||||||
static class LoggingClient implements Runnable {
|
static class TCPLoggingClient implements Runnable {
|
||||||
|
|
||||||
private int serverPort;
|
private int serverPort;
|
||||||
private String clientName;
|
private String clientName;
|
||||||
|
|
||||||
public LoggingClient(String clientName, int serverPort) {
|
/**
|
||||||
|
* Creates a new TCP logging client.
|
||||||
|
*
|
||||||
|
* @param clientName the name of the client to be sent in logging requests.
|
||||||
|
* @param port the port on which client will send logging requests.
|
||||||
|
*/
|
||||||
|
public TCPLoggingClient(String clientName, int serverPort) {
|
||||||
this.clientName = clientName;
|
this.clientName = clientName;
|
||||||
this.serverPort = serverPort;
|
this.serverPort = serverPort;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void run() {
|
public void run() {
|
||||||
Socket socket = null;
|
try (Socket socket = new Socket(InetAddress.getLocalHost(), serverPort)) {
|
||||||
try {
|
|
||||||
socket = new Socket(InetAddress.getLocalHost(), serverPort);
|
|
||||||
OutputStream outputStream = socket.getOutputStream();
|
OutputStream outputStream = socket.getOutputStream();
|
||||||
PrintWriter writer = new PrintWriter(outputStream);
|
PrintWriter writer = new PrintWriter(outputStream);
|
||||||
writeLogs(writer, socket.getInputStream());
|
sendLogRequests(writer, socket.getInputStream());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
} finally {
|
|
||||||
if (socket != null) {
|
|
||||||
try {
|
|
||||||
socket.close();
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeLogs(PrintWriter writer, InputStream inputStream) throws IOException {
|
private void sendLogRequests(PrintWriter writer, InputStream inputStream) throws IOException {
|
||||||
for (int i = 0; i < 4; i++) {
|
for (int i = 0; i < 4; i++) {
|
||||||
writer.println(clientName + " - Log request: " + i);
|
writer.println(clientName + " - Log request: " + i);
|
||||||
try {
|
|
||||||
Thread.sleep(100);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
writer.flush();
|
writer.flush();
|
||||||
|
|
||||||
byte[] data = new byte[1024];
|
byte[] data = new byte[1024];
|
||||||
int read = inputStream.read(data, 0, data.length);
|
int read = inputStream.read(data, 0, data.length);
|
||||||
if (read == 0) {
|
if (read == 0) {
|
||||||
@ -89,46 +108,56 @@ public class AppClient {
|
|||||||
} else {
|
} else {
|
||||||
System.out.println(new String(data, 0, read));
|
System.out.println(new String(data, 0, read));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
artificialDelayOf(100);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
static class UDPLoggingClient implements Runnable {
|
|
||||||
private int port;
|
|
||||||
|
|
||||||
public UDPLoggingClient(int port) {
|
}
|
||||||
this.port = port;
|
|
||||||
|
/**
|
||||||
|
* A logging client that sends requests to Reactor on UDP socket.
|
||||||
|
*/
|
||||||
|
static class UDPLoggingClient implements Runnable {
|
||||||
|
private String clientName;
|
||||||
|
private InetSocketAddress remoteAddress;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new UDP logging client.
|
||||||
|
*
|
||||||
|
* @param clientName the name of the client to be sent in logging requests.
|
||||||
|
* @param port the port on which client will send logging requests.
|
||||||
|
* @throws UnknownHostException if localhost is unknown
|
||||||
|
*/
|
||||||
|
public UDPLoggingClient(String clientName, int port) throws UnknownHostException {
|
||||||
|
this.clientName = clientName;
|
||||||
|
this.remoteAddress = new InetSocketAddress(InetAddress.getLocalHost(), port);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
DatagramSocket socket = null;
|
try (DatagramSocket socket = new DatagramSocket()) {
|
||||||
try {
|
|
||||||
socket = new DatagramSocket();
|
|
||||||
for (int i = 0; i < 4; i++) {
|
for (int i = 0; i < 4; i++) {
|
||||||
String message = "UDP Client" + " - Log request: " + i;
|
|
||||||
try {
|
String message = clientName + " - Log request: " + i;
|
||||||
DatagramPacket packet = new DatagramPacket(message.getBytes(), message.getBytes().length, new InetSocketAddress(InetAddress.getLocalHost(), port));
|
DatagramPacket request = new DatagramPacket(message.getBytes(),
|
||||||
socket.send(packet);
|
message.getBytes().length, remoteAddress);
|
||||||
|
|
||||||
byte[] data = new byte[1024];
|
socket.send(request);
|
||||||
DatagramPacket reply = new DatagramPacket(data, data.length);
|
|
||||||
socket.receive(reply);
|
byte[] data = new byte[1024];
|
||||||
if (reply.getLength() == 0) {
|
DatagramPacket reply = new DatagramPacket(data, data.length);
|
||||||
System.out.println("Read zero bytes");
|
socket.receive(reply);
|
||||||
} else {
|
if (reply.getLength() == 0) {
|
||||||
System.out.println(new String(reply.getData(), 0, reply.getLength()));
|
System.out.println("Read zero bytes");
|
||||||
}
|
} else {
|
||||||
} catch (IOException e) {
|
System.out.println(new String(reply.getData(), 0, reply.getLength()));
|
||||||
e.printStackTrace();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
artificialDelayOf(100);
|
||||||
}
|
}
|
||||||
} catch (SocketException e1) {
|
} catch (IOException e1) {
|
||||||
e1.printStackTrace();
|
e1.printStackTrace();
|
||||||
} finally {
|
|
||||||
if (socket != null) {
|
|
||||||
socket.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -9,7 +9,7 @@ import com.iluwatar.reactor.framework.NioDatagramChannel.DatagramPacket;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Logging server application logic. It logs the incoming requests on standard console and returns
|
* Logging server application logic. It logs the incoming requests on standard console and returns
|
||||||
* a canned acknowledgement back to the remote peer.
|
* a canned acknowledgement back to the remote peer.
|
||||||
*
|
*
|
||||||
* @author npathai
|
* @author npathai
|
||||||
*/
|
*/
|
||||||
@ -23,17 +23,15 @@ public class LoggingHandler implements ChannelHandler {
|
|||||||
@Override
|
@Override
|
||||||
public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) {
|
public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) {
|
||||||
/*
|
/*
|
||||||
* As this channel is attached to both TCP and UDP channels we need to check whether
|
* As this handler is attached with both TCP and UDP channels we need to check whether
|
||||||
* the data received is a ByteBuffer (from TCP channel) or a DatagramPacket (from UDP channel).
|
* the data received is a ByteBuffer (from TCP channel) or a DatagramPacket (from UDP channel).
|
||||||
*/
|
*/
|
||||||
if (readObject instanceof ByteBuffer) {
|
if (readObject instanceof ByteBuffer) {
|
||||||
byte[] data = ((ByteBuffer)readObject).array();
|
doLogging(((ByteBuffer)readObject));
|
||||||
doLogging(data);
|
sendReply(channel, key);
|
||||||
sendReply(channel, data, key);
|
|
||||||
} else if (readObject instanceof DatagramPacket) {
|
} else if (readObject instanceof DatagramPacket) {
|
||||||
DatagramPacket datagram = (DatagramPacket)readObject;
|
DatagramPacket datagram = (DatagramPacket)readObject;
|
||||||
byte[] data = datagram.getData().array();
|
doLogging(datagram.getData());
|
||||||
doLogging(data);
|
|
||||||
sendReply(channel, datagram, key);
|
sendReply(channel, datagram, key);
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalStateException("Unknown data received");
|
throw new IllegalStateException("Unknown data received");
|
||||||
@ -50,13 +48,13 @@ public class LoggingHandler implements ChannelHandler {
|
|||||||
channel.write(replyPacket, key);
|
channel.write(replyPacket, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendReply(AbstractNioChannel channel, byte[] data, SelectionKey key) {
|
private void sendReply(AbstractNioChannel channel, SelectionKey key) {
|
||||||
ByteBuffer buffer = ByteBuffer.wrap(ACK);
|
ByteBuffer buffer = ByteBuffer.wrap(ACK);
|
||||||
channel.write(buffer, key);
|
channel.write(buffer, key);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doLogging(byte[] data) {
|
private void doLogging(ByteBuffer data) {
|
||||||
// assuming UTF-8 :(
|
// assuming UTF-8 :(
|
||||||
System.out.println(new String(data));
|
System.out.println(new String(data.array(), 0, data.limit()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -55,7 +55,7 @@ public abstract class AbstractNioChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The operation in which the channel is interested, this operation is be provided to {@link Selector}.
|
* The operation in which the channel is interested, this operation is provided to {@link Selector}.
|
||||||
*
|
*
|
||||||
* @return interested operation.
|
* @return interested operation.
|
||||||
* @see SelectionKey
|
* @see SelectionKey
|
||||||
@ -63,15 +63,17 @@ public abstract class AbstractNioChannel {
|
|||||||
public abstract int getInterestedOps();
|
public abstract int getInterestedOps();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Requests the channel to bind.
|
* Binds the channel on provided port.
|
||||||
*
|
*
|
||||||
* @throws IOException if any I/O error occurs.
|
* @throws IOException if any I/O error occurs.
|
||||||
*/
|
*/
|
||||||
public abstract void bind() throws IOException;
|
public abstract void bind() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reads the data using the key and returns the read data.
|
* Reads the data using the key and returns the read data. The underlying channel should be fetched using
|
||||||
* @param key the key which is readable.
|
* {@link SelectionKey#channel()}.
|
||||||
|
*
|
||||||
|
* @param key the key on which read event occurred.
|
||||||
* @return data read.
|
* @return data read.
|
||||||
* @throws IOException if any I/O error occurs.
|
* @throws IOException if any I/O error occurs.
|
||||||
*/
|
*/
|
||||||
@ -106,7 +108,7 @@ public abstract class AbstractNioChannel {
|
|||||||
/**
|
/**
|
||||||
* Writes the data to the channel.
|
* Writes the data to the channel.
|
||||||
*
|
*
|
||||||
* @param pendingWrite data which was queued for writing in batch mode.
|
* @param pendingWrite the data to be written on channel.
|
||||||
* @param key the key which is writable.
|
* @param key the key which is writable.
|
||||||
* @throws IOException if any I/O error occurs.
|
* @throws IOException if any I/O error occurs.
|
||||||
*/
|
*/
|
||||||
|
@ -7,7 +7,7 @@ import java.nio.channels.SelectionKey;
|
|||||||
* to it by the {@link Dispatcher}. This is where the application logic resides.
|
* to it by the {@link Dispatcher}. This is where the application logic resides.
|
||||||
*
|
*
|
||||||
* <p>
|
* <p>
|
||||||
* A {@link ChannelHandler} is associated with one or many {@link AbstractNioChannel}s, and whenever
|
* A {@link ChannelHandler} can be associated with one or many {@link AbstractNioChannel}s, and whenever
|
||||||
* an event occurs on any of the associated channels, the handler is notified of the event.
|
* an event occurs on any of the associated channels, the handler is notified of the event.
|
||||||
*
|
*
|
||||||
* @author npathai
|
* @author npathai
|
||||||
@ -15,11 +15,11 @@ import java.nio.channels.SelectionKey;
|
|||||||
public interface ChannelHandler {
|
public interface ChannelHandler {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called when the {@code channel} has received some data from remote peer.
|
* Called when the {@code channel} receives some data from remote peer.
|
||||||
*
|
*
|
||||||
* @param channel the channel from which the data is received.
|
* @param channel the channel from which the data was received.
|
||||||
* @param readObject the data read.
|
* @param readObject the data read.
|
||||||
* @param key the key from which the data is received.
|
* @param key the key on which read event occurred.
|
||||||
*/
|
*/
|
||||||
void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key);
|
void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key);
|
||||||
}
|
}
|
||||||
|
@ -5,7 +5,7 @@ import java.nio.channels.SelectionKey;
|
|||||||
/**
|
/**
|
||||||
* Represents the event dispatching strategy. When {@link NioReactor} senses any event on the
|
* Represents the event dispatching strategy. When {@link NioReactor} senses any event on the
|
||||||
* registered {@link AbstractNioChannel}s then it de-multiplexes the event type, read or write
|
* registered {@link AbstractNioChannel}s then it de-multiplexes the event type, read or write
|
||||||
* or connect, and then calls the {@link Dispatcher} to dispatch the event. This decouples the I/O
|
* or connect, and then calls the {@link Dispatcher} to dispatch the read events. This decouples the I/O
|
||||||
* processing from application specific processing.
|
* processing from application specific processing.
|
||||||
* <br/>
|
* <br/>
|
||||||
* Dispatcher should call the {@link ChannelHandler} associated with the channel on which event occurred.
|
* Dispatcher should call the {@link ChannelHandler} associated with the channel on which event occurred.
|
||||||
@ -24,6 +24,9 @@ public interface Dispatcher {
|
|||||||
* This hook method is called when read event occurs on particular channel. The data read
|
* This hook method is called when read event occurs on particular channel. The data read
|
||||||
* is provided in <code>readObject</code>. The implementation should dispatch this read event
|
* is provided in <code>readObject</code>. The implementation should dispatch this read event
|
||||||
* to the associated {@link ChannelHandler} of <code>channel</code>.
|
* to the associated {@link ChannelHandler} of <code>channel</code>.
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* The type of <code>readObject</code> depends on the channel on which data was received.
|
||||||
*
|
*
|
||||||
* @param channel on which read event occurred
|
* @param channel on which read event occurred
|
||||||
* @param readObject object read by channel
|
* @param readObject object read by channel
|
||||||
@ -32,7 +35,7 @@ public interface Dispatcher {
|
|||||||
void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key);
|
void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stops the dispatching events and cleans up any acquired resources such as threads.
|
* Stops dispatching events and cleans up any acquired resources such as threads.
|
||||||
*/
|
*/
|
||||||
void stop();
|
void stop();
|
||||||
}
|
}
|
||||||
|
@ -48,12 +48,13 @@ public class NioDatagramChannel extends AbstractNioChannel {
|
|||||||
@Override
|
@Override
|
||||||
public DatagramPacket read(SelectionKey key) throws IOException {
|
public DatagramPacket read(SelectionKey key) throws IOException {
|
||||||
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||||||
SocketAddress sender = getChannel().receive(buffer);
|
SocketAddress sender = ((DatagramChannel)key.channel()).receive(buffer);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* It is required to create a DatagramPacket because we need to preserve which
|
* It is required to create a DatagramPacket because we need to preserve which
|
||||||
* socket address acts as destination for sending reply packets.
|
* socket address acts as destination for sending reply packets.
|
||||||
*/
|
*/
|
||||||
|
buffer.flip();
|
||||||
DatagramPacket packet = new DatagramPacket(buffer);
|
DatagramPacket packet = new DatagramPacket(buffer);
|
||||||
packet.setSender(sender);
|
packet.setSender(sender);
|
||||||
|
|
||||||
@ -91,7 +92,7 @@ public class NioDatagramChannel extends AbstractNioChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write the outgoing {@link DatagramPacket} to the channel. The intended receiver of the
|
* Writes the outgoing {@link DatagramPacket} to the channel. The intended receiver of the
|
||||||
* datagram packet must be set in the <code>data</code> using {@link DatagramPacket#setReceiver(SocketAddress)}.
|
* datagram packet must be set in the <code>data</code> using {@link DatagramPacket#setReceiver(SocketAddress)}.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
@ -22,11 +22,11 @@ import java.util.concurrent.TimeUnit;
|
|||||||
* <p>
|
* <p>
|
||||||
* Implementation:
|
* Implementation:
|
||||||
* A NIO reactor runs in its own thread when it is started using {@link #start()} method.
|
* A NIO reactor runs in its own thread when it is started using {@link #start()} method.
|
||||||
* {@link NioReactor} uses {@link Selector} as a mechanism for achieving Synchronous Event De-multiplexing.
|
* {@link NioReactor} uses {@link Selector} for realizing Synchronous Event De-multiplexing.
|
||||||
*
|
*
|
||||||
* <p>
|
* <p>
|
||||||
* NOTE: This is one of the way to implement NIO reactor and it does not take care of all possible edge cases
|
* NOTE: This is one of the ways to implement NIO reactor and it does not take care of all possible edge cases
|
||||||
* which may be required in a real application. This implementation is meant to demonstrate the fundamental
|
* which are required in a real application. This implementation is meant to demonstrate the fundamental
|
||||||
* concepts that lie behind Reactor pattern.
|
* concepts that lie behind Reactor pattern.
|
||||||
*
|
*
|
||||||
* @author npathai
|
* @author npathai
|
||||||
@ -64,16 +64,13 @@ public class NioReactor {
|
|||||||
* @throws IOException if any I/O error occurs.
|
* @throws IOException if any I/O error occurs.
|
||||||
*/
|
*/
|
||||||
public void start() throws IOException {
|
public void start() throws IOException {
|
||||||
reactorMain.execute(new Runnable() {
|
reactorMain.execute(() -> {
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
try {
|
||||||
System.out.println("Reactor started, waiting for events...");
|
System.out.println("Reactor started, waiting for events...");
|
||||||
eventLoop();
|
eventLoop();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,11 +89,11 @@ public class NioReactor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Registers a new channel (handle) with this reactor after which the reactor will wait for events
|
* Registers a new channel (handle) with this reactor. Reactor will start waiting for events on this channel
|
||||||
* on this channel. While registering the channel the reactor uses {@link AbstractNioChannel#getInterestedOps()}
|
* and notify of any events. While registering the channel the reactor uses {@link AbstractNioChannel#getInterestedOps()}
|
||||||
* to know about the interested operation of this channel.
|
* to know about the interested operation of this channel.
|
||||||
*
|
*
|
||||||
* @param channel a new handle on which reactor will wait for events. The channel must be bound
|
* @param channel a new channel on which reactor will wait for events. The channel must be bound
|
||||||
* prior to being registered.
|
* prior to being registered.
|
||||||
* @return this
|
* @return this
|
||||||
* @throws IOException if any I/O error occurs.
|
* @throws IOException if any I/O error occurs.
|
||||||
@ -111,7 +108,7 @@ public class NioReactor {
|
|||||||
private void eventLoop() throws IOException {
|
private void eventLoop() throws IOException {
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
||||||
// Honor interrupt request
|
// honor interrupt request
|
||||||
if (Thread.interrupted()) {
|
if (Thread.interrupted()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -189,7 +186,7 @@ public class NioReactor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Uses the application provided dispatcher to dispatch events to respective handlers.
|
* Uses the application provided dispatcher to dispatch events to application handler.
|
||||||
*/
|
*/
|
||||||
private void dispatchReadEvent(SelectionKey key, Object readObject) {
|
private void dispatchReadEvent(SelectionKey key, Object readObject) {
|
||||||
dispatcher.onChannelReadEvent((AbstractNioChannel)key.attachment(), readObject, key);
|
dispatcher.onChannelReadEvent((AbstractNioChannel)key.attachment(), readObject, key);
|
||||||
@ -207,10 +204,10 @@ public class NioReactor {
|
|||||||
* Queues the change of operations request of a channel, which will change the interested
|
* Queues the change of operations request of a channel, which will change the interested
|
||||||
* operations of the channel sometime in future.
|
* operations of the channel sometime in future.
|
||||||
* <p>
|
* <p>
|
||||||
* This is a non-blocking method and does not guarantee that the operations are changed when
|
* This is a non-blocking method and does not guarantee that the operations have changed when
|
||||||
* this method returns.
|
* this method returns.
|
||||||
*
|
*
|
||||||
* @param key the key for which operations are to be changed.
|
* @param key the key for which operations have to be changed.
|
||||||
* @param interestedOps the new interest operations.
|
* @param interestedOps the new interest operations.
|
||||||
*/
|
*/
|
||||||
public void changeOps(SelectionKey key, int interestedOps) {
|
public void changeOps(SelectionKey key, int interestedOps) {
|
||||||
|
@ -24,8 +24,8 @@ public class NioServerSocketChannel extends AbstractNioChannel {
|
|||||||
* Note the constructor does not bind the socket, {@link #bind()} method should be called for binding
|
* Note the constructor does not bind the socket, {@link #bind()} method should be called for binding
|
||||||
* the socket.
|
* the socket.
|
||||||
*
|
*
|
||||||
* @param port the port to be bound to listen for incoming requests.
|
* @param port the port on which channel will be bound to accept incoming connection requests.
|
||||||
* @param handler the handler to be used for handling incoming requests on this channel.
|
* @param handler the handler that will handle incoming requests on this channel.
|
||||||
* @throws IOException if any I/O error occurs.
|
* @throws IOException if any I/O error occurs.
|
||||||
*/
|
*/
|
||||||
public NioServerSocketChannel(int port, ChannelHandler handler) throws IOException {
|
public NioServerSocketChannel(int port, ChannelHandler handler) throws IOException {
|
||||||
@ -36,7 +36,7 @@ public class NioServerSocketChannel extends AbstractNioChannel {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getInterestedOps() {
|
public int getInterestedOps() {
|
||||||
// being a server socket channel it is interested in accepting connection from remote clients.
|
// being a server socket channel it is interested in accepting connection from remote peers.
|
||||||
return SelectionKey.OP_ACCEPT;
|
return SelectionKey.OP_ACCEPT;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -58,6 +58,7 @@ public class NioServerSocketChannel extends AbstractNioChannel {
|
|||||||
SocketChannel socketChannel = (SocketChannel) key.channel();
|
SocketChannel socketChannel = (SocketChannel) key.channel();
|
||||||
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||||||
int read = socketChannel.read(buffer);
|
int read = socketChannel.read(buffer);
|
||||||
|
buffer.flip();
|
||||||
if (read == -1) {
|
if (read == -1) {
|
||||||
throw new IOException("Socket closed");
|
throw new IOException("Socket closed");
|
||||||
}
|
}
|
||||||
@ -83,7 +84,6 @@ public class NioServerSocketChannel extends AbstractNioChannel {
|
|||||||
@Override
|
@Override
|
||||||
protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException {
|
protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException {
|
||||||
ByteBuffer pendingBuffer = (ByteBuffer) pendingWrite;
|
ByteBuffer pendingBuffer = (ByteBuffer) pendingWrite;
|
||||||
System.out.println("Writing on channel");
|
|
||||||
((SocketChannel)key.channel()).write(pendingBuffer);
|
((SocketChannel)key.channel()).write(pendingBuffer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -8,7 +8,7 @@ import java.nio.channels.SelectionKey;
|
|||||||
* because the I/O thread performs the application specific processing.
|
* because the I/O thread performs the application specific processing.
|
||||||
*
|
*
|
||||||
* <p>
|
* <p>
|
||||||
* For real applications use {@link ThreadPoolDispatcher}.
|
* For better performance use {@link ThreadPoolDispatcher}.
|
||||||
*
|
*
|
||||||
* @see ThreadPoolDispatcher
|
* @see ThreadPoolDispatcher
|
||||||
*
|
*
|
||||||
|
@ -7,8 +7,8 @@ import java.util.concurrent.TimeUnit;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* An implementation that uses a pool of worker threads to dispatch the events. This provides
|
* An implementation that uses a pool of worker threads to dispatch the events. This provides
|
||||||
* for better scalability as the application specific processing is not performed in the context
|
* better scalability as the application specific processing is not performed in the context
|
||||||
* of I/O thread.
|
* of I/O (reactor) thread.
|
||||||
*
|
*
|
||||||
* @author npathai
|
* @author npathai
|
||||||
*
|
*
|
||||||
@ -46,7 +46,7 @@ public class ThreadPoolDispatcher extends SameThreadDispatcher {
|
|||||||
public void stop() {
|
public void stop() {
|
||||||
executorService.shutdownNow();
|
executorService.shutdownNow();
|
||||||
try {
|
try {
|
||||||
executorService.awaitTermination(1000, TimeUnit.SECONDS);
|
executorService.awaitTermination(4, TimeUnit.SECONDS);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user