Merge pull request #617 from mookkiah/issue_587_reader-writer-lock
#587 SonarQube reports bugs reader-writer-lock and refactor
This commit is contained in:
commit
81f4df9a32
@ -23,14 +23,15 @@
|
|||||||
|
|
||||||
package com.iluwatar.reader.writer.lock;
|
package com.iluwatar.reader.writer.lock;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* In a multiple thread applications, the threads may try to synchronize the shared resources
|
* In a multiple thread applications, the threads may try to synchronize the shared resources
|
||||||
@ -63,20 +64,41 @@ public class App {
|
|||||||
ExecutorService executeService = Executors.newFixedThreadPool(10);
|
ExecutorService executeService = Executors.newFixedThreadPool(10);
|
||||||
ReaderWriterLock lock = new ReaderWriterLock();
|
ReaderWriterLock lock = new ReaderWriterLock();
|
||||||
|
|
||||||
// Start 5 readers
|
// Start writers
|
||||||
IntStream.range(0, 5)
|
IntStream.range(0, 5)
|
||||||
.forEach(i -> executeService.submit(new Reader("Reader " + i, lock.readLock())));
|
.forEach(i -> executeService.submit(new Writer("Writer " + i, lock.writeLock(),
|
||||||
|
ThreadLocalRandom.current().nextLong(5000))));
|
||||||
|
LOGGER.info("Writers added...");
|
||||||
|
|
||||||
// Start 5 writers
|
// Start readers
|
||||||
IntStream.range(0, 5)
|
IntStream.range(0, 5)
|
||||||
.forEach(i -> executeService.submit(new Writer("Writer " + i, lock.writeLock())));
|
.forEach(i -> executeService.submit(new Reader("Reader " + i, lock.readLock(),
|
||||||
|
ThreadLocalRandom.current().nextLong(10))));
|
||||||
|
LOGGER.info("Readers added...");
|
||||||
|
|
||||||
|
try {
|
||||||
|
Thread.sleep(5000L);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOGGER.error("Error sleeping before adding more readers", e);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start readers
|
||||||
|
IntStream.range(6, 10)
|
||||||
|
.forEach(i -> executeService.submit(new Reader("Reader " + i, lock.readLock(),
|
||||||
|
ThreadLocalRandom.current().nextLong(10))));
|
||||||
|
LOGGER.info("More readers added...");
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// In the system console, it can see that the read operations are executed concurrently while
|
// In the system console, it can see that the read operations are executed concurrently while
|
||||||
// write operations are exclusive.
|
// write operations are exclusive.
|
||||||
executeService.shutdown();
|
executeService.shutdown();
|
||||||
try {
|
try {
|
||||||
executeService.awaitTermination(5, TimeUnit.SECONDS);
|
executeService.awaitTermination(5, TimeUnit.SECONDS);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOGGER.error("Error waiting for ExecutorService shutdown");
|
LOGGER.error("Error waiting for ExecutorService shutdown", e);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -22,11 +22,11 @@
|
|||||||
*/
|
*/
|
||||||
package com.iluwatar.reader.writer.lock;
|
package com.iluwatar.reader.writer.lock;
|
||||||
|
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.concurrent.locks.Lock;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reader class, read when it acquired the read lock
|
* Reader class, read when it acquired the read lock
|
||||||
*/
|
*/
|
||||||
@ -38,9 +38,29 @@ public class Reader implements Runnable {
|
|||||||
|
|
||||||
private String name;
|
private String name;
|
||||||
|
|
||||||
public Reader(String name, Lock readLock) {
|
private long readingTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create new Reader
|
||||||
|
*
|
||||||
|
* @param name - Name of the thread owning the reader
|
||||||
|
* @param readLock - Lock for this reader
|
||||||
|
* @param readingTime - amount of time (in milliseconds) for this reader to engage reading
|
||||||
|
*/
|
||||||
|
public Reader(String name, Lock readLock, long readingTime) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.readLock = readLock;
|
this.readLock = readLock;
|
||||||
|
this.readingTime = readingTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create new Reader who reads for 250ms
|
||||||
|
*
|
||||||
|
* @param name - Name of the thread owning the reader
|
||||||
|
* @param readLock - Lock for this reader
|
||||||
|
*/
|
||||||
|
public Reader(String name, Lock readLock) {
|
||||||
|
this(name, readLock, 250L);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -49,7 +69,8 @@ public class Reader implements Runnable {
|
|||||||
try {
|
try {
|
||||||
read();
|
read();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
LOGGER.info("InterruptedException when reading", e);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
@ -61,7 +82,7 @@ public class Reader implements Runnable {
|
|||||||
*/
|
*/
|
||||||
public void read() throws InterruptedException {
|
public void read() throws InterruptedException {
|
||||||
LOGGER.info("{} begin", name);
|
LOGGER.info("{} begin", name);
|
||||||
Thread.sleep(250);
|
Thread.sleep(readingTime);
|
||||||
LOGGER.info("{} finish", name);
|
LOGGER.info("{} finish after reading {}ms", name, readingTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -29,14 +29,19 @@ import java.util.concurrent.locks.Condition;
|
|||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class responsible for control the access for reader or writer
|
* Class responsible for control the access for reader or writer
|
||||||
*
|
*
|
||||||
* Allows multiple readers to hold the lock at same time, but if any writer holds the lock then
|
* Allows multiple readers to hold the lock at same time, but if any writer holds the lock then readers wait. If reader
|
||||||
* readers wait. If reader holds the lock then writer waits. This lock is not fair.
|
* holds the lock then writer waits. This lock is not fair.
|
||||||
*/
|
*/
|
||||||
public class ReaderWriterLock implements ReadWriteLock {
|
public class ReaderWriterLock implements ReadWriteLock {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(ReaderWriterLock.class);
|
||||||
|
|
||||||
|
|
||||||
private Object readerMutex = new Object();
|
private Object readerMutex = new Object();
|
||||||
|
|
||||||
@ -45,10 +50,10 @@ public class ReaderWriterLock implements ReadWriteLock {
|
|||||||
/**
|
/**
|
||||||
* Global mutex is used to indicate that whether reader or writer gets the lock in the moment.
|
* Global mutex is used to indicate that whether reader or writer gets the lock in the moment.
|
||||||
* <p>
|
* <p>
|
||||||
* 1. When it contains the reference of {@link readerLock}, it means that the lock is acquired by
|
* 1. When it contains the reference of {@link readerLock}, it means that the lock is acquired by the reader, another
|
||||||
* the reader, another reader can also do the read operation concurrently. <br>
|
* reader can also do the read operation concurrently. <br>
|
||||||
* 2. When it contains the reference of reference of {@link writerLock}, it means that the lock is
|
* 2. When it contains the reference of reference of {@link writerLock}, it means that the lock is acquired by the
|
||||||
* acquired by the writer exclusively, no more reader or writer can get the lock.
|
* writer exclusively, no more reader or writer can get the lock.
|
||||||
* <p>
|
* <p>
|
||||||
* This is the most important field in this class to control the access for reader/writer.
|
* This is the most important field in this class to control the access for reader/writer.
|
||||||
*/
|
*/
|
||||||
@ -74,13 +79,6 @@ public class ReaderWriterLock implements ReadWriteLock {
|
|||||||
return globalMutex.contains(writerLock);
|
return globalMutex.contains(writerLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* return true when globalMutex hold the reference of readerLock
|
|
||||||
*/
|
|
||||||
private boolean doesReaderOwnThisLock() {
|
|
||||||
return globalMutex.contains(readerLock);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Nobody get the lock when globalMutex contains nothing
|
* Nobody get the lock when globalMutex contains nothing
|
||||||
*
|
*
|
||||||
@ -89,14 +87,6 @@ public class ReaderWriterLock implements ReadWriteLock {
|
|||||||
return globalMutex.isEmpty();
|
return globalMutex.isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void waitUninterruptibly(Object o) {
|
|
||||||
try {
|
|
||||||
o.wait();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reader Lock, can be access for more than one reader concurrently if no writer get the lock
|
* Reader Lock, can be access for more than one reader concurrently if no writer get the lock
|
||||||
*/
|
*/
|
||||||
@ -104,29 +94,33 @@ public class ReaderWriterLock implements ReadWriteLock {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void lock() {
|
public void lock() {
|
||||||
|
|
||||||
synchronized (readerMutex) {
|
synchronized (readerMutex) {
|
||||||
|
|
||||||
currentReaderCount++;
|
currentReaderCount++;
|
||||||
if (currentReaderCount == 1) {
|
if (currentReaderCount == 1) {
|
||||||
// Try to get the globalMutex lock for the first reader
|
acquireForReaders();
|
||||||
synchronized (globalMutex) {
|
|
||||||
while (true) {
|
|
||||||
// If the no one get the lock or the lock is locked by reader, just set the reference
|
|
||||||
// to the globalMutex to indicate that the lock is locked by Reader.
|
|
||||||
if (isLockFree() || doesReaderOwnThisLock()) {
|
|
||||||
globalMutex.add(this);
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
// If lock is acquired by the write, let the thread wait until the writer release
|
|
||||||
// the lock
|
|
||||||
waitUninterruptibly(globalMutex);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Acquire the globalMutex lock on behalf of current and future concurrent readers. Make sure no writers currently
|
||||||
|
* owns the lock.
|
||||||
|
*/
|
||||||
|
private void acquireForReaders() {
|
||||||
|
// Try to get the globalMutex lock for the first reader
|
||||||
|
synchronized (globalMutex) {
|
||||||
|
// If the no one get the lock or the lock is locked by reader, just set the reference
|
||||||
|
// to the globalMutex to indicate that the lock is locked by Reader.
|
||||||
|
while (doesWriterOwnThisLock()) {
|
||||||
|
try {
|
||||||
|
globalMutex.wait();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOGGER.info("InterruptedException while waiting for globalMutex in acquireForReaders", e);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
globalMutex.add(this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -179,23 +173,17 @@ public class ReaderWriterLock implements ReadWriteLock {
|
|||||||
|
|
||||||
synchronized (globalMutex) {
|
synchronized (globalMutex) {
|
||||||
|
|
||||||
while (true) {
|
// Wait until the lock is free.
|
||||||
// When there is no one acquired the lock, just put the writeLock reference to the
|
while (!isLockFree()) {
|
||||||
// globalMutex to indicate that the lock is acquired by one writer.
|
try {
|
||||||
// It is ensure that writer can only get the lock when no reader/writer acquired the lock.
|
globalMutex.wait();
|
||||||
if (isLockFree()) {
|
} catch (InterruptedException e) {
|
||||||
|
LOGGER.info("InterruptedException while waiting for globalMutex to begin writing", e);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// When the lock is free, acquire it by placing an entry in globalMutex
|
||||||
globalMutex.add(this);
|
globalMutex.add(this);
|
||||||
break;
|
|
||||||
} else if (doesWriterOwnThisLock()) {
|
|
||||||
// Wait when other writer get the lock
|
|
||||||
waitUninterruptibly(globalMutex);
|
|
||||||
} else if (doesReaderOwnThisLock()) {
|
|
||||||
// Wait when other reader get the lock
|
|
||||||
waitUninterruptibly(globalMutex);
|
|
||||||
} else {
|
|
||||||
throw new AssertionError("it should never reach here");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,11 +22,11 @@
|
|||||||
*/
|
*/
|
||||||
package com.iluwatar.reader.writer.lock;
|
package com.iluwatar.reader.writer.lock;
|
||||||
|
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.concurrent.locks.Lock;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writer class, write when it acquired the write lock
|
* Writer class, write when it acquired the write lock
|
||||||
*/
|
*/
|
||||||
@ -38,9 +38,29 @@ public class Writer implements Runnable {
|
|||||||
|
|
||||||
private String name;
|
private String name;
|
||||||
|
|
||||||
|
private long writingTime;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create new Writer who writes for 250ms
|
||||||
|
*
|
||||||
|
* @param name - Name of the thread owning the writer
|
||||||
|
* @param writeLock - Lock for this writer
|
||||||
|
*/
|
||||||
public Writer(String name, Lock writeLock) {
|
public Writer(String name, Lock writeLock) {
|
||||||
|
this(name, writeLock, 250L);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create new Writer
|
||||||
|
*
|
||||||
|
* @param name - Name of the thread owning the writer
|
||||||
|
* @param writeLock - Lock for this writer
|
||||||
|
* @param writingTime - amount of time (in milliseconds) for this reader to engage writing
|
||||||
|
*/
|
||||||
|
public Writer(String name, Lock writeLock, long writingTime) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.writeLock = writeLock;
|
this.writeLock = writeLock;
|
||||||
|
this.writingTime = writingTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -50,7 +70,8 @@ public class Writer implements Runnable {
|
|||||||
try {
|
try {
|
||||||
write();
|
write();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
LOGGER.info("InterruptedException when writing", e);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
@ -61,7 +82,7 @@ public class Writer implements Runnable {
|
|||||||
*/
|
*/
|
||||||
public void write() throws InterruptedException {
|
public void write() throws InterruptedException {
|
||||||
LOGGER.info("{} begin", name);
|
LOGGER.info("{} begin", name);
|
||||||
Thread.sleep(250);
|
Thread.sleep(writingTime);
|
||||||
LOGGER.info("{} finish", name);
|
LOGGER.info("{} finished after writing {}ms", name, writingTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -52,6 +52,6 @@ public class InMemoryAppender extends AppenderBase<ILoggingEvent> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public boolean logContains(String message) {
|
public boolean logContains(String message) {
|
||||||
return log.stream().anyMatch(event -> event.getFormattedMessage().equals(message));
|
return log.stream().anyMatch(event -> event.getFormattedMessage().contains(message));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user