added Poison Pill idiom

This commit is contained in:
vehpsr
2015-04-05 18:03:16 +03:00
parent 6366041781
commit dc0f578f8b
15 changed files with 398 additions and 1 deletions

View File

@ -0,0 +1,37 @@
package com.iluwatar;
/**
* One of possible approaches to terminate Producer-Consumer pattern is using PoisonPill idiom.
* If you use PoisonPill as termination signal then Producer is responsible to notify Consumer that exchange is over
* and reject any further messages. Consumer receiving PoisonPill will stop to read messages from queue.
* You also must ensure that PoisonPill will be last message that will be read from queue (if you have
* prioritized queue than this can be tricky).
* In simple cases as PoisonPill can be used just null-reference, but holding unique separate shared
* object-marker (with name "Poison" or "PoisonPill") is more clear and self describing.
*/
public class App {
public static void main(String[] args) {
MessageQueue queue = new SimpleMessageQueue(10000);
final Producer producer = new Producer("PRODUCER_1", queue);
final Consumer consumer = new Consumer("CONSUMER_1", queue);
new Thread() {
@Override
public void run() {
consumer.consume();
}
}.start();
new Thread() {
@Override
public void run() {
producer.send("hand shake");
producer.send("some very important information");
producer.send("bye!");
producer.stop();
}
}.start();
}
}

View File

@ -0,0 +1,38 @@
package com.iluwatar;
import com.iluwatar.Message.Headers;
/**
* Class responsible for receiving and handling submitted to the queue messages
*/
public class Consumer {
private final MQSubscribePoint queue;
private final String name;
public Consumer(String name, MQSubscribePoint queue) {
this.name = name;
this.queue = queue;
}
public void consume() {
while (true) {
Message msg;
try {
msg = queue.take();
if (msg == Message.POISON_PILL) {
System.out.println(String.format("Consumer %s receive request to terminate.", name));
break;
}
} catch (InterruptedException e) {
// allow thread to exit
System.err.println(e);
return;
}
String sender = msg.getHeader(Headers.SENDER);
String body = msg.getBody();
System.out.println(String.format("Message [%s] from [%s] received by [%s]", body, sender, name));
}
}
}

View File

@ -0,0 +1,9 @@
package com.iluwatar;
/**
* Endpoint to publish {@link Message} to queue
*/
public interface MQPublishPoint {
public void put(Message msg) throws InterruptedException;
}

View File

@ -0,0 +1,9 @@
package com.iluwatar;
/**
* Endpoint to retrieve {@link Message} from queue
*/
public interface MQSubscribePoint {
public Message take() throws InterruptedException;
}

View File

@ -0,0 +1,52 @@
package com.iluwatar;
import java.util.Map;
/**
* Interface that implements the Message pattern and represents an inbound or outbound message as part of an {@link Producer}-{@link Consumer} exchange.
*/
public interface Message {
public static final Message POISON_PILL = new Message() {
@Override
public void addHeader(Headers header, String value) {
throw poison();
}
@Override
public String getHeader(Headers header) {
throw poison();
}
@Override
public Map<Headers, String> getHeaders() {
throw poison();
}
@Override
public void setBody(String body) {
throw poison();
}
@Override
public String getBody() {
throw poison();
}
private RuntimeException poison() {
return new UnsupportedOperationException("Poison");
}
};
public enum Headers {
DATE, SENDER
}
public void addHeader(Headers header, String value);
public String getHeader(Headers header);
public Map<Headers, String> getHeaders();
public void setBody(String body);
public String getBody();
}

View File

@ -0,0 +1,8 @@
package com.iluwatar;
/**
* Represents abstraction of channel (or pipe) that bounds {@link Producer} and {@link Consumer}
*/
public interface MessageQueue extends MQPublishPoint, MQSubscribePoint {
}

View File

@ -0,0 +1,48 @@
package com.iluwatar;
import java.util.Date;
import com.iluwatar.Message.Headers;
/**
* Class responsible for producing unit of work that can be expressed as message and submitted to queue
*/
public class Producer {
private final MQPublishPoint queue;
private final String name;
private boolean isStopped;
public Producer(String name, MQPublishPoint queue) {
this.name = name;
this.queue = queue;
this.isStopped = false;
}
public void send(String body) {
if (isStopped) {
throw new IllegalStateException(String.format("Producer %s was stopped and fail to deliver requested message [%s].", body, name));
}
Message msg = new SimpleMessage();
msg.addHeader(Headers.DATE, new Date().toString());
msg.addHeader(Headers.SENDER, name);
msg.setBody(body);
try {
queue.put(msg);
} catch (InterruptedException e) {
// allow thread to exit
System.err.println(e);
}
}
public void stop() {
isStopped = true;
try {
queue.put(Message.POISON_PILL);
} catch (InterruptedException e) {
// allow thread to exit
System.err.println(e);
}
}
}

View File

@ -0,0 +1,39 @@
package com.iluwatar;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* {@link Message} basic implementation
*/
public class SimpleMessage implements Message {
private Map<Headers, String> headers = new HashMap<>();
private String body;
@Override
public void addHeader(Headers header, String value) {
headers.put(header, value);
}
@Override
public String getHeader(Headers header) {
return headers.get(header);
}
@Override
public Map<Headers, String> getHeaders() {
return Collections.unmodifiableMap(headers);
}
@Override
public void setBody(String body) {
this.body = body;
}
@Override
public String getBody() {
return body;
}
}

View File

@ -0,0 +1,27 @@
package com.iluwatar;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* Bounded blocking queue wrapper
*/
public class SimpleMessageQueue implements MessageQueue {
private final BlockingQueue<Message> queue;
public SimpleMessageQueue(int bound) {
queue = new ArrayBlockingQueue<Message>(bound);
}
@Override
public void put(Message msg) throws InterruptedException {
queue.put(msg);
}
@Override
public Message take() throws InterruptedException {
return queue.take();
}
}

View File

@ -0,0 +1,12 @@
package com.iluwatar;
import org.junit.Test;
public class AppTest {
@Test
public void test() {
String[] args = {};
App.main(args);
}
}