Java 11 migrate remaining q-r (#1121)
* Moves queue-load-leveling to Java 11 * Moves reactor to Java 11 * Moves reader-writer-lock to Java 11 * Moves repository to Java 11 * Moves resource-acquisition-is-initialization to Java 11 * Moves retry to Java 11 * Moves role-object to Java 11
This commit is contained in:
committed by
Ilkka Seppälä
parent
cd2a2e7711
commit
20ea465b7f
@ -124,15 +124,17 @@ public class App {
|
||||
* This represents application specific business logic that dispatcher will call on appropriate
|
||||
* events. These events are read events in our example.
|
||||
*/
|
||||
LoggingHandler loggingHandler = new LoggingHandler();
|
||||
var loggingHandler = new LoggingHandler();
|
||||
|
||||
/*
|
||||
* Our application binds to multiple channels and uses same logging handler to handle incoming
|
||||
* log requests.
|
||||
*/
|
||||
reactor.registerChannel(tcpChannel(6666, loggingHandler))
|
||||
reactor
|
||||
.registerChannel(tcpChannel(6666, loggingHandler))
|
||||
.registerChannel(tcpChannel(6667, loggingHandler))
|
||||
.registerChannel(udpChannel(6668, loggingHandler)).start();
|
||||
.registerChannel(udpChannel(6668, loggingHandler))
|
||||
.start();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -144,20 +146,20 @@ public class App {
|
||||
public void stop() throws InterruptedException, IOException {
|
||||
reactor.stop();
|
||||
dispatcher.stop();
|
||||
for (AbstractNioChannel channel : channels) {
|
||||
for (var channel : channels) {
|
||||
channel.getJavaChannel().close();
|
||||
}
|
||||
}
|
||||
|
||||
private AbstractNioChannel tcpChannel(int port, ChannelHandler handler) throws IOException {
|
||||
NioServerSocketChannel channel = new NioServerSocketChannel(port, handler);
|
||||
var channel = new NioServerSocketChannel(port, handler);
|
||||
channel.bind();
|
||||
channels.add(channel);
|
||||
return channel;
|
||||
}
|
||||
|
||||
private AbstractNioChannel udpChannel(int port, ChannelHandler handler) throws IOException {
|
||||
NioDatagramChannel channel = new NioDatagramChannel(port, handler);
|
||||
var channel = new NioDatagramChannel(port, handler);
|
||||
channel.bind();
|
||||
channels.add(channel);
|
||||
return channel;
|
||||
|
@ -25,7 +25,6 @@ package com.iluwatar.reactor.app;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.io.PrintWriter;
|
||||
import java.net.DatagramPacket;
|
||||
import java.net.DatagramSocket;
|
||||
@ -55,7 +54,7 @@ public class AppClient {
|
||||
* @throws IOException if any I/O error occurs.
|
||||
*/
|
||||
public static void main(String[] args) throws IOException {
|
||||
AppClient appClient = new AppClient();
|
||||
var appClient = new AppClient();
|
||||
appClient.start();
|
||||
}
|
||||
|
||||
@ -118,8 +117,8 @@ public class AppClient {
|
||||
@Override
|
||||
public void run() {
|
||||
try (Socket socket = new Socket(InetAddress.getLocalHost(), serverPort)) {
|
||||
OutputStream outputStream = socket.getOutputStream();
|
||||
PrintWriter writer = new PrintWriter(outputStream);
|
||||
var outputStream = socket.getOutputStream();
|
||||
var writer = new PrintWriter(outputStream);
|
||||
sendLogRequests(writer, socket.getInputStream());
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("error sending requests", e);
|
||||
@ -128,12 +127,12 @@ public class AppClient {
|
||||
}
|
||||
|
||||
private void sendLogRequests(PrintWriter writer, InputStream inputStream) throws IOException {
|
||||
for (int i = 0; i < 4; i++) {
|
||||
for (var i = 0; i < 4; i++) {
|
||||
writer.println(clientName + " - Log request: " + i);
|
||||
writer.flush();
|
||||
|
||||
byte[] data = new byte[1024];
|
||||
int read = inputStream.read(data, 0, data.length);
|
||||
var data = new byte[1024];
|
||||
var read = inputStream.read(data, 0, data.length);
|
||||
if (read == 0) {
|
||||
LOGGER.info("Read zero bytes");
|
||||
} else {
|
||||
@ -167,17 +166,17 @@ public class AppClient {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
for (int i = 0; i < 4; i++) {
|
||||
try (var socket = new DatagramSocket()) {
|
||||
for (var i = 0; i < 4; i++) {
|
||||
|
||||
String message = clientName + " - Log request: " + i;
|
||||
DatagramPacket request =
|
||||
new DatagramPacket(message.getBytes(), message.getBytes().length, remoteAddress);
|
||||
var message = clientName + " - Log request: " + i;
|
||||
var bytes = message.getBytes();
|
||||
var request = new DatagramPacket(bytes, bytes.length, remoteAddress);
|
||||
|
||||
socket.send(request);
|
||||
|
||||
byte[] data = new byte[1024];
|
||||
DatagramPacket reply = new DatagramPacket(data, data.length);
|
||||
var data = new byte[1024];
|
||||
var reply = new DatagramPacket(data, data.length);
|
||||
socket.receive(reply);
|
||||
if (reply.getLength() == 0) {
|
||||
LOGGER.info("Read zero bytes");
|
||||
|
@ -54,7 +54,7 @@ public class LoggingHandler implements ChannelHandler {
|
||||
doLogging((ByteBuffer) readObject);
|
||||
sendReply(channel, key);
|
||||
} else if (readObject instanceof DatagramPacket) {
|
||||
DatagramPacket datagram = (DatagramPacket) readObject;
|
||||
var datagram = (DatagramPacket) readObject;
|
||||
doLogging(datagram.getData());
|
||||
sendReply(channel, datagram, key);
|
||||
} else {
|
||||
@ -71,14 +71,14 @@ public class LoggingHandler implements ChannelHandler {
|
||||
* Create a reply acknowledgement datagram packet setting the receiver to the sender of incoming
|
||||
* message.
|
||||
*/
|
||||
DatagramPacket replyPacket = new DatagramPacket(ByteBuffer.wrap(ACK));
|
||||
var replyPacket = new DatagramPacket(ByteBuffer.wrap(ACK));
|
||||
replyPacket.setReceiver(incomingPacket.getSender());
|
||||
|
||||
channel.write(replyPacket, key);
|
||||
}
|
||||
|
||||
private static void sendReply(AbstractNioChannel channel, SelectionKey key) {
|
||||
ByteBuffer buffer = ByteBuffer.wrap(ACK);
|
||||
var buffer = ByteBuffer.wrap(ACK);
|
||||
channel.write(buffer, key);
|
||||
}
|
||||
|
||||
|
@ -46,8 +46,7 @@ public abstract class AbstractNioChannel {
|
||||
|
||||
private final SelectableChannel channel;
|
||||
private final ChannelHandler handler;
|
||||
private final Map<SelectableChannel, Queue<Object>> channelToPendingWrites =
|
||||
new ConcurrentHashMap<>();
|
||||
private final Map<SelectableChannel, Queue<Object>> channelToPendingWrites;
|
||||
private NioReactor reactor;
|
||||
|
||||
/**
|
||||
@ -59,6 +58,7 @@ public abstract class AbstractNioChannel {
|
||||
public AbstractNioChannel(ChannelHandler handler, SelectableChannel channel) {
|
||||
this.handler = handler;
|
||||
this.channel = channel;
|
||||
this.channelToPendingWrites = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -117,18 +117,14 @@ public abstract class AbstractNioChannel {
|
||||
* whole pending block of data at once.
|
||||
*/
|
||||
void flush(SelectionKey key) throws IOException {
|
||||
Queue<Object> pendingWrites = channelToPendingWrites.get(key.channel());
|
||||
while (true) {
|
||||
Object pendingWrite = pendingWrites.poll();
|
||||
if (pendingWrite == null) {
|
||||
// We don't have anything more to write so channel is interested in reading more data
|
||||
reactor.changeOps(key, SelectionKey.OP_READ);
|
||||
break;
|
||||
}
|
||||
|
||||
var pendingWrites = channelToPendingWrites.get(key.channel());
|
||||
Object pendingWrite;
|
||||
while ((pendingWrite = pendingWrites.poll()) != null) {
|
||||
// ask the concrete channel to make sense of data and write it to java channel
|
||||
doWrite(pendingWrite, key);
|
||||
}
|
||||
// We don't have anything more to write so channel is interested in reading more data
|
||||
reactor.changeOps(key, SelectionKey.OP_READ);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -162,7 +158,7 @@ public abstract class AbstractNioChannel {
|
||||
* @param key the key which is writable.
|
||||
*/
|
||||
public void write(Object data, SelectionKey key) {
|
||||
Queue<Object> pendingWrites = this.channelToPendingWrites.get(key.channel());
|
||||
var pendingWrites = this.channelToPendingWrites.get(key.channel());
|
||||
if (pendingWrites == null) {
|
||||
synchronized (this.channelToPendingWrites) {
|
||||
pendingWrites = this.channelToPendingWrites.get(key.channel());
|
||||
|
@ -73,15 +73,15 @@ public class NioDatagramChannel extends AbstractNioChannel {
|
||||
*/
|
||||
@Override
|
||||
public DatagramPacket read(SelectionKey key) throws IOException {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||||
SocketAddress sender = ((DatagramChannel) key.channel()).receive(buffer);
|
||||
var buffer = ByteBuffer.allocate(1024);
|
||||
var sender = ((DatagramChannel) key.channel()).receive(buffer);
|
||||
|
||||
/*
|
||||
* It is required to create a DatagramPacket because we need to preserve which socket address
|
||||
* acts as destination for sending reply packets.
|
||||
*/
|
||||
buffer.flip();
|
||||
DatagramPacket packet = new DatagramPacket(buffer);
|
||||
var packet = new DatagramPacket(buffer);
|
||||
packet.setSender(sender);
|
||||
|
||||
return packet;
|
||||
@ -115,7 +115,7 @@ public class NioDatagramChannel extends AbstractNioChannel {
|
||||
*/
|
||||
@Override
|
||||
protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException {
|
||||
DatagramPacket pendingPacket = (DatagramPacket) pendingWrite;
|
||||
var pendingPacket = (DatagramPacket) pendingWrite;
|
||||
getJavaChannel().send(pendingPacket.getData(), pendingPacket.getReceiver());
|
||||
}
|
||||
|
||||
|
@ -27,10 +27,7 @@ import java.io.IOException;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.ServerSocketChannel;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.Iterator;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
@ -119,20 +116,15 @@ public class NioReactor {
|
||||
* @throws IOException if any I/O error occurs.
|
||||
*/
|
||||
public NioReactor registerChannel(AbstractNioChannel channel) throws IOException {
|
||||
SelectionKey key = channel.getJavaChannel().register(selector, channel.getInterestedOps());
|
||||
var key = channel.getJavaChannel().register(selector, channel.getInterestedOps());
|
||||
key.attach(channel);
|
||||
channel.setReactor(this);
|
||||
return this;
|
||||
}
|
||||
|
||||
private void eventLoop() throws IOException {
|
||||
while (true) {
|
||||
|
||||
// honor interrupt request
|
||||
if (Thread.interrupted()) {
|
||||
break;
|
||||
}
|
||||
|
||||
// honor interrupt request
|
||||
while (!Thread.interrupted()) {
|
||||
// honor any pending commands first
|
||||
processPendingCommands();
|
||||
|
||||
@ -145,12 +137,11 @@ public class NioReactor {
|
||||
/*
|
||||
* Represents the events that have occurred on registered handles.
|
||||
*/
|
||||
Set<SelectionKey> keys = selector.selectedKeys();
|
||||
|
||||
Iterator<SelectionKey> iterator = keys.iterator();
|
||||
var keys = selector.selectedKeys();
|
||||
var iterator = keys.iterator();
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
SelectionKey key = iterator.next();
|
||||
var key = iterator.next();
|
||||
if (!key.isValid()) {
|
||||
iterator.remove();
|
||||
continue;
|
||||
@ -162,9 +153,9 @@ public class NioReactor {
|
||||
}
|
||||
|
||||
private void processPendingCommands() {
|
||||
Iterator<Runnable> iterator = pendingCommands.iterator();
|
||||
var iterator = pendingCommands.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Runnable command = iterator.next();
|
||||
var command = iterator.next();
|
||||
command.run();
|
||||
iterator.remove();
|
||||
}
|
||||
@ -185,15 +176,14 @@ public class NioReactor {
|
||||
}
|
||||
|
||||
private static void onChannelWritable(SelectionKey key) throws IOException {
|
||||
AbstractNioChannel channel = (AbstractNioChannel) key.attachment();
|
||||
var channel = (AbstractNioChannel) key.attachment();
|
||||
channel.flush(key);
|
||||
}
|
||||
|
||||
private void onChannelReadable(SelectionKey key) {
|
||||
try {
|
||||
// reads the incoming data in context of reactor main loop. Can this be improved?
|
||||
Object readObject = ((AbstractNioChannel) key.attachment()).read(key);
|
||||
|
||||
var readObject = ((AbstractNioChannel) key.attachment()).read(key);
|
||||
dispatchReadEvent(key, readObject);
|
||||
} catch (IOException e) {
|
||||
try {
|
||||
@ -212,10 +202,10 @@ public class NioReactor {
|
||||
}
|
||||
|
||||
private void onChannelAcceptable(SelectionKey key) throws IOException {
|
||||
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
|
||||
SocketChannel socketChannel = serverSocketChannel.accept();
|
||||
var serverSocketChannel = (ServerSocketChannel) key.channel();
|
||||
var socketChannel = serverSocketChannel.accept();
|
||||
socketChannel.configureBlocking(false);
|
||||
SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ);
|
||||
var readKey = socketChannel.register(selector, SelectionKey.OP_READ);
|
||||
readKey.attach(key.attachment());
|
||||
}
|
||||
|
||||
|
@ -83,9 +83,9 @@ public class NioServerSocketChannel extends AbstractNioChannel {
|
||||
*/
|
||||
@Override
|
||||
public ByteBuffer read(SelectionKey key) throws IOException {
|
||||
SocketChannel socketChannel = (SocketChannel) key.channel();
|
||||
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||||
int read = socketChannel.read(buffer);
|
||||
var socketChannel = (SocketChannel) key.channel();
|
||||
var buffer = ByteBuffer.allocate(1024);
|
||||
var read = socketChannel.read(buffer);
|
||||
buffer.flip();
|
||||
if (read == -1) {
|
||||
throw new IOException("Socket closed");
|
||||
@ -100,9 +100,9 @@ public class NioServerSocketChannel extends AbstractNioChannel {
|
||||
*/
|
||||
@Override
|
||||
public void bind() throws IOException {
|
||||
getJavaChannel().socket().bind(
|
||||
new InetSocketAddress(InetAddress.getLocalHost(), port));
|
||||
getJavaChannel().configureBlocking(false);
|
||||
var javaChannel = getJavaChannel();
|
||||
javaChannel.socket().bind(new InetSocketAddress(InetAddress.getLocalHost(), port));
|
||||
javaChannel.configureBlocking(false);
|
||||
LOGGER.info("Bound TCP socket at port: {}", port);
|
||||
}
|
||||
|
||||
@ -112,7 +112,7 @@ public class NioServerSocketChannel extends AbstractNioChannel {
|
||||
*/
|
||||
@Override
|
||||
protected void doWrite(Object pendingWrite, SelectionKey key) throws IOException {
|
||||
ByteBuffer pendingBuffer = (ByteBuffer) pendingWrite;
|
||||
var pendingBuffer = (ByteBuffer) pendingWrite;
|
||||
((SocketChannel) key.channel()).write(pendingBuffer);
|
||||
}
|
||||
}
|
||||
|
@ -25,14 +25,12 @@ package com.iluwatar.reactor.app;
|
||||
|
||||
import com.iluwatar.reactor.framework.SameThreadDispatcher;
|
||||
import com.iluwatar.reactor.framework.ThreadPoolDispatcher;
|
||||
import java.io.IOException;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*
|
||||
* This class tests the Distributed Logging service by starting a Reactor and then sending it
|
||||
* concurrent logging requests using multiple clients.
|
||||
*/
|
||||
@ -42,17 +40,17 @@ public class ReactorTest {
|
||||
|
||||
/**
|
||||
* Test the application using pooled thread dispatcher.
|
||||
*
|
||||
* @throws IOException if any I/O error occurs.
|
||||
*
|
||||
* @throws IOException if any I/O error occurs.
|
||||
* @throws InterruptedException if interrupted while stopping the application.
|
||||
*/
|
||||
@Test
|
||||
public void testAppUsingThreadPoolDispatcher() throws IOException, InterruptedException {
|
||||
LOGGER.info("testAppUsingThreadPoolDispatcher start");
|
||||
App app = new App(new ThreadPoolDispatcher(2));
|
||||
var app = new App(new ThreadPoolDispatcher(2));
|
||||
app.start();
|
||||
|
||||
AppClient client = new AppClient();
|
||||
var client = new AppClient();
|
||||
client.start();
|
||||
|
||||
// allow clients to send requests. Artificial delay.
|
||||
@ -70,17 +68,17 @@ public class ReactorTest {
|
||||
|
||||
/**
|
||||
* Test the application using same thread dispatcher.
|
||||
*
|
||||
* @throws IOException if any I/O error occurs.
|
||||
*
|
||||
* @throws IOException if any I/O error occurs.
|
||||
* @throws InterruptedException if interrupted while stopping the application.
|
||||
*/
|
||||
@Test
|
||||
public void testAppUsingSameThreadDispatcher() throws IOException, InterruptedException {
|
||||
LOGGER.info("testAppUsingSameThreadDispatcher start");
|
||||
App app = new App(new SameThreadDispatcher());
|
||||
var app = new App(new SameThreadDispatcher());
|
||||
app.start();
|
||||
|
||||
AppClient client = new AppClient();
|
||||
var client = new AppClient();
|
||||
client.start();
|
||||
|
||||
// allow clients to send requests. Artificial delay.
|
||||
|
Reference in New Issue
Block a user