Monitor Object pattern #466

This commit is contained in:
nikhilbarar 2018-06-13 02:43:25 +05:30
parent c48a1e9193
commit 4456a440bc
19 changed files with 654 additions and 0 deletions

24
monitor-object/pom.xml Normal file
View File

@ -0,0 +1,24 @@
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.iluwatar</groupId>
<artifactId>java-design-patterns</artifactId>
<version>1.20.0-SNAPSHOT</version>
</parent>
<artifactId>monitor-object</artifactId>
<name>monitor-object</name>
<dependencies>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,214 @@
package com.iluwatar.monitor;
import java.util.ArrayList;
/**
* A class for Monitors. Monitors provide coordination of concurrent threads.
* Each Monitor protects some resource, usually data. At each point in time a
* monitor object is occupied by at most one thread.
*/
public abstract class AbstractMonitor {
final Semaphore entrance = new Semaphore(1);
volatile Thread occupant = null;
private final ArrayList<MonitorListener> listOfListeners = new ArrayList<>();
private final String name;
public String getName() {
return name;
}
protected AbstractMonitor() {
this(null);
}
protected AbstractMonitor(String name) {
this.name = name;
}
/**
* The invariant. The default implementation always returns true. This method
* should be overridden if at all possible with the strongest economically
* evaluable invariant.
*/
protected boolean invariant() {
return true;
}
/**
* Enter the monitor. Any thread calling this method is delayed until the
* monitor is unoccupied. Upon returning from this method, the monitor is
* considered occupied. A thread must not attempt to enter a Monitor it is
* already in.
*/
protected void enter() {
notifyCallEnter();
entrance.acquire();
// The following assertion should never trip!
Assertion.check(occupant == null, "2 threads in one monitor");
occupant = Thread.currentThread();
notifyReturnFromEnter();
Assertion.check(invariant(), "Invariant of monitor " + getName());
}
/**
* Leave the monitor. After returning from this method, the thread no longer
* occupies the monitor. Only a thread that is in the monitor may leave it.
*
* @throws AssertionError
* if the thread that leaves is not the occupant.
*/
protected void leave() {
notifyLeaveMonitor();
leaveWithoutATrace();
}
/**
* Leave the monitor. After returning from this method, the thread no longer
* occupies the monitor. Only a thread that is in the monitor may leave it.
*
* @throws AssertionError
* if the thread that leaves is not the occupant.
*/
protected <T> T leave(T result) {
leave();
return result;
}
void leaveWithoutATrace() {
Assertion.check(invariant(), "Invariant of monitor " + getName());
Assertion.check(occupant == Thread.currentThread(), "Thread is not occupant");
occupant = null;
entrance.release();
}
/**
* Run the runnable inside the monitor. Any thread calling this method will be
* delayed until the monitor is empty. The "run" method of its argument is then
* executed within the protection of the monitor. When the run method returns,
* if the thread still occupies the monitor, it leaves the monitor.
*
* @param runnable
* A Runnable object.
*/
protected void doWithin(Runnable runnable) {
enter();
try {
runnable.run();
} finally {
if (occupant == Thread.currentThread()) {
leave();
}
}
}
/**
* Run the runnable inside the monitor. Any thread calling this method will be
* delayed until the monitor is empty. The "run" method of its argument is then
* executed within the protection of the monitor. When the run method returns,
* if the thread still occupies the monitor, it leaves the monitor. Thus the
* signalAndLeave method may be called within the run method.
*
* @param runnable
* A RunnableWithResult object.
* @return The value computed by the run method of the runnable.
*/
protected <T> T doWithin(RunnableWithResult<T> runnable) {
enter();
try {
return runnable.run();
} finally {
if (occupant == Thread.currentThread()) {
leave();
}
}
}
/**
* Create a condition queue associated with a checked Assertion. The Assertion
* will be checked prior to an signal of the condition.
*/
protected Condition makeCondition(Assertion prop) {
return makeCondition(null, prop);
}
/**
* Create a condition queue with no associated checked Assertion.
*/
protected Condition makeCondition() {
return makeCondition(null, TrueAssertion.singleton);
}
/**
* Create a condition queue associated with a checked Assertion. The Assertion
* will be checked prior to an signal of the condition.
*/
protected Condition makeCondition(String name, Assertion prop) {
return new Condition(name, this, prop);
}
/**
* Create a condition queue with no associated checked Assertion.
*/
protected Condition makeCondition(String name) {
return makeCondition(name, TrueAssertion.singleton);
}
/** Register a listener. */
public void addListener(MonitorListener newListener) {
listOfListeners.add(newListener);
}
private void notifyCallEnter() {
for (MonitorListener listener : listOfListeners) {
listener.callEnterMonitor(this);
}
}
private void notifyReturnFromEnter() {
for (MonitorListener listener : listOfListeners) {
listener.returnFromEnterMonitor(this);
}
}
private void notifyLeaveMonitor() {
for (MonitorListener listener : listOfListeners) {
listener.leaveMonitor(this);
}
}
void notifyCallAwait(Condition condition) {
for (MonitorListener listener : listOfListeners) {
listener.callAwait(condition, this);
}
}
void notifyReturnFromAwait(Condition condition) {
for (MonitorListener listener : listOfListeners) {
listener.returnFromAwait(condition, this);
}
}
void notifySignallerAwakesAwaitingThread(Condition condition) {
for (MonitorListener listener : listOfListeners) {
listener.signallerAwakesAwaitingThread(condition, this);
}
}
void notifySignallerLeavesTemporarily(Condition condition) {
for (MonitorListener listener : listOfListeners) {
listener.signallerLeavesTemporarily(condition, this);
}
}
void notifySignallerReenters(Condition condition) {
for (MonitorListener listener : listOfListeners) {
listener.signallerReenters(condition, this);
}
}
void notifySignallerLeavesMonitor(Condition condition) {
for (MonitorListener listener : listOfListeners) {
listener.signallerLeavesMonitor(condition, this);
}
}
}

View File

@ -0,0 +1 @@
package com.iluwatar.monitor; /** * Assertions that may be checked from time to time. */ public abstract class Assertion { private static final String DEFAULTMESSAGE = "Assertion Failure"; protected String message = DEFAULTMESSAGE; /** This method says whether the assertion is true. */ public abstract boolean isTrue(); /** Throw an AssertionError if the assertion is not true. */ public void check() { check(isTrue(), message); } /** Throw an AssertionError if the parameter is not true. */ public static void check(boolean b) { check(b, DEFAULTMESSAGE); } /** Throw an AssertionError if the boolean parameter is not true. */ public static void check(boolean b, String message) { if (!b) { throw new AssertionError(message); } } }

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,77 @@
package com.iluwatar.monitor;
/**
* A final class for Monitors.
*/
public final class Monitor extends AbstractMonitor {
private Assertion invariant;
public Monitor() {
this(TrueAssertion.singleton);
}
public Monitor(Assertion invariant) {
this.invariant = invariant;
}
public Monitor(String name) {
this(name, TrueAssertion.singleton);
}
public Monitor(String name, Assertion invariant) {
super(name);
this.invariant = invariant;
}
@Override
public boolean invariant() {
return invariant.isTrue();
}
@Override
public void enter() {
super.enter();
}
@Override
public void leave() {
super.leave();
}
@Override
public <T> T leave(T result) {
return super.leave(result);
}
@Override
public void doWithin(Runnable runnable) {
super.doWithin(runnable);
}
@Override
public <T> T doWithin(RunnableWithResult<T> runnable) {
return super.doWithin(runnable);
}
@Override
public Condition makeCondition() {
return super.makeCondition();
}
@Override
public Condition makeCondition(Assertion assertion) {
return super.makeCondition(assertion);
}
@Override
public Condition makeCondition(String name) {
return super.makeCondition(name);
}
@Override
public Condition makeCondition(String name, Assertion assertion) {
return super.makeCondition(name, assertion);
}
}

View File

@ -0,0 +1,25 @@
package com.iluwatar.monitor;
public interface MonitorListener {
void nameThisThread(String name);
void callEnterMonitor(AbstractMonitor monitor);
void returnFromEnterMonitor(AbstractMonitor monitor);
void leaveMonitor(AbstractMonitor monitor);
void callAwait(Condition condition, AbstractMonitor monitor);
void returnFromAwait(Condition condition, AbstractMonitor monitor);
void signallerAwakesAwaitingThread(Condition condition, AbstractMonitor monitor);
void signallerLeavesTemporarily(Condition condition, AbstractMonitor monitor);
void signallerReenters(Condition condition, AbstractMonitor monitor);
void signallerLeavesMonitor(Condition condition, AbstractMonitor monitor);
}

View File

@ -0,0 +1,5 @@
package com.iluwatar.monitor;
public interface RunnableWithResult<T> {
public T run();
}

View File

@ -0,0 +1 @@
package com.iluwatar.monitor; import java.util.LinkedList; import java.util.ListIterator; /** * A FIFO semaphore. */ public class Semaphore { // Each queue element is a a single use semaphore class QueueElement { final int priority; volatile boolean enabled = false; QueueElement(int priority) { this.priority = priority; } synchronized void acquire() { while (!enabled) { try { wait(); } catch (InterruptedException e) { throw new RuntimeException("Unexpected interruption of " + "thread in monitor.Semaphore.acquire"); } } } synchronized void release() { enabled = true; notify(); } } volatile int s1; final LinkedList<QueueElement> queue = new LinkedList<QueueElement>(); // Invariant. All elements on the queue are in an unenabled state. /** Initialize the semaphore to a value greater or equal to 0. */ public Semaphore(int initialvalue) { Assertion.check(initialvalue >= 0); this.s1 = initialvalue; } /** * The P operation. If two threads are blocked at the same time, they will be * served in FIFO order. <kbd>sem.acquire()</kbd> is equivalent to <kbd>acquire( * Integer.MAX_VALUE )</kbd>. */ public void acquire() { acquire(Integer.MAX_VALUE); } /** * The P operation with a priority. * * @param priority * The larger the integer, the less urgent the priority. If two * thread are waiting with equal priority, they will complete acquire * in FIFO order. */ public void acquire(int priority) { QueueElement mine; synchronized (this) { if (s1 > 0) { --s1; return; } mine = new QueueElement(priority); if (priority == Integer.MAX_VALUE) { queue.add(mine); } else { ListIterator<QueueElement> it = queue.listIterator(0); int i = 0; while (it.hasNext()) { QueueElement elem = it.next(); if (elem.priority > priority) { break; } ++i; } queue.add(i, mine); } } mine.acquire(); } /** The V operation. */ public synchronized void release() { QueueElement first = queue.poll(); if (first != null) { first.release(); } else { ++s1; } } }

View File

@ -0,0 +1 @@
package com.iluwatar.monitor; /** * An assertion that is always true. */ public class TrueAssertion extends Assertion { public boolean isTrue() { return true; } public static final TrueAssertion singleton = new TrueAssertion(); }

View File

@ -0,0 +1,66 @@
package com.iluwatar.monitor.examples;
import com.iluwatar.monitor.AbstractMonitor;
import com.iluwatar.monitor.Condition;
/**
* FIFO Queue implementation using {@link AbstractMonitor}.
*/
public class Queue extends AbstractMonitor {
private final int capacity = 10;
private final Object[] queue = new Object[capacity];
private volatile int count = 0;
private volatile int front = 0;
/** Awaiting ensures: count < capacity. */
private final Condition notFull = makeCondition();
/** Awaiting ensures: count > 0. */
private final Condition notEmpty = makeCondition();
/**
* Method to pop the front element from queue.
* @return the top most element
*/
public Object fetch() {
enter();
if (!(count > 0)) {
notEmpty.await();
assert count > 0;
}
count--;
front = (front + 1) % capacity;
assert count < capacity;
notFull.signal();
leave();
Object value = queue[front];
return value;
}
/**
* Method to push an element in queue.
* @param value the element to be pushed
*/
public void deposit(Object value) {
enter();
if (!(count < capacity)) {
notFull.await();
assert count < capacity;
}
queue[(front + count) % capacity] = value;
count++;
assert count > 0;
notEmpty.signal();
leave();
}
@Override
protected boolean invariant() {
return 0 <= count && count <= capacity && 0 <= front && front < capacity;
}
}

View File

@ -0,0 +1,9 @@
package com.iluwatar.monitor.examples;
/**
* Monitor Example.
*/
public interface VoteInterface {
public boolean castVoteAndWaitForResult(boolean vote);
}

View File

@ -0,0 +1,63 @@
package com.iluwatar.monitor.examples;
import com.iluwatar.monitor.AbstractMonitor;
import com.iluwatar.monitor.Assertion;
import com.iluwatar.monitor.Condition;
import com.iluwatar.monitor.RunnableWithResult;
/**
* Monitor Example.
*/
public class VoteMonitor extends AbstractMonitor implements VoteInterface {
private final int totalVotes;
private int votesFor = 0;
private int votesAgainst = 0;
private Condition electionDone = makeCondition(new Assertion() {
@Override
public boolean isTrue() {
return votesFor + votesAgainst == totalVotes;
}
});
public VoteMonitor(int n) {
assert n > 0;
this.totalVotes = n;
}
@Override
protected boolean invariant() {
return 0 <= votesFor && 0 <= votesAgainst && votesFor + votesAgainst < totalVotes;
}
/**
* Method to cast a vote and wait for the result.
*/
public boolean castVoteAndWaitForResult(final boolean vote) {
return doWithin(new RunnableWithResult<Boolean>() {
public Boolean run() {
if (vote) {
votesFor++;
} else {
votesAgainst++;
}
electionDone.conditionalAwait();
// Assert: votesFor+votesAgainst == N
boolean result = votesFor > votesAgainst;
if (!electionDone.isEmpty()) {
electionDone.signalAndLeave();
} else {
votesFor = votesAgainst = 0;
}
// At this point the thread could be occupying
// the monitor, or not!
return result;
}
});
}
}

View File

@ -0,0 +1,44 @@
package com.iluwatar.monitor;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.junit.jupiter.api.Test;
/**
* Test Cases for Assertion.
*/
public class AssertionTest {
int varX = 0;
int varY = 0;
@Test
public void testAssertionTrue() {
Assertion a = new MyAssertion();
assertEquals(true, a.isTrue());
}
@Test
public void testAssertionFalse() {
Assertion a = new MyAssertion();
a.check();
varX = 1;
assertEquals(false, a.isTrue());
}
@Test
public void testAssertionError() {
assertThrows(NullPointerException.class, () -> {
Assertion a = new MyAssertion();
a.check();
varX = 1;
a.check();
});
}
class MyAssertion extends Assertion {
public boolean isTrue() {
return varX == varY;
}
}
}

View File

@ -0,0 +1,43 @@
package com.iluwatar.monitor;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import com.iluwatar.monitor.examples.VoteInterface;
import com.iluwatar.monitor.examples.VoteMonitor;
/**
* Test Case for Thread Signaling.
*/
public class ThreadSignalTest {
private Voter v0;
private Voter v1;
private Voter v2;
/**
* Setup method for Test Case.
*/
@BeforeAll
public void setUp() {
VoteInterface vm = new VoteMonitor(3);
v0 = new Voter0(vm);
v1 = new Voter1(vm);
v2 = new Voter2(vm);
}
@Test
public void testVotingResult() throws InterruptedException {
v0.start();
v1.start();
v2.start();
v0.join();
v1.join();
v2.join();
assertEquals("Passed", v0.getTestResult());
assertEquals("Passed", v1.getTestResult());
assertEquals("Passed", v2.getTestResult());
}
}

View File

@ -0,0 +1,40 @@
package com.iluwatar.monitor;
import com.iluwatar.monitor.examples.VoteInterface;
abstract class Voter extends Thread {
private VoteInterface vm;
private String testResult;
public String getTestResult() {
return testResult;
}
public void setTestResult(String testResult) {
this.testResult = testResult;
}
Voter(VoteInterface vm) {
this.vm = vm;
}
public void run() {
for (int i = 0; i < 100; ++i) {
boolean vote = makeVote(i);
boolean consensus = vm.castVoteAndWaitForResult(vote);
boolean expected = i % 6 == 1 || i % 6 == 2 || i % 6 == 5;
if (expected != consensus) {
System.out.println("Failed");
setTestResult("Failed");
System.exit(1);
}
setTestResult("Passed");
System.out.println(i + ": " + consensus);
}
System.out.println("Done");
}
abstract boolean makeVote(int i);
}

View File

@ -0,0 +1,13 @@
package com.iluwatar.monitor;
import com.iluwatar.monitor.examples.VoteInterface;
class Voter0 extends Voter {
Voter0(VoteInterface vm) {
super(vm);
}
boolean makeVote(int i) {
return i % 2 == 1;
}
}

View File

@ -0,0 +1,13 @@
package com.iluwatar.monitor;
import com.iluwatar.monitor.examples.VoteInterface;
class Voter1 extends Voter {
Voter1(VoteInterface vm) {
super(vm);
}
boolean makeVote(int i) {
return i % 3 == 2;
}
}

View File

@ -0,0 +1,13 @@
package com.iluwatar.monitor;
import com.iluwatar.monitor.examples.VoteInterface;
class Voter2 extends Voter {
Voter2(VoteInterface vm) {
super(vm);
}
boolean makeVote(int i) {
return i % 3 != 0;
}
}

View File

@ -162,6 +162,7 @@
<module>trampoline</module>
<module>serverless</module>
<module>component-object</module>
<module>monitor-object</module>
</modules>
<repositories>