diff --git a/reactor/src/main/java/com/iluwatar/reactor/AbstractNioChannel.java b/reactor/src/main/java/com/iluwatar/reactor/AbstractNioChannel.java index 9f6040ade..f55cea073 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/AbstractNioChannel.java +++ b/reactor/src/main/java/com/iluwatar/reactor/AbstractNioChannel.java @@ -1,7 +1,6 @@ package com.iluwatar.reactor; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.util.Map; @@ -13,7 +12,7 @@ public abstract class AbstractNioChannel { private SelectableChannel channel; private ChannelHandler handler; - private Map> channelToPendingWrites = new ConcurrentHashMap<>(); + private Map> channelToPendingWrites = new ConcurrentHashMap<>(); private NioReactor reactor; public AbstractNioChannel(ChannelHandler handler, SelectableChannel channel) { @@ -31,7 +30,7 @@ public abstract class AbstractNioChannel { public abstract int getInterestedOps(); - public abstract ByteBuffer read(SelectionKey key) throws IOException; + public abstract Object read(SelectionKey key) throws IOException; public void setHandler(ChannelHandler handler) { this.handler = handler; @@ -43,9 +42,9 @@ public abstract class AbstractNioChannel { // Called from the context of reactor thread public void write(SelectionKey key) throws IOException { - Queue pendingWrites = channelToPendingWrites.get(key.channel()); + Queue pendingWrites = channelToPendingWrites.get(key.channel()); while (true) { - ByteBuffer pendingWrite = pendingWrites.poll(); + Object pendingWrite = pendingWrites.poll(); if (pendingWrite == null) { System.out.println("No more pending writes"); reactor.changeOps(key, SelectionKey.OP_READ); @@ -56,10 +55,10 @@ public abstract class AbstractNioChannel { } } - protected abstract void doWrite(ByteBuffer pendingWrite, SelectionKey key) throws IOException; + protected abstract void doWrite(Object pendingWrite, SelectionKey key) throws IOException; - public void write(ByteBuffer buffer, SelectionKey key) { - Queue pendingWrites = this.channelToPendingWrites.get(key.channel()); + public void write(Object data, SelectionKey key) { + Queue pendingWrites = this.channelToPendingWrites.get(key.channel()); if (pendingWrites == null) { synchronized (this.channelToPendingWrites) { pendingWrites = this.channelToPendingWrites.get(key.channel()); @@ -69,7 +68,7 @@ public abstract class AbstractNioChannel { } } } - pendingWrites.add(buffer); + pendingWrites.add(data); reactor.changeOps(key, SelectionKey.OP_WRITE); } } diff --git a/reactor/src/main/java/com/iluwatar/reactor/AppClient.java b/reactor/src/main/java/com/iluwatar/reactor/AppClient.java index 3d7323a55..188b64ea8 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/AppClient.java +++ b/reactor/src/main/java/com/iluwatar/reactor/AppClient.java @@ -14,8 +14,8 @@ import java.net.SocketException; public class AppClient { public static void main(String[] args) { -// new Thread(new LoggingClient("Client 1", 6666)).start(); -// new Thread(new LoggingClient("Client 2", 6667)).start(); + new Thread(new LoggingClient("Client 1", 6666)).start(); + new Thread(new LoggingClient("Client 2", 6667)).start(); new Thread(new UDPLoggingClient(6668)).start(); } diff --git a/reactor/src/main/java/com/iluwatar/reactor/ChannelHandler.java b/reactor/src/main/java/com/iluwatar/reactor/ChannelHandler.java index 055e8edd6..e84c506f9 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/ChannelHandler.java +++ b/reactor/src/main/java/com/iluwatar/reactor/ChannelHandler.java @@ -1,9 +1,8 @@ package com.iluwatar.reactor; -import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; public interface ChannelHandler { - void handleChannelRead(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key); + void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key); } diff --git a/reactor/src/main/java/com/iluwatar/reactor/Dispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/Dispatcher.java index 1bc14c55f..15fe7774c 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/Dispatcher.java +++ b/reactor/src/main/java/com/iluwatar/reactor/Dispatcher.java @@ -1,8 +1,7 @@ package com.iluwatar.reactor; -import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; public interface Dispatcher { - void onChannelReadEvent(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key); + void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key); } diff --git a/reactor/src/main/java/com/iluwatar/reactor/LoggingHandler.java b/reactor/src/main/java/com/iluwatar/reactor/LoggingHandler.java index 3744c3d5a..fc7efaeed 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/LoggingHandler.java +++ b/reactor/src/main/java/com/iluwatar/reactor/LoggingHandler.java @@ -3,16 +3,31 @@ package com.iluwatar.reactor; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; +import com.iluwatar.reactor.NioDatagramChannel.DatagramPacket; + public class LoggingHandler implements ChannelHandler { @Override - public void handleChannelRead(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key) { - byte[] data = readBytes.array(); - doLogging(data); - sendEchoReply(channel, data, key); + public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) { + if (readObject instanceof ByteBuffer) { + byte[] data = ((ByteBuffer)readObject).array(); + doLogging(data); + sendReply(channel, data, key); + } else if (readObject instanceof DatagramPacket) { + DatagramPacket datagram = (DatagramPacket)readObject; + byte[] data = datagram.getData().array(); + doLogging(data); + sendReply(channel, datagram, key); + } } - private void sendEchoReply(AbstractNioChannel channel, byte[] data, SelectionKey key) { + private void sendReply(AbstractNioChannel channel, DatagramPacket datagram, SelectionKey key) { + DatagramPacket replyPacket = new DatagramPacket(ByteBuffer.wrap("Data logged successfully".getBytes())); + replyPacket.setReceiver(datagram.getSender()); + channel.write(replyPacket, key); + } + + private void sendReply(AbstractNioChannel channel, byte[] data, SelectionKey key) { ByteBuffer buffer = ByteBuffer.wrap("Data logged successfully".getBytes()); channel.write(buffer, key); } diff --git a/reactor/src/main/java/com/iluwatar/reactor/NioDatagramChannel.java b/reactor/src/main/java/com/iluwatar/reactor/NioDatagramChannel.java index 2f655f192..4d1690792 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/NioDatagramChannel.java +++ b/reactor/src/main/java/com/iluwatar/reactor/NioDatagramChannel.java @@ -3,6 +3,7 @@ package com.iluwatar.reactor; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.nio.channels.SelectionKey; @@ -22,10 +23,12 @@ public class NioDatagramChannel extends AbstractNioChannel { } @Override - public ByteBuffer read(SelectionKey key) throws IOException { + public Object read(SelectionKey key) throws IOException { ByteBuffer buffer = ByteBuffer.allocate(1024); - getChannel().receive(buffer); - return buffer; + SocketAddress sender = getChannel().receive(buffer); + DatagramPacket packet = new DatagramPacket(buffer); + packet.setSender(sender); + return packet; } @Override @@ -40,8 +43,38 @@ public class NioDatagramChannel extends AbstractNioChannel { } @Override - protected void doWrite(ByteBuffer pendingWrite, SelectionKey key) throws IOException { - pendingWrite.flip(); - getChannel().write(pendingWrite); + protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException { + DatagramPacket pendingPacket = (DatagramPacket) pendingWrite; + getChannel().send(pendingPacket.getData(), pendingPacket.getReceiver()); + } + + static class DatagramPacket { + private SocketAddress sender; + private ByteBuffer data; + private SocketAddress receiver; + + public DatagramPacket(ByteBuffer data) { + this.data = data; + } + + public SocketAddress getSender() { + return sender; + } + + public void setSender(SocketAddress sender) { + this.sender = sender; + } + + public SocketAddress getReceiver() { + return receiver; + } + + public void setReceiver(SocketAddress receiver) { + this.receiver = receiver; + } + + public ByteBuffer getData() { + return data; + } } } \ No newline at end of file diff --git a/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java b/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java index 05aa609d1..f10ea4b82 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java +++ b/reactor/src/main/java/com/iluwatar/reactor/NioReactor.java @@ -1,7 +1,6 @@ package com.iluwatar.reactor; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; @@ -75,7 +74,6 @@ public class NioReactor { Iterator iterator = pendingChanges.iterator(); while (iterator.hasNext()) { Command command = iterator.next(); - System.out.println("Processing pending change: " + command); command.execute(); iterator.remove(); } @@ -85,10 +83,8 @@ public class NioReactor { if (key.isAcceptable()) { acceptConnection(key); } else if (key.isReadable()) { - System.out.println("Key is readable"); read(key); } else if (key.isWritable()) { - System.out.println("Key is writable"); write(key); } } @@ -99,10 +95,10 @@ public class NioReactor { } private void read(SelectionKey key) { - ByteBuffer readBytes; + Object readObject; try { - readBytes = ((AbstractNioChannel)key.attachment()).read(key); - dispatchReadEvent(key, readBytes); + readObject = ((AbstractNioChannel)key.attachment()).read(key); + dispatchReadEvent(key, readObject); } catch (IOException e) { try { key.channel().close(); @@ -112,8 +108,8 @@ public class NioReactor { } } - private void dispatchReadEvent(SelectionKey key, ByteBuffer readBytes) { - dispatcher.onChannelReadEvent((AbstractNioChannel)key.attachment(), readBytes, key); + private void dispatchReadEvent(SelectionKey key, Object readObject) { + dispatcher.onChannelReadEvent((AbstractNioChannel)key.attachment(), readObject, key); } private void acceptConnection(SelectionKey key) throws IOException { diff --git a/reactor/src/main/java/com/iluwatar/reactor/NioServerSocketChannel.java b/reactor/src/main/java/com/iluwatar/reactor/NioServerSocketChannel.java index 66affdb8d..ebd8f0ef3 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/NioServerSocketChannel.java +++ b/reactor/src/main/java/com/iluwatar/reactor/NioServerSocketChannel.java @@ -45,8 +45,9 @@ public class NioServerSocketChannel extends AbstractNioChannel { } @Override - protected void doWrite(ByteBuffer pendingWrite, SelectionKey key) throws IOException { + protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException { + ByteBuffer pendingBuffer = (ByteBuffer) pendingWrite; System.out.println("Writing on channel"); - ((SocketChannel)key.channel()).write(pendingWrite); + ((SocketChannel)key.channel()).write(pendingBuffer); } } diff --git a/reactor/src/main/java/com/iluwatar/reactor/SameThreadDispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/SameThreadDispatcher.java index 9b8029de4..024441b7c 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/SameThreadDispatcher.java +++ b/reactor/src/main/java/com/iluwatar/reactor/SameThreadDispatcher.java @@ -1,14 +1,13 @@ package com.iluwatar.reactor; -import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; public class SameThreadDispatcher implements Dispatcher { @Override - public void onChannelReadEvent(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key) { + public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) { if (channel.getHandler() != null) { - channel.getHandler().handleChannelRead(channel, readBytes, key); + channel.getHandler().handleChannelRead(channel, readObject, key); } } } diff --git a/reactor/src/main/java/com/iluwatar/reactor/ThreadPoolDispatcher.java b/reactor/src/main/java/com/iluwatar/reactor/ThreadPoolDispatcher.java index 2f44e4372..e9e4ac34c 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/ThreadPoolDispatcher.java +++ b/reactor/src/main/java/com/iluwatar/reactor/ThreadPoolDispatcher.java @@ -1,6 +1,5 @@ package com.iluwatar.reactor; -import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -14,12 +13,12 @@ public class ThreadPoolDispatcher extends SameThreadDispatcher { } @Override - public void onChannelReadEvent(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key) { + public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) { exectorService.execute(new Runnable() { @Override public void run() { - ThreadPoolDispatcher.super.onChannelReadEvent(channel, readBytes, key); + ThreadPoolDispatcher.super.onChannelReadEvent(channel, readObject, key); } }); }