Ongoing work on #74 introduced better abstractions in reactor - tcp and udp mode
This commit is contained in:
		@@ -0,0 +1,75 @@
 | 
				
			|||||||
 | 
					package com.iluwatar.reactor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.io.IOException;
 | 
				
			||||||
 | 
					import java.nio.ByteBuffer;
 | 
				
			||||||
 | 
					import java.nio.channels.SelectableChannel;
 | 
				
			||||||
 | 
					import java.nio.channels.SelectionKey;
 | 
				
			||||||
 | 
					import java.util.Map;
 | 
				
			||||||
 | 
					import java.util.Queue;
 | 
				
			||||||
 | 
					import java.util.concurrent.ConcurrentHashMap;
 | 
				
			||||||
 | 
					import java.util.concurrent.ConcurrentLinkedQueue;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					public abstract class AbstractNioChannel {
 | 
				
			||||||
 | 
						
 | 
				
			||||||
 | 
						private SelectableChannel channel;
 | 
				
			||||||
 | 
						private ChannelHandler handler;
 | 
				
			||||||
 | 
						private Map<SelectableChannel, Queue<ByteBuffer>> channelToPendingWrites = new ConcurrentHashMap<>();
 | 
				
			||||||
 | 
						private NioReactor reactor;
 | 
				
			||||||
 | 
						
 | 
				
			||||||
 | 
						public AbstractNioChannel(ChannelHandler handler, SelectableChannel channel) {
 | 
				
			||||||
 | 
							this.handler = handler;
 | 
				
			||||||
 | 
							this.channel = channel;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						
 | 
				
			||||||
 | 
						public void setReactor(NioReactor reactor) {
 | 
				
			||||||
 | 
							this.reactor = reactor;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						public SelectableChannel getChannel() {
 | 
				
			||||||
 | 
							return channel;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						public abstract int getInterestedOps();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						public abstract ByteBuffer read(SelectionKey key) throws IOException;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						public void setHandler(ChannelHandler handler) {
 | 
				
			||||||
 | 
							this.handler = handler;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						
 | 
				
			||||||
 | 
						public ChannelHandler getHandler() {
 | 
				
			||||||
 | 
							return handler;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Called from the context of reactor thread
 | 
				
			||||||
 | 
						public void write(SelectionKey key) throws IOException {
 | 
				
			||||||
 | 
							Queue<ByteBuffer> pendingWrites = channelToPendingWrites.get(key.channel());
 | 
				
			||||||
 | 
							while (true) {
 | 
				
			||||||
 | 
								ByteBuffer pendingWrite = pendingWrites.poll();
 | 
				
			||||||
 | 
								if (pendingWrite == null) {
 | 
				
			||||||
 | 
									System.out.println("No more pending writes");
 | 
				
			||||||
 | 
									reactor.changeOps(key, SelectionKey.OP_READ);
 | 
				
			||||||
 | 
									break;
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								
 | 
				
			||||||
 | 
								doWrite(pendingWrite, key);
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						protected abstract void doWrite(ByteBuffer pendingWrite, SelectionKey key) throws IOException;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						public void write(ByteBuffer buffer, SelectionKey key) {
 | 
				
			||||||
 | 
							Queue<ByteBuffer> pendingWrites = this.channelToPendingWrites.get(key.channel());
 | 
				
			||||||
 | 
							if (pendingWrites == null) {
 | 
				
			||||||
 | 
								synchronized (this.channelToPendingWrites) {
 | 
				
			||||||
 | 
									pendingWrites = this.channelToPendingWrites.get(key.channel());
 | 
				
			||||||
 | 
									if (pendingWrites == null) {
 | 
				
			||||||
 | 
										pendingWrites = new ConcurrentLinkedQueue<>();
 | 
				
			||||||
 | 
										this.channelToPendingWrites.put(key.channel(), pendingWrites);
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							pendingWrites.add(buffer);
 | 
				
			||||||
 | 
							reactor.changeOps(key, SelectionKey.OP_WRITE);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -1,69 +1,32 @@
 | 
				
			|||||||
package com.iluwatar.reactor;
 | 
					package com.iluwatar.reactor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import java.io.IOException;
 | 
					import java.io.IOException;
 | 
				
			||||||
import java.net.InetSocketAddress;
 | 
					 | 
				
			||||||
import java.nio.ByteBuffer;
 | 
					 | 
				
			||||||
import java.nio.channels.DatagramChannel;
 | 
					 | 
				
			||||||
import java.nio.channels.SelectableChannel;
 | 
					 | 
				
			||||||
import java.nio.channels.ServerSocketChannel;
 | 
					 | 
				
			||||||
import java.nio.channels.SocketChannel;
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
import com.iluwatar.reactor.NioReactor.NioChannelEventHandler;
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
public class App {
 | 
					public class App {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	public static void main(String[] args) {
 | 
						public static void main(String[] args) {
 | 
				
			||||||
		try {
 | 
							try {
 | 
				
			||||||
			NioReactor reactor = new NioReactor();
 | 
								NioReactor reactor = new NioReactor(new ThreadPoolDispatcher(2));
 | 
				
			||||||
 | 
								LoggingHandler loggingHandler = new LoggingHandler();
 | 
				
			||||||
			reactor
 | 
								reactor
 | 
				
			||||||
				.registerChannel(tcpChannel(6666))
 | 
									.registerChannel(tcpChannel(6666, loggingHandler))
 | 
				
			||||||
				.registerChannel(tcpChannel(6667))
 | 
									.registerChannel(tcpChannel(6667, loggingHandler))
 | 
				
			||||||
 | 
									.registerChannel(udpChannel(6668, loggingHandler))
 | 
				
			||||||
			.start();
 | 
								.start();
 | 
				
			||||||
 | 
					 | 
				
			||||||
			reactor.registerHandler(new LoggingServer());
 | 
					 | 
				
			||||||
		} catch (IOException e) {
 | 
							} catch (IOException e) {
 | 
				
			||||||
			e.printStackTrace();
 | 
								e.printStackTrace();
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	private static SelectableChannel udpChannel(int port) throws IOException {
 | 
						private static AbstractNioChannel tcpChannel(int port, ChannelHandler handler) throws IOException {
 | 
				
			||||||
		DatagramChannel channel = DatagramChannel.open();
 | 
							NioServerSocketChannel channel = new NioServerSocketChannel(port, handler);
 | 
				
			||||||
		channel.socket().bind(new InetSocketAddress(port));
 | 
							channel.bind();
 | 
				
			||||||
		channel.configureBlocking(false);
 | 
					 | 
				
			||||||
		System.out.println("Bound UDP socket at port: " + port);
 | 
					 | 
				
			||||||
		return channel;
 | 
							return channel;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	
 | 
						
 | 
				
			||||||
	private static SelectableChannel tcpChannel(int port) throws IOException {
 | 
						private static AbstractNioChannel udpChannel(int port, ChannelHandler handler) throws IOException {
 | 
				
			||||||
		ServerSocketChannel channel = ServerSocketChannel.open();
 | 
							NioDatagramChannel channel = new NioDatagramChannel(port, handler);
 | 
				
			||||||
		channel.socket().bind(new InetSocketAddress(port));
 | 
							channel.bind();
 | 
				
			||||||
		channel.configureBlocking(false);
 | 
					 | 
				
			||||||
		System.out.println("Bound TCP socket at port: " + port);
 | 
					 | 
				
			||||||
		return channel;
 | 
							return channel;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	static class LoggingServer implements NioChannelEventHandler {
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		@Override
 | 
					 | 
				
			||||||
		public void onReadable(SocketChannel channel) {
 | 
					 | 
				
			||||||
			ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
 | 
					 | 
				
			||||||
			try {
 | 
					 | 
				
			||||||
				int byteCount = channel.read(requestBuffer);
 | 
					 | 
				
			||||||
				if (byteCount > 0) {
 | 
					 | 
				
			||||||
					byte[] logRequestContents = new byte[byteCount];
 | 
					 | 
				
			||||||
					byte[] array = requestBuffer.array();
 | 
					 | 
				
			||||||
					System.arraycopy(array, 0, logRequestContents, 0, byteCount);
 | 
					 | 
				
			||||||
					doLogging(new String(logRequestContents));
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			} catch (IOException e) {
 | 
					 | 
				
			||||||
				e.printStackTrace();
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		private void doLogging(String log) {
 | 
					 | 
				
			||||||
			// do logging at server side
 | 
					 | 
				
			||||||
			System.out.println(log);
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,16 +1,22 @@
 | 
				
			|||||||
package com.iluwatar.reactor;
 | 
					package com.iluwatar.reactor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import java.io.IOException;
 | 
					import java.io.IOException;
 | 
				
			||||||
 | 
					import java.io.InputStream;
 | 
				
			||||||
import java.io.OutputStream;
 | 
					import java.io.OutputStream;
 | 
				
			||||||
import java.io.PrintWriter;
 | 
					import java.io.PrintWriter;
 | 
				
			||||||
 | 
					import java.net.DatagramPacket;
 | 
				
			||||||
 | 
					import java.net.DatagramSocket;
 | 
				
			||||||
import java.net.InetAddress;
 | 
					import java.net.InetAddress;
 | 
				
			||||||
 | 
					import java.net.InetSocketAddress;
 | 
				
			||||||
import java.net.Socket;
 | 
					import java.net.Socket;
 | 
				
			||||||
 | 
					import java.net.SocketException;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
public class AppClient {
 | 
					public class AppClient {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	public static void main(String[] args) {
 | 
						public static void main(String[] args) {
 | 
				
			||||||
		new Thread(new LoggingClient("Client 1", 6666)).start();
 | 
					//		new Thread(new LoggingClient("Client 1", 6666)).start();
 | 
				
			||||||
		new Thread(new LoggingClient("Client 2", 6667)).start();
 | 
					//		new Thread(new LoggingClient("Client 2", 6667)).start();
 | 
				
			||||||
 | 
							new Thread(new UDPLoggingClient(6668)).start();
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	
 | 
						
 | 
				
			||||||
@@ -33,7 +39,7 @@ public class AppClient {
 | 
				
			|||||||
				socket = new Socket(InetAddress.getLocalHost(), serverPort);
 | 
									socket = new Socket(InetAddress.getLocalHost(), serverPort);
 | 
				
			||||||
				OutputStream outputStream = socket.getOutputStream();
 | 
									OutputStream outputStream = socket.getOutputStream();
 | 
				
			||||||
				PrintWriter writer = new PrintWriter(outputStream);
 | 
									PrintWriter writer = new PrintWriter(outputStream);
 | 
				
			||||||
				writeLogs(writer);
 | 
									writeLogs(writer, socket.getInputStream());
 | 
				
			||||||
			} catch (IOException e) {
 | 
								} catch (IOException e) {
 | 
				
			||||||
				e.printStackTrace();
 | 
									e.printStackTrace();
 | 
				
			||||||
				throw new RuntimeException(e);
 | 
									throw new RuntimeException(e);
 | 
				
			||||||
@@ -48,8 +54,8 @@ public class AppClient {
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		private void writeLogs(PrintWriter writer) {
 | 
							private void writeLogs(PrintWriter writer, InputStream inputStream) throws IOException {
 | 
				
			||||||
			for (int i = 0; i < 10; i++) {
 | 
								for (int i = 0; i < 1; i++) {
 | 
				
			||||||
				writer.println(clientName + " - Log request: " + i);
 | 
									writer.println(clientName + " - Log request: " + i);
 | 
				
			||||||
				try {
 | 
									try {
 | 
				
			||||||
					Thread.sleep(100);
 | 
										Thread.sleep(100);
 | 
				
			||||||
@@ -57,6 +63,53 @@ public class AppClient {
 | 
				
			|||||||
					e.printStackTrace();
 | 
										e.printStackTrace();
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
				writer.flush();
 | 
									writer.flush();
 | 
				
			||||||
 | 
									byte[] data = new byte[1024];
 | 
				
			||||||
 | 
									int read = inputStream.read(data, 0, data.length);
 | 
				
			||||||
 | 
									if (read == 0) {
 | 
				
			||||||
 | 
										System.out.println("Read zero bytes");
 | 
				
			||||||
 | 
									} else {
 | 
				
			||||||
 | 
										System.out.println(new String(data, 0, read));
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						
 | 
				
			||||||
 | 
						static class UDPLoggingClient implements Runnable {
 | 
				
			||||||
 | 
							private int port;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							public UDPLoggingClient(int port) {
 | 
				
			||||||
 | 
								this.port = port;
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							
 | 
				
			||||||
 | 
							@Override
 | 
				
			||||||
 | 
							public void run() {
 | 
				
			||||||
 | 
								DatagramSocket socket = null;
 | 
				
			||||||
 | 
								try {
 | 
				
			||||||
 | 
									socket = new DatagramSocket();
 | 
				
			||||||
 | 
									for (int i = 0; i < 1; i++) {
 | 
				
			||||||
 | 
										String message = "UDP Client" + " - Log request: " + i;
 | 
				
			||||||
 | 
										try {
 | 
				
			||||||
 | 
											DatagramPacket packet = new DatagramPacket(message.getBytes(), message.getBytes().length, new InetSocketAddress(InetAddress.getLocalHost(), port));
 | 
				
			||||||
 | 
											socket.send(packet);
 | 
				
			||||||
 | 
											
 | 
				
			||||||
 | 
											byte[] data = new byte[1024];
 | 
				
			||||||
 | 
											DatagramPacket reply = new DatagramPacket(data, data.length);
 | 
				
			||||||
 | 
											socket.receive(reply);
 | 
				
			||||||
 | 
											if (reply.getLength() == 0) {
 | 
				
			||||||
 | 
												System.out.println("Read zero bytes");
 | 
				
			||||||
 | 
											} else {
 | 
				
			||||||
 | 
												System.out.println(new String(reply.getData(), 0, reply.getLength()));
 | 
				
			||||||
 | 
											}
 | 
				
			||||||
 | 
										} catch (IOException e) {
 | 
				
			||||||
 | 
											e.printStackTrace();
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								} catch (SocketException e1) {
 | 
				
			||||||
 | 
									e1.printStackTrace();
 | 
				
			||||||
 | 
								} finally {
 | 
				
			||||||
 | 
									if (socket != null) {
 | 
				
			||||||
 | 
										socket.close();
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -0,0 +1,9 @@
 | 
				
			|||||||
 | 
					package com.iluwatar.reactor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.nio.ByteBuffer;
 | 
				
			||||||
 | 
					import java.nio.channels.SelectionKey;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					public interface ChannelHandler {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						void handleChannelRead(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -0,0 +1,8 @@
 | 
				
			|||||||
 | 
					package com.iluwatar.reactor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.nio.ByteBuffer;
 | 
				
			||||||
 | 
					import java.nio.channels.SelectionKey;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					public interface Dispatcher {
 | 
				
			||||||
 | 
						void onChannelReadEvent(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key);
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -0,0 +1,24 @@
 | 
				
			|||||||
 | 
					package com.iluwatar.reactor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.nio.ByteBuffer;
 | 
				
			||||||
 | 
					import java.nio.channels.SelectionKey;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					public class LoggingHandler implements ChannelHandler {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						@Override
 | 
				
			||||||
 | 
						public void handleChannelRead(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key) {
 | 
				
			||||||
 | 
							byte[] data = readBytes.array();
 | 
				
			||||||
 | 
							doLogging(data);
 | 
				
			||||||
 | 
							sendEchoReply(channel, data, key);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						private void sendEchoReply(AbstractNioChannel channel, byte[] data, SelectionKey key) {
 | 
				
			||||||
 | 
							ByteBuffer buffer = ByteBuffer.wrap("Data logged successfully".getBytes());
 | 
				
			||||||
 | 
							channel.write(buffer, key);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						private void doLogging(byte[] data) {
 | 
				
			||||||
 | 
							// assuming UTF-8 :(
 | 
				
			||||||
 | 
							System.out.println(new String(data));
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -0,0 +1,47 @@
 | 
				
			|||||||
 | 
					package com.iluwatar.reactor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.io.IOException;
 | 
				
			||||||
 | 
					import java.net.InetAddress;
 | 
				
			||||||
 | 
					import java.net.InetSocketAddress;
 | 
				
			||||||
 | 
					import java.nio.ByteBuffer;
 | 
				
			||||||
 | 
					import java.nio.channels.DatagramChannel;
 | 
				
			||||||
 | 
					import java.nio.channels.SelectionKey;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					public class NioDatagramChannel extends AbstractNioChannel {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						private int port;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						public NioDatagramChannel(int port, ChannelHandler handler) throws IOException {
 | 
				
			||||||
 | 
							super(handler, DatagramChannel.open());
 | 
				
			||||||
 | 
							this.port = port;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						@Override
 | 
				
			||||||
 | 
						public int getInterestedOps() {
 | 
				
			||||||
 | 
							return SelectionKey.OP_READ;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						@Override
 | 
				
			||||||
 | 
						public ByteBuffer read(SelectionKey key) throws IOException {
 | 
				
			||||||
 | 
							ByteBuffer buffer = ByteBuffer.allocate(1024);
 | 
				
			||||||
 | 
							getChannel().receive(buffer);
 | 
				
			||||||
 | 
							return buffer;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						
 | 
				
			||||||
 | 
						@Override
 | 
				
			||||||
 | 
						public DatagramChannel getChannel() {
 | 
				
			||||||
 | 
							return (DatagramChannel) super.getChannel();
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						
 | 
				
			||||||
 | 
						public void bind() throws IOException {
 | 
				
			||||||
 | 
							getChannel().socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port));
 | 
				
			||||||
 | 
							getChannel().configureBlocking(false);
 | 
				
			||||||
 | 
							System.out.println("Bound UDP socket at port: " + port);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						@Override
 | 
				
			||||||
 | 
						protected void doWrite(ByteBuffer pendingWrite, SelectionKey key) throws IOException {
 | 
				
			||||||
 | 
							pendingWrite.flip();
 | 
				
			||||||
 | 
							getChannel().write(pendingWrite);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -1,46 +1,39 @@
 | 
				
			|||||||
package com.iluwatar.reactor;
 | 
					package com.iluwatar.reactor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import java.io.IOException;
 | 
					import java.io.IOException;
 | 
				
			||||||
import java.nio.channels.SelectableChannel;
 | 
					import java.nio.ByteBuffer;
 | 
				
			||||||
import java.nio.channels.SelectionKey;
 | 
					import java.nio.channels.SelectionKey;
 | 
				
			||||||
import java.nio.channels.Selector;
 | 
					import java.nio.channels.Selector;
 | 
				
			||||||
import java.nio.channels.ServerSocketChannel;
 | 
					import java.nio.channels.ServerSocketChannel;
 | 
				
			||||||
import java.nio.channels.SocketChannel;
 | 
					import java.nio.channels.SocketChannel;
 | 
				
			||||||
import java.util.List;
 | 
					import java.util.Iterator;
 | 
				
			||||||
 | 
					import java.util.Queue;
 | 
				
			||||||
import java.util.Set;
 | 
					import java.util.Set;
 | 
				
			||||||
import java.util.concurrent.CopyOnWriteArrayList;
 | 
					import java.util.concurrent.ConcurrentLinkedQueue;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/*
 | 
					/*
 | 
				
			||||||
 * Abstractions
 | 
					 * Abstractions
 | 
				
			||||||
 * ---------------
 | 
					 * ---------------
 | 
				
			||||||
 * 
 | 
					 | 
				
			||||||
 * 1 - Dispatcher
 | 
					 | 
				
			||||||
 * 2 - Synchronous Event De-multiplexer
 | 
					 * 2 - Synchronous Event De-multiplexer
 | 
				
			||||||
 * 3 - Event
 | 
					 | 
				
			||||||
 * 4 - Event Handler & concrete event handler (application business logic)
 | 
					 | 
				
			||||||
 * 5 - Selector
 | 
					 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
public class NioReactor {
 | 
					public class NioReactor {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	private Selector selector;
 | 
						private Selector selector;
 | 
				
			||||||
	private Acceptor acceptor;
 | 
						private Dispatcher dispatcher;
 | 
				
			||||||
	private List<NioChannelEventHandler> eventHandlers = new CopyOnWriteArrayList<>();
 | 
						private Queue<Command> pendingChanges = new ConcurrentLinkedQueue<>();
 | 
				
			||||||
	
 | 
						
 | 
				
			||||||
	public NioReactor() throws IOException {
 | 
						public NioReactor(Dispatcher dispatcher) throws IOException {
 | 
				
			||||||
		this.acceptor = new Acceptor();
 | 
							this.dispatcher = dispatcher;
 | 
				
			||||||
		this.selector = Selector.open();
 | 
							this.selector = Selector.open();
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	
 | 
						
 | 
				
			||||||
	public NioReactor registerChannel(SelectableChannel channel) throws IOException {
 | 
						public NioReactor registerChannel(AbstractNioChannel channel) throws IOException {
 | 
				
			||||||
		SelectionKey key = channel.register(selector, SelectionKey.OP_ACCEPT);
 | 
							SelectionKey key = channel.getChannel().register(selector, channel.getInterestedOps());
 | 
				
			||||||
		key.attach(acceptor);
 | 
							key.attach(channel);
 | 
				
			||||||
 | 
							channel.setReactor(this);
 | 
				
			||||||
		return this;
 | 
							return this;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	
 | 
						
 | 
				
			||||||
	public void registerHandler(NioChannelEventHandler handler) {
 | 
					 | 
				
			||||||
		eventHandlers.add(handler);
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
	
 | 
					 | 
				
			||||||
	public void start() throws IOException {
 | 
						public void start() throws IOException {
 | 
				
			||||||
		new Thread( new Runnable() {
 | 
							new Thread( new Runnable() {
 | 
				
			||||||
			@Override
 | 
								@Override
 | 
				
			||||||
@@ -52,66 +45,110 @@ public class NioReactor {
 | 
				
			|||||||
					e.printStackTrace();
 | 
										e.printStackTrace();
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}).start();
 | 
							}, "Reactor Main").start();
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	private void eventLoop() throws IOException {
 | 
						private void eventLoop() throws IOException {
 | 
				
			||||||
		while (true) {
 | 
							while (true) {
 | 
				
			||||||
			selector.select(1000);
 | 
								// honor any pending requests first
 | 
				
			||||||
 | 
								processPendingChanges();
 | 
				
			||||||
 | 
								
 | 
				
			||||||
 | 
								selector.select();
 | 
				
			||||||
 | 
								
 | 
				
			||||||
			Set<SelectionKey> keys = selector.selectedKeys();
 | 
								Set<SelectionKey> keys = selector.selectedKeys();
 | 
				
			||||||
			for (SelectionKey key : keys) {
 | 
					
 | 
				
			||||||
				dispatchEvent(key);
 | 
								Iterator<SelectionKey> iterator = keys.iterator();
 | 
				
			||||||
 | 
								
 | 
				
			||||||
 | 
								while (iterator.hasNext()) {
 | 
				
			||||||
 | 
									SelectionKey key = iterator.next();
 | 
				
			||||||
 | 
									if (!key.isValid()) {
 | 
				
			||||||
 | 
										iterator.remove();
 | 
				
			||||||
 | 
										continue;
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									processKey(key);
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			keys.clear();
 | 
								keys.clear();
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	
 | 
						
 | 
				
			||||||
	private void dispatchEvent(SelectionKey key) throws IOException {
 | 
						private void processPendingChanges() {
 | 
				
			||||||
		Object handler = key.attachment();
 | 
							Iterator<Command> iterator = pendingChanges.iterator();
 | 
				
			||||||
		if (handler != null) {
 | 
							while (iterator.hasNext()) {
 | 
				
			||||||
			((EventHandler)handler).handle(key.channel());
 | 
								Command command = iterator.next();
 | 
				
			||||||
 | 
								System.out.println("Processing pending change: " + command);
 | 
				
			||||||
 | 
								command.execute();
 | 
				
			||||||
 | 
								iterator.remove();
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	interface EventHandler {
 | 
						private void processKey(SelectionKey key) throws IOException {
 | 
				
			||||||
		void handle(SelectableChannel channel) throws IOException;
 | 
							if (key.isAcceptable()) {
 | 
				
			||||||
 | 
								acceptConnection(key);
 | 
				
			||||||
 | 
							} else if (key.isReadable()) {
 | 
				
			||||||
 | 
								System.out.println("Key is readable");
 | 
				
			||||||
 | 
								read(key);
 | 
				
			||||||
 | 
							} else if (key.isWritable()) {
 | 
				
			||||||
 | 
								System.out.println("Key is writable");
 | 
				
			||||||
 | 
								write(key);
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	private class Acceptor implements EventHandler {
 | 
						private void write(SelectionKey key) throws IOException {
 | 
				
			||||||
 | 
							AbstractNioChannel channel = (AbstractNioChannel) key.attachment();
 | 
				
			||||||
 | 
							channel.write(key);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		public void handle(SelectableChannel channel) throws IOException {
 | 
						private void read(SelectionKey key) {
 | 
				
			||||||
			// non-blocking accept as acceptor will only be called when accept event is available
 | 
							ByteBuffer readBytes;
 | 
				
			||||||
			SocketChannel clientChannel = ((ServerSocketChannel)channel).accept();
 | 
							try {
 | 
				
			||||||
			if (clientChannel != null) {
 | 
								readBytes = ((AbstractNioChannel)key.attachment()).read(key);
 | 
				
			||||||
				new ChannelHandler(clientChannel).handle(clientChannel);
 | 
								dispatchReadEvent(key, readBytes);
 | 
				
			||||||
 | 
							} catch (IOException e) {
 | 
				
			||||||
 | 
								try {
 | 
				
			||||||
 | 
									key.channel().close();
 | 
				
			||||||
 | 
								} catch (IOException e1) {
 | 
				
			||||||
 | 
									e1.printStackTrace();
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			System.out.println("Connection established with a client");
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	public static interface NioChannelEventHandler {
 | 
						private void dispatchReadEvent(SelectionKey key, ByteBuffer readBytes) {
 | 
				
			||||||
		void onReadable(SocketChannel channel);
 | 
							dispatcher.onChannelReadEvent((AbstractNioChannel)key.attachment(), readBytes, key);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	private class ChannelHandler implements EventHandler {
 | 
						private void acceptConnection(SelectionKey key) throws IOException {
 | 
				
			||||||
 | 
							ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
 | 
				
			||||||
 | 
							SocketChannel socketChannel = serverSocketChannel.accept();
 | 
				
			||||||
 | 
							socketChannel.configureBlocking(false);
 | 
				
			||||||
 | 
							SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
 | 
				
			||||||
 | 
							readKey.attach(key.attachment());
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		private SocketChannel clientChannel;
 | 
						interface Command {
 | 
				
			||||||
		private SelectionKey selectionKey;
 | 
							void execute();
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
	
 | 
						
 | 
				
			||||||
		public ChannelHandler(SocketChannel clientChannel) throws IOException {
 | 
						public void changeOps(SelectionKey key, int interestedOps) {
 | 
				
			||||||
			this.clientChannel = clientChannel;
 | 
							pendingChanges.add(new ChangeKeyOpsCommand(key, interestedOps));
 | 
				
			||||||
			clientChannel.configureBlocking(false);
 | 
							selector.wakeup();
 | 
				
			||||||
			selectionKey = clientChannel.register(selector, 0);
 | 
						}
 | 
				
			||||||
			selectionKey.attach(this);
 | 
						
 | 
				
			||||||
			selectionKey.interestOps(SelectionKey.OP_READ);
 | 
						class ChangeKeyOpsCommand implements Command {
 | 
				
			||||||
			selector.wakeup();
 | 
							private SelectionKey key;
 | 
				
			||||||
 | 
							private int interestedOps;
 | 
				
			||||||
 | 
							
 | 
				
			||||||
 | 
							public ChangeKeyOpsCommand(SelectionKey key, int interestedOps) {
 | 
				
			||||||
 | 
								this.key = key;
 | 
				
			||||||
 | 
								this.interestedOps = interestedOps;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		
 | 
							
 | 
				
			||||||
		public void handle(SelectableChannel channel) throws IOException {
 | 
							public void execute() {
 | 
				
			||||||
			// only read events are supported.
 | 
								key.interestOps(interestedOps);
 | 
				
			||||||
			for (NioChannelEventHandler eventHandler : eventHandlers) {
 | 
							}
 | 
				
			||||||
				eventHandler.onReadable(clientChannel);
 | 
							
 | 
				
			||||||
			}
 | 
							@Override
 | 
				
			||||||
 | 
							public String toString() {
 | 
				
			||||||
 | 
								return "Change of ops to: " + interestedOps;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -0,0 +1,52 @@
 | 
				
			|||||||
 | 
					package com.iluwatar.reactor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.io.IOException;
 | 
				
			||||||
 | 
					import java.net.InetAddress;
 | 
				
			||||||
 | 
					import java.net.InetSocketAddress;
 | 
				
			||||||
 | 
					import java.nio.ByteBuffer;
 | 
				
			||||||
 | 
					import java.nio.channels.SelectionKey;
 | 
				
			||||||
 | 
					import java.nio.channels.ServerSocketChannel;
 | 
				
			||||||
 | 
					import java.nio.channels.SocketChannel;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					public class NioServerSocketChannel extends AbstractNioChannel {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						private int port;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						public NioServerSocketChannel(int port, ChannelHandler handler) throws IOException {
 | 
				
			||||||
 | 
							super(handler, ServerSocketChannel.open());
 | 
				
			||||||
 | 
							this.port = port;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						@Override
 | 
				
			||||||
 | 
						public int getInterestedOps() {
 | 
				
			||||||
 | 
							return SelectionKey.OP_ACCEPT;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						@Override
 | 
				
			||||||
 | 
						public ServerSocketChannel getChannel() {
 | 
				
			||||||
 | 
							return (ServerSocketChannel) super.getChannel();
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						
 | 
				
			||||||
 | 
						@Override
 | 
				
			||||||
 | 
						public ByteBuffer read(SelectionKey key) throws IOException {
 | 
				
			||||||
 | 
							SocketChannel socketChannel = (SocketChannel) key.channel();
 | 
				
			||||||
 | 
							ByteBuffer buffer = ByteBuffer.allocate(1024);
 | 
				
			||||||
 | 
							int read = socketChannel.read(buffer);
 | 
				
			||||||
 | 
							if (read == -1) {
 | 
				
			||||||
 | 
								throw new IOException("Socket closed");
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return buffer;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						public void bind() throws IOException {
 | 
				
			||||||
 | 
							((ServerSocketChannel)getChannel()).socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port));
 | 
				
			||||||
 | 
							((ServerSocketChannel)getChannel()).configureBlocking(false);
 | 
				
			||||||
 | 
							System.out.println("Bound TCP socket at port: " + port);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						@Override
 | 
				
			||||||
 | 
						protected void doWrite(ByteBuffer pendingWrite, SelectionKey key) throws IOException {
 | 
				
			||||||
 | 
							System.out.println("Writing on channel");
 | 
				
			||||||
 | 
							((SocketChannel)key.channel()).write(pendingWrite);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -0,0 +1,14 @@
 | 
				
			|||||||
 | 
					package com.iluwatar.reactor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.nio.ByteBuffer;
 | 
				
			||||||
 | 
					import java.nio.channels.SelectionKey;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					public class SameThreadDispatcher implements Dispatcher {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						@Override
 | 
				
			||||||
 | 
						public void onChannelReadEvent(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key) {
 | 
				
			||||||
 | 
							if (channel.getHandler() != null) {
 | 
				
			||||||
 | 
								channel.getHandler().handleChannelRead(channel, readBytes, key);
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -0,0 +1,27 @@
 | 
				
			|||||||
 | 
					package com.iluwatar.reactor;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import java.nio.ByteBuffer;
 | 
				
			||||||
 | 
					import java.nio.channels.SelectionKey;
 | 
				
			||||||
 | 
					import java.util.concurrent.ExecutorService;
 | 
				
			||||||
 | 
					import java.util.concurrent.Executors;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					public class ThreadPoolDispatcher extends SameThreadDispatcher {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						private ExecutorService exectorService;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						public ThreadPoolDispatcher(int poolSize) {
 | 
				
			||||||
 | 
							this.exectorService = Executors.newFixedThreadPool(poolSize);
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						
 | 
				
			||||||
 | 
						@Override
 | 
				
			||||||
 | 
						public void onChannelReadEvent(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key) {
 | 
				
			||||||
 | 
							exectorService.execute(new Runnable() {
 | 
				
			||||||
 | 
								
 | 
				
			||||||
 | 
								@Override
 | 
				
			||||||
 | 
								public void run() {
 | 
				
			||||||
 | 
									ThreadPoolDispatcher.super.onChannelReadEvent(channel, readBytes, key);
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							});
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										4
									
								
								reactor/todo.txt
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										4
									
								
								reactor/todo.txt
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,4 @@
 | 
				
			|||||||
 | 
					* Make UDP channel work (connect is required)
 | 
				
			||||||
 | 
					* Cleanup
 | 
				
			||||||
 | 
					* Document - Javadoc
 | 
				
			||||||
 | 
					* Better design?? Get review of @iluwatar
 | 
				
			||||||
		Reference in New Issue
	
	Block a user