diff --git a/reactor/etc/reactor.png b/reactor/etc/reactor.png index 0b00ec98b..abe705682 100644 Binary files a/reactor/etc/reactor.png and b/reactor/etc/reactor.png differ diff --git a/reactor/etc/reactor.ucls b/reactor/etc/reactor.ucls index d072e4029..90e28cdd7 100644 --- a/reactor/etc/reactor.ucls +++ b/reactor/etc/reactor.ucls @@ -124,80 +124,108 @@ - - - - - - - - + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - - + + + + + + - - - - - + + + + + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/reactor/src/main/java/com/iluwatar/reactor/app/App.java b/reactor/src/main/java/com/iluwatar/reactor/app/App.java index 5c6d91ee8..7bb01ddc8 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/app/App.java +++ b/reactor/src/main/java/com/iluwatar/reactor/app/App.java @@ -66,6 +66,15 @@ public class App { private NioReactor reactor; private List channels = new ArrayList<>(); + private Dispatcher dispatcher; + + /** + * Creates an instance of App which will use provided dispatcher for dispatching events on reactor. + * @param dispatcher the dispatcher that will be used to dispatch events. + */ + public App(Dispatcher dispatcher) { + this.dispatcher = dispatcher; + } /** * App entry. @@ -73,16 +82,15 @@ public class App { * @throws IOException */ public static void main(String[] args) throws IOException { - new App().start(new ThreadPoolDispatcher(2)); + new App(new ThreadPoolDispatcher(2)).start(); } /** * Starts the NIO reactor. - * @param threadPoolDispatcher - * + * * @throws IOException if any channel fails to bind. */ - public void start(Dispatcher dispatcher) throws IOException { + public void start() throws IOException { /* * The application can customize its event dispatching mechanism. */ @@ -110,8 +118,9 @@ public class App { */ public void stop() throws InterruptedException, IOException { reactor.stop(); + dispatcher.stop(); for (AbstractNioChannel channel : channels) { - channel.getChannel().close(); + channel.getJavaChannel().close(); } } diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java b/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java index 4abacd86f..9f2f8a95c 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/AbstractNioChannel.java @@ -48,7 +48,7 @@ public abstract class AbstractNioChannel { /** * @return the wrapped NIO channel. */ - public SelectableChannel getChannel() { + public SelectableChannel getJavaChannel() { return channel; } diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java b/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java index b55480ffc..a2ff3d3d8 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/NioDatagramChannel.java @@ -64,8 +64,8 @@ public class NioDatagramChannel extends AbstractNioChannel { * @return the underlying datagram channel. */ @Override - public DatagramChannel getChannel() { - return (DatagramChannel) super.getChannel(); + public DatagramChannel getJavaChannel() { + return (DatagramChannel) super.getJavaChannel(); } /** @@ -75,8 +75,8 @@ public class NioDatagramChannel extends AbstractNioChannel { */ @Override public void bind() throws IOException { - getChannel().socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port)); - getChannel().configureBlocking(false); + getJavaChannel().socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port)); + getJavaChannel().configureBlocking(false); System.out.println("Bound UDP socket at port: " + port); } @@ -87,7 +87,7 @@ public class NioDatagramChannel extends AbstractNioChannel { @Override protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException { DatagramPacket pendingPacket = (DatagramPacket) pendingWrite; - getChannel().send(pendingPacket.getData(), pendingPacket.getReceiver()); + getJavaChannel().send(pendingPacket.getData(), pendingPacket.getReceiver()); } /** diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java b/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java index b818612e5..16c13e5f9 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/NioReactor.java @@ -74,12 +74,13 @@ public class NioReactor { * Stops the reactor and related resources such as dispatcher. * * @throws InterruptedException if interrupted while stopping the reactor. + * @throws IOException if any I/O error occurs. */ - public void stop() throws InterruptedException { + public void stop() throws InterruptedException, IOException { reactorMain.shutdownNow(); selector.wakeup(); reactorMain.awaitTermination(4, TimeUnit.SECONDS); - dispatcher.stop(); + selector.close(); } /** @@ -94,7 +95,7 @@ public class NioReactor { * @throws IOException if any I/O error occurs. */ public NioReactor registerChannel(AbstractNioChannel channel) throws IOException { - SelectionKey key = channel.getChannel().register(selector, channel.getInterestedOps()); + SelectionKey key = channel.getJavaChannel().register(selector, channel.getInterestedOps()); key.attach(channel); channel.setReactor(this); return this; diff --git a/reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java b/reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java index c635a6076..c5caaa7ff 100644 --- a/reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java +++ b/reactor/src/main/java/com/iluwatar/reactor/framework/NioServerSocketChannel.java @@ -43,8 +43,8 @@ public class NioServerSocketChannel extends AbstractNioChannel { * @return the underlying {@link ServerSocketChannel}. */ @Override - public ServerSocketChannel getChannel() { - return (ServerSocketChannel) super.getChannel(); + public ServerSocketChannel getJavaChannel() { + return (ServerSocketChannel) super.getJavaChannel(); } /** @@ -71,8 +71,8 @@ public class NioServerSocketChannel extends AbstractNioChannel { */ @Override public void bind() throws IOException { - ((ServerSocketChannel) getChannel()).socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port)); - ((ServerSocketChannel) getChannel()).configureBlocking(false); + ((ServerSocketChannel) getJavaChannel()).socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port)); + ((ServerSocketChannel) getJavaChannel()).configureBlocking(false); System.out.println("Bound TCP socket at port: " + port); } diff --git a/reactor/src/test/java/com/iluwatar/reactor/app/AppTest.java b/reactor/src/test/java/com/iluwatar/reactor/app/AppTest.java index 752192ef3..9abb4e690 100644 --- a/reactor/src/test/java/com/iluwatar/reactor/app/AppTest.java +++ b/reactor/src/test/java/com/iluwatar/reactor/app/AppTest.java @@ -22,8 +22,8 @@ public class AppTest { */ @Test public void testAppUsingThreadPoolDispatcher() throws IOException, InterruptedException { - App app = new App(); - app.start(new ThreadPoolDispatcher(2)); + App app = new App(new ThreadPoolDispatcher(2)); + app.start(); AppClient client = new AppClient(); client.start(); @@ -48,8 +48,8 @@ public class AppTest { */ @Test public void testAppUsingSameThreadDispatcher() throws IOException, InterruptedException { - App app = new App(); - app.start(new SameThreadDispatcher()); + App app = new App(new SameThreadDispatcher()); + app.start(); AppClient client = new AppClient(); client.start();