Work on #74, server mode works with both UDP and TCP channels

This commit is contained in:
Narendra Pathai 2015-09-02 12:28:52 +05:30
parent ec8203a196
commit b94c1d37d2
10 changed files with 83 additions and 43 deletions

View File

@ -1,7 +1,6 @@
package com.iluwatar.reactor; package com.iluwatar.reactor;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel; import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.util.Map; import java.util.Map;
@ -13,7 +12,7 @@ public abstract class AbstractNioChannel {
private SelectableChannel channel; private SelectableChannel channel;
private ChannelHandler handler; private ChannelHandler handler;
private Map<SelectableChannel, Queue<ByteBuffer>> channelToPendingWrites = new ConcurrentHashMap<>(); private Map<SelectableChannel, Queue<Object>> channelToPendingWrites = new ConcurrentHashMap<>();
private NioReactor reactor; private NioReactor reactor;
public AbstractNioChannel(ChannelHandler handler, SelectableChannel channel) { public AbstractNioChannel(ChannelHandler handler, SelectableChannel channel) {
@ -31,7 +30,7 @@ public abstract class AbstractNioChannel {
public abstract int getInterestedOps(); public abstract int getInterestedOps();
public abstract ByteBuffer read(SelectionKey key) throws IOException; public abstract Object read(SelectionKey key) throws IOException;
public void setHandler(ChannelHandler handler) { public void setHandler(ChannelHandler handler) {
this.handler = handler; this.handler = handler;
@ -43,9 +42,9 @@ public abstract class AbstractNioChannel {
// Called from the context of reactor thread // Called from the context of reactor thread
public void write(SelectionKey key) throws IOException { public void write(SelectionKey key) throws IOException {
Queue<ByteBuffer> pendingWrites = channelToPendingWrites.get(key.channel()); Queue<Object> pendingWrites = channelToPendingWrites.get(key.channel());
while (true) { while (true) {
ByteBuffer pendingWrite = pendingWrites.poll(); Object pendingWrite = pendingWrites.poll();
if (pendingWrite == null) { if (pendingWrite == null) {
System.out.println("No more pending writes"); System.out.println("No more pending writes");
reactor.changeOps(key, SelectionKey.OP_READ); reactor.changeOps(key, SelectionKey.OP_READ);
@ -56,10 +55,10 @@ public abstract class AbstractNioChannel {
} }
} }
protected abstract void doWrite(ByteBuffer pendingWrite, SelectionKey key) throws IOException; protected abstract void doWrite(Object pendingWrite, SelectionKey key) throws IOException;
public void write(ByteBuffer buffer, SelectionKey key) { public void write(Object data, SelectionKey key) {
Queue<ByteBuffer> pendingWrites = this.channelToPendingWrites.get(key.channel()); Queue<Object> pendingWrites = this.channelToPendingWrites.get(key.channel());
if (pendingWrites == null) { if (pendingWrites == null) {
synchronized (this.channelToPendingWrites) { synchronized (this.channelToPendingWrites) {
pendingWrites = this.channelToPendingWrites.get(key.channel()); pendingWrites = this.channelToPendingWrites.get(key.channel());
@ -69,7 +68,7 @@ public abstract class AbstractNioChannel {
} }
} }
} }
pendingWrites.add(buffer); pendingWrites.add(data);
reactor.changeOps(key, SelectionKey.OP_WRITE); reactor.changeOps(key, SelectionKey.OP_WRITE);
} }
} }

View File

@ -14,8 +14,8 @@ import java.net.SocketException;
public class AppClient { public class AppClient {
public static void main(String[] args) { public static void main(String[] args) {
// new Thread(new LoggingClient("Client 1", 6666)).start(); new Thread(new LoggingClient("Client 1", 6666)).start();
// new Thread(new LoggingClient("Client 2", 6667)).start(); new Thread(new LoggingClient("Client 2", 6667)).start();
new Thread(new UDPLoggingClient(6668)).start(); new Thread(new UDPLoggingClient(6668)).start();
} }

View File

@ -1,9 +1,8 @@
package com.iluwatar.reactor; package com.iluwatar.reactor;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
public interface ChannelHandler { public interface ChannelHandler {
void handleChannelRead(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key); void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key);
} }

View File

@ -1,8 +1,7 @@
package com.iluwatar.reactor; package com.iluwatar.reactor;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
public interface Dispatcher { public interface Dispatcher {
void onChannelReadEvent(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key); void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key);
} }

View File

@ -3,16 +3,31 @@ package com.iluwatar.reactor;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import com.iluwatar.reactor.NioDatagramChannel.DatagramPacket;
public class LoggingHandler implements ChannelHandler { public class LoggingHandler implements ChannelHandler {
@Override @Override
public void handleChannelRead(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key) { public void handleChannelRead(AbstractNioChannel channel, Object readObject, SelectionKey key) {
byte[] data = readBytes.array(); if (readObject instanceof ByteBuffer) {
doLogging(data); byte[] data = ((ByteBuffer)readObject).array();
sendEchoReply(channel, data, key); doLogging(data);
sendReply(channel, data, key);
} else if (readObject instanceof DatagramPacket) {
DatagramPacket datagram = (DatagramPacket)readObject;
byte[] data = datagram.getData().array();
doLogging(data);
sendReply(channel, datagram, key);
}
} }
private void sendEchoReply(AbstractNioChannel channel, byte[] data, SelectionKey key) { private void sendReply(AbstractNioChannel channel, DatagramPacket datagram, SelectionKey key) {
DatagramPacket replyPacket = new DatagramPacket(ByteBuffer.wrap("Data logged successfully".getBytes()));
replyPacket.setReceiver(datagram.getSender());
channel.write(replyPacket, key);
}
private void sendReply(AbstractNioChannel channel, byte[] data, SelectionKey key) {
ByteBuffer buffer = ByteBuffer.wrap("Data logged successfully".getBytes()); ByteBuffer buffer = ByteBuffer.wrap("Data logged successfully".getBytes());
channel.write(buffer, key); channel.write(buffer, key);
} }

View File

@ -3,6 +3,7 @@ package com.iluwatar.reactor;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel; import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
@ -22,10 +23,12 @@ public class NioDatagramChannel extends AbstractNioChannel {
} }
@Override @Override
public ByteBuffer read(SelectionKey key) throws IOException { public Object read(SelectionKey key) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024); ByteBuffer buffer = ByteBuffer.allocate(1024);
getChannel().receive(buffer); SocketAddress sender = getChannel().receive(buffer);
return buffer; DatagramPacket packet = new DatagramPacket(buffer);
packet.setSender(sender);
return packet;
} }
@Override @Override
@ -40,8 +43,38 @@ public class NioDatagramChannel extends AbstractNioChannel {
} }
@Override @Override
protected void doWrite(ByteBuffer pendingWrite, SelectionKey key) throws IOException { protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException {
pendingWrite.flip(); DatagramPacket pendingPacket = (DatagramPacket) pendingWrite;
getChannel().write(pendingWrite); getChannel().send(pendingPacket.getData(), pendingPacket.getReceiver());
}
static class DatagramPacket {
private SocketAddress sender;
private ByteBuffer data;
private SocketAddress receiver;
public DatagramPacket(ByteBuffer data) {
this.data = data;
}
public SocketAddress getSender() {
return sender;
}
public void setSender(SocketAddress sender) {
this.sender = sender;
}
public SocketAddress getReceiver() {
return receiver;
}
public void setReceiver(SocketAddress receiver) {
this.receiver = receiver;
}
public ByteBuffer getData() {
return data;
}
} }
} }

View File

@ -1,7 +1,6 @@
package com.iluwatar.reactor; package com.iluwatar.reactor;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
@ -75,7 +74,6 @@ public class NioReactor {
Iterator<Command> iterator = pendingChanges.iterator(); Iterator<Command> iterator = pendingChanges.iterator();
while (iterator.hasNext()) { while (iterator.hasNext()) {
Command command = iterator.next(); Command command = iterator.next();
System.out.println("Processing pending change: " + command);
command.execute(); command.execute();
iterator.remove(); iterator.remove();
} }
@ -85,10 +83,8 @@ public class NioReactor {
if (key.isAcceptable()) { if (key.isAcceptable()) {
acceptConnection(key); acceptConnection(key);
} else if (key.isReadable()) { } else if (key.isReadable()) {
System.out.println("Key is readable");
read(key); read(key);
} else if (key.isWritable()) { } else if (key.isWritable()) {
System.out.println("Key is writable");
write(key); write(key);
} }
} }
@ -99,10 +95,10 @@ public class NioReactor {
} }
private void read(SelectionKey key) { private void read(SelectionKey key) {
ByteBuffer readBytes; Object readObject;
try { try {
readBytes = ((AbstractNioChannel)key.attachment()).read(key); readObject = ((AbstractNioChannel)key.attachment()).read(key);
dispatchReadEvent(key, readBytes); dispatchReadEvent(key, readObject);
} catch (IOException e) { } catch (IOException e) {
try { try {
key.channel().close(); key.channel().close();
@ -112,8 +108,8 @@ public class NioReactor {
} }
} }
private void dispatchReadEvent(SelectionKey key, ByteBuffer readBytes) { private void dispatchReadEvent(SelectionKey key, Object readObject) {
dispatcher.onChannelReadEvent((AbstractNioChannel)key.attachment(), readBytes, key); dispatcher.onChannelReadEvent((AbstractNioChannel)key.attachment(), readObject, key);
} }
private void acceptConnection(SelectionKey key) throws IOException { private void acceptConnection(SelectionKey key) throws IOException {

View File

@ -45,8 +45,9 @@ public class NioServerSocketChannel extends AbstractNioChannel {
} }
@Override @Override
protected void doWrite(ByteBuffer pendingWrite, SelectionKey key) throws IOException { protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException {
ByteBuffer pendingBuffer = (ByteBuffer) pendingWrite;
System.out.println("Writing on channel"); System.out.println("Writing on channel");
((SocketChannel)key.channel()).write(pendingWrite); ((SocketChannel)key.channel()).write(pendingBuffer);
} }
} }

View File

@ -1,14 +1,13 @@
package com.iluwatar.reactor; package com.iluwatar.reactor;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
public class SameThreadDispatcher implements Dispatcher { public class SameThreadDispatcher implements Dispatcher {
@Override @Override
public void onChannelReadEvent(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key) { public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) {
if (channel.getHandler() != null) { if (channel.getHandler() != null) {
channel.getHandler().handleChannelRead(channel, readBytes, key); channel.getHandler().handleChannelRead(channel, readObject, key);
} }
} }
} }

View File

@ -1,6 +1,5 @@
package com.iluwatar.reactor; package com.iluwatar.reactor;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -14,12 +13,12 @@ public class ThreadPoolDispatcher extends SameThreadDispatcher {
} }
@Override @Override
public void onChannelReadEvent(AbstractNioChannel channel, ByteBuffer readBytes, SelectionKey key) { public void onChannelReadEvent(AbstractNioChannel channel, Object readObject, SelectionKey key) {
exectorService.execute(new Runnable() { exectorService.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
ThreadPoolDispatcher.super.onChannelReadEvent(channel, readBytes, key); ThreadPoolDispatcher.super.onChannelReadEvent(channel, readObject, key);
} }
}); });
} }