Work on #74, enhanced reactor to allow multiple channels

This commit is contained in:
Narendra Pathai
2015-08-23 18:51:24 +05:30
parent 99adb5b0cf
commit d3f2ea22ac
3 changed files with 84 additions and 35 deletions

View File

@ -1,7 +1,11 @@
package com.iluwatar.reactor; package com.iluwatar.reactor;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import com.iluwatar.reactor.NioReactor.NioChannelEventHandler; import com.iluwatar.reactor.NioReactor.NioChannelEventHandler;
@ -10,12 +14,35 @@ public class App {
public static void main(String[] args) { public static void main(String[] args) {
try { try {
new NioReactor(6666, new LoggingServer()).start(); NioReactor reactor = new NioReactor();
reactor
.registerChannel(tcpChannel(6666))
.registerChannel(tcpChannel(6667))
.start();
reactor.registerHandler(new LoggingServer());
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
private static SelectableChannel udpChannel(int port) throws IOException {
DatagramChannel channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress(port));
channel.configureBlocking(false);
System.out.println("Bound UDP socket at port: " + port);
return channel;
}
private static SelectableChannel tcpChannel(int port) throws IOException {
ServerSocketChannel channel = ServerSocketChannel.open();
channel.socket().bind(new InetSocketAddress(port));
channel.configureBlocking(false);
System.out.println("Bound TCP socket at port: " + port);
return channel;
}
static class LoggingServer implements NioChannelEventHandler { static class LoggingServer implements NioChannelEventHandler {
@Override @Override

View File

@ -9,14 +9,15 @@ import java.net.Socket;
public class AppClient { public class AppClient {
public static void main(String[] args) { public static void main(String[] args) {
new LoggingClient("Client 1", 6666).start(); new Thread(new LoggingClient("Client 1", 6666)).start();
new Thread(new LoggingClient("Client 2", 6667)).start();
} }
/* /*
* A logging client that sends logging requests to logging server * A logging client that sends logging requests to logging server
*/ */
static class LoggingClient { static class LoggingClient implements Runnable {
private int serverPort; private int serverPort;
private String clientName; private String clientName;
@ -26,7 +27,7 @@ public class AppClient {
this.serverPort = serverPort; this.serverPort = serverPort;
} }
public void start() { public void run() {
Socket socket = null; Socket socket = null;
try { try {
socket = new Socket(InetAddress.getLocalHost(), serverPort); socket = new Socket(InetAddress.getLocalHost(), serverPort);
@ -51,7 +52,7 @@ public class AppClient {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
writer.println(clientName + " - Log request: " + i); writer.println(clientName + " - Log request: " + i);
try { try {
Thread.sleep(1000); Thread.sleep(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }

View File

@ -1,44 +1,63 @@
package com.iluwatar.reactor; package com.iluwatar.reactor;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
/*
* Abstractions
* ---------------
*
* 1 - Dispatcher
* 2 - Synchronous Event De-multiplexer
* 3 - Event
* 4 - Event Handler & concrete event handler (application business logic)
* 5 - Selector
*/
public class NioReactor { public class NioReactor {
private int port;
private Selector selector; private Selector selector;
private ServerSocketChannel serverSocketChannel; private Acceptor acceptor;
private NioChannelEventHandler nioEventhandler; private List<NioChannelEventHandler> eventHandlers = new CopyOnWriteArrayList<>();
public NioReactor(int port, NioChannelEventHandler handler) { public NioReactor() throws IOException {
this.port = port; this.acceptor = new Acceptor();
this.nioEventhandler = handler; this.selector = Selector.open();
} }
public NioReactor registerChannel(SelectableChannel channel) throws IOException {
SelectionKey key = channel.register(selector, SelectionKey.OP_ACCEPT);
key.attach(acceptor);
return this;
}
public void registerHandler(NioChannelEventHandler handler) {
eventHandlers.add(handler);
}
public void start() throws IOException { public void start() throws IOException {
startReactor(); new Thread( new Runnable() {
requestLoop(); @Override
public void run() {
try {
System.out.println("Reactor started, waiting for events...");
eventLoop();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
} }
private void startReactor() throws IOException { private void eventLoop() throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
SelectionKey acceptorKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
acceptorKey.attach(new Acceptor());
System.out.println("Reactor started listening on port: " + port);
}
private void requestLoop() throws IOException {
while (true) { while (true) {
selector.select(); selector.select(1000);
Set<SelectionKey> keys = selector.selectedKeys(); Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey key : keys) { for (SelectionKey key : keys) {
dispatchEvent(key); dispatchEvent(key);
@ -50,21 +69,21 @@ public class NioReactor {
private void dispatchEvent(SelectionKey key) throws IOException { private void dispatchEvent(SelectionKey key) throws IOException {
Object handler = key.attachment(); Object handler = key.attachment();
if (handler != null) { if (handler != null) {
((EventHandler)handler).handle(); ((EventHandler)handler).handle(key.channel());
} }
} }
interface EventHandler { interface EventHandler {
void handle() throws IOException; void handle(SelectableChannel channel) throws IOException;
} }
private class Acceptor implements EventHandler { private class Acceptor implements EventHandler {
public void handle() throws IOException { public void handle(SelectableChannel channel) throws IOException {
// non-blocking accept as acceptor will only be called when accept event is available // non-blocking accept as acceptor will only be called when accept event is available
SocketChannel clientChannel = serverSocketChannel.accept(); SocketChannel clientChannel = ((ServerSocketChannel)channel).accept();
if (clientChannel != null) { if (clientChannel != null) {
new ChannelHandler(clientChannel).handle(); new ChannelHandler(clientChannel).handle(clientChannel);
} }
System.out.println("Connection established with a client"); System.out.println("Connection established with a client");
} }
@ -88,9 +107,11 @@ public class NioReactor {
selector.wakeup(); selector.wakeup();
} }
public void handle() throws IOException { public void handle(SelectableChannel channel) throws IOException {
// only read events are supported. // only read events are supported.
nioEventhandler.onReadable(clientChannel); for (NioChannelEventHandler eventHandler : eventHandlers) {
eventHandler.onReadable(clientChannel);
}
} }
} }
} }