Merge pull request #281 from ankurkaushal/master
Reformat according to google style guide
This commit is contained in:
@ -1,43 +1,45 @@
|
||||
package com.iluwatar.poison.pill;
|
||||
|
||||
/**
|
||||
* One of the possible approaches to terminate Producer-Consumer pattern is using the Poison Pill idiom.
|
||||
* If you use Poison Pill as the termination signal then Producer is responsible to notify Consumer that
|
||||
* the exchange is over and reject any further messages. The Consumer receiving Poison Pill will stop
|
||||
* reading messages from the queue. You must also ensure that the Poison Pill will be the last message
|
||||
* that will be read from the queue (if you have prioritized queue then this can be tricky).
|
||||
* One of the possible approaches to terminate Producer-Consumer pattern is using the Poison Pill
|
||||
* idiom. If you use Poison Pill as the termination signal then Producer is responsible to notify
|
||||
* Consumer that the exchange is over and reject any further messages. The Consumer receiving Poison
|
||||
* Pill will stop reading messages from the queue. You must also ensure that the Poison Pill will be
|
||||
* the last message that will be read from the queue (if you have prioritized queue then this can be
|
||||
* tricky).
|
||||
* <p>
|
||||
* In simple cases the Poison Pill can be just a null-reference, but holding a unique separate shared
|
||||
* object-marker (with name "Poison" or "Poison Pill") is more clear and self describing.
|
||||
* In simple cases the Poison Pill can be just a null-reference, but holding a unique separate
|
||||
* shared object-marker (with name "Poison" or "Poison Pill") is more clear and self describing.
|
||||
*
|
||||
*/
|
||||
public class App {
|
||||
|
||||
/**
|
||||
* Program entry point
|
||||
* @param args command line args
|
||||
*/
|
||||
public static void main(String[] args) {
|
||||
MessageQueue queue = new SimpleMessageQueue(10000);
|
||||
/**
|
||||
* Program entry point
|
||||
*
|
||||
* @param args command line args
|
||||
*/
|
||||
public static void main(String[] args) {
|
||||
MessageQueue queue = new SimpleMessageQueue(10000);
|
||||
|
||||
final Producer producer = new Producer("PRODUCER_1", queue);
|
||||
final Consumer consumer = new Consumer("CONSUMER_1", queue);
|
||||
final Producer producer = new Producer("PRODUCER_1", queue);
|
||||
final Consumer consumer = new Consumer("CONSUMER_1", queue);
|
||||
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
consumer.consume();
|
||||
}
|
||||
}.start();
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
consumer.consume();
|
||||
}
|
||||
}.start();
|
||||
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
producer.send("hand shake");
|
||||
producer.send("some very important information");
|
||||
producer.send("bye!");
|
||||
producer.stop();
|
||||
}
|
||||
}.start();
|
||||
}
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
producer.send("hand shake");
|
||||
producer.send("some very important information");
|
||||
producer.send("bye!");
|
||||
producer.stop();
|
||||
}
|
||||
}.start();
|
||||
}
|
||||
}
|
||||
|
@ -7,32 +7,33 @@ import com.iluwatar.poison.pill.Message.Headers;
|
||||
*/
|
||||
public class Consumer {
|
||||
|
||||
private final MQSubscribePoint queue;
|
||||
private final String name;
|
||||
private final MQSubscribePoint queue;
|
||||
private final String name;
|
||||
|
||||
public Consumer(String name, MQSubscribePoint queue) {
|
||||
this.name = name;
|
||||
this.queue = queue;
|
||||
}
|
||||
public Consumer(String name, MQSubscribePoint queue) {
|
||||
this.name = name;
|
||||
this.queue = queue;
|
||||
}
|
||||
|
||||
public void consume() {
|
||||
while (true) {
|
||||
Message msg;
|
||||
try {
|
||||
msg = queue.take();
|
||||
if (msg == Message.POISON_PILL) {
|
||||
System.out.println(String.format("Consumer %s receive request to terminate.", name));
|
||||
break;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// allow thread to exit
|
||||
System.err.println(e);
|
||||
return;
|
||||
}
|
||||
public void consume() {
|
||||
while (true) {
|
||||
Message msg;
|
||||
try {
|
||||
msg = queue.take();
|
||||
if (msg == Message.POISON_PILL) {
|
||||
System.out.println(String.format("Consumer %s receive request to terminate.", name));
|
||||
break;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// allow thread to exit
|
||||
System.err.println(e);
|
||||
return;
|
||||
}
|
||||
|
||||
String sender = msg.getHeader(Headers.SENDER);
|
||||
String body = msg.getBody();
|
||||
System.out.println(String.format("Message [%s] from [%s] received by [%s]", body, sender, name));
|
||||
}
|
||||
}
|
||||
String sender = msg.getHeader(Headers.SENDER);
|
||||
String body = msg.getBody();
|
||||
System.out.println(String.format("Message [%s] from [%s] received by [%s]", body, sender,
|
||||
name));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5,5 +5,5 @@ package com.iluwatar.poison.pill;
|
||||
*/
|
||||
public interface MQPublishPoint {
|
||||
|
||||
public void put(Message msg) throws InterruptedException;
|
||||
public void put(Message msg) throws InterruptedException;
|
||||
}
|
||||
|
@ -5,5 +5,5 @@ package com.iluwatar.poison.pill;
|
||||
*/
|
||||
public interface MQSubscribePoint {
|
||||
|
||||
public Message take() throws InterruptedException;
|
||||
public Message take() throws InterruptedException;
|
||||
}
|
||||
|
@ -3,51 +3,55 @@ package com.iluwatar.poison.pill;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Interface that implements the Message pattern and represents an inbound or outbound
|
||||
* message as part of an {@link Producer}-{@link Consumer} exchange.
|
||||
* Interface that implements the Message pattern and represents an inbound or outbound message as
|
||||
* part of an {@link Producer}-{@link Consumer} exchange.
|
||||
*/
|
||||
public interface Message {
|
||||
|
||||
public static final Message POISON_PILL = new Message() {
|
||||
public static final Message POISON_PILL = new Message() {
|
||||
|
||||
@Override
|
||||
public void addHeader(Headers header, String value) {
|
||||
throw poison();
|
||||
}
|
||||
@Override
|
||||
public void addHeader(Headers header, String value) {
|
||||
throw poison();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHeader(Headers header) {
|
||||
throw poison();
|
||||
}
|
||||
@Override
|
||||
public String getHeader(Headers header) {
|
||||
throw poison();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Headers, String> getHeaders() {
|
||||
throw poison();
|
||||
}
|
||||
@Override
|
||||
public Map<Headers, String> getHeaders() {
|
||||
throw poison();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBody(String body) {
|
||||
throw poison();
|
||||
}
|
||||
@Override
|
||||
public void setBody(String body) {
|
||||
throw poison();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getBody() {
|
||||
throw poison();
|
||||
}
|
||||
@Override
|
||||
public String getBody() {
|
||||
throw poison();
|
||||
}
|
||||
|
||||
private RuntimeException poison() {
|
||||
return new UnsupportedOperationException("Poison");
|
||||
}
|
||||
private RuntimeException poison() {
|
||||
return new UnsupportedOperationException("Poison");
|
||||
}
|
||||
|
||||
};
|
||||
};
|
||||
|
||||
public enum Headers {
|
||||
DATE, SENDER
|
||||
}
|
||||
public enum Headers {
|
||||
DATE, SENDER
|
||||
}
|
||||
|
||||
public void addHeader(Headers header, String value);
|
||||
public String getHeader(Headers header);
|
||||
public Map<Headers, String> getHeaders();
|
||||
public void setBody(String body);
|
||||
public String getBody();
|
||||
public void addHeader(Headers header, String value);
|
||||
|
||||
public String getHeader(Headers header);
|
||||
|
||||
public Map<Headers, String> getHeaders();
|
||||
|
||||
public void setBody(String body);
|
||||
|
||||
public String getBody();
|
||||
}
|
||||
|
@ -5,44 +5,46 @@ import java.util.Date;
|
||||
import com.iluwatar.poison.pill.Message.Headers;
|
||||
|
||||
/**
|
||||
* Class responsible for producing unit of work that can be expressed as message and submitted to queue
|
||||
* Class responsible for producing unit of work that can be expressed as message and submitted to
|
||||
* queue
|
||||
*/
|
||||
public class Producer {
|
||||
|
||||
private final MQPublishPoint queue;
|
||||
private final String name;
|
||||
private boolean isStopped;
|
||||
private final MQPublishPoint queue;
|
||||
private final String name;
|
||||
private boolean isStopped;
|
||||
|
||||
public Producer(String name, MQPublishPoint queue) {
|
||||
this.name = name;
|
||||
this.queue = queue;
|
||||
this.isStopped = false;
|
||||
}
|
||||
public Producer(String name, MQPublishPoint queue) {
|
||||
this.name = name;
|
||||
this.queue = queue;
|
||||
this.isStopped = false;
|
||||
}
|
||||
|
||||
public void send(String body) {
|
||||
if (isStopped) {
|
||||
throw new IllegalStateException(String.format("Producer %s was stopped and fail to deliver requested message [%s].", body, name));
|
||||
}
|
||||
Message msg = new SimpleMessage();
|
||||
msg.addHeader(Headers.DATE, new Date().toString());
|
||||
msg.addHeader(Headers.SENDER, name);
|
||||
msg.setBody(body);
|
||||
public void send(String body) {
|
||||
if (isStopped) {
|
||||
throw new IllegalStateException(String.format(
|
||||
"Producer %s was stopped and fail to deliver requested message [%s].", body, name));
|
||||
}
|
||||
Message msg = new SimpleMessage();
|
||||
msg.addHeader(Headers.DATE, new Date().toString());
|
||||
msg.addHeader(Headers.SENDER, name);
|
||||
msg.setBody(body);
|
||||
|
||||
try {
|
||||
queue.put(msg);
|
||||
} catch (InterruptedException e) {
|
||||
// allow thread to exit
|
||||
System.err.println(e);
|
||||
}
|
||||
}
|
||||
try {
|
||||
queue.put(msg);
|
||||
} catch (InterruptedException e) {
|
||||
// allow thread to exit
|
||||
System.err.println(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
isStopped = true;
|
||||
try {
|
||||
queue.put(Message.POISON_PILL);
|
||||
} catch (InterruptedException e) {
|
||||
// allow thread to exit
|
||||
System.err.println(e);
|
||||
}
|
||||
}
|
||||
public void stop() {
|
||||
isStopped = true;
|
||||
try {
|
||||
queue.put(Message.POISON_PILL);
|
||||
} catch (InterruptedException e) {
|
||||
// allow thread to exit
|
||||
System.err.println(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -9,31 +9,31 @@ import java.util.Map;
|
||||
*/
|
||||
public class SimpleMessage implements Message {
|
||||
|
||||
private Map<Headers, String> headers = new HashMap<>();
|
||||
private String body;
|
||||
private Map<Headers, String> headers = new HashMap<>();
|
||||
private String body;
|
||||
|
||||
@Override
|
||||
public void addHeader(Headers header, String value) {
|
||||
headers.put(header, value);
|
||||
}
|
||||
@Override
|
||||
public void addHeader(Headers header, String value) {
|
||||
headers.put(header, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHeader(Headers header) {
|
||||
return headers.get(header);
|
||||
}
|
||||
@Override
|
||||
public String getHeader(Headers header) {
|
||||
return headers.get(header);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Headers, String> getHeaders() {
|
||||
return Collections.unmodifiableMap(headers);
|
||||
}
|
||||
@Override
|
||||
public Map<Headers, String> getHeaders() {
|
||||
return Collections.unmodifiableMap(headers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBody(String body) {
|
||||
this.body = body;
|
||||
}
|
||||
@Override
|
||||
public void setBody(String body) {
|
||||
this.body = body;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getBody() {
|
||||
return body;
|
||||
}
|
||||
@Override
|
||||
public String getBody() {
|
||||
return body;
|
||||
}
|
||||
}
|
||||
|
@ -8,20 +8,19 @@ import java.util.concurrent.BlockingQueue;
|
||||
*/
|
||||
public class SimpleMessageQueue implements MessageQueue {
|
||||
|
||||
private final BlockingQueue<Message> queue;
|
||||
private final BlockingQueue<Message> queue;
|
||||
|
||||
public SimpleMessageQueue(int bound) {
|
||||
queue = new ArrayBlockingQueue<Message>(bound);
|
||||
}
|
||||
public SimpleMessageQueue(int bound) {
|
||||
queue = new ArrayBlockingQueue<Message>(bound);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(Message msg) throws InterruptedException {
|
||||
queue.put(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message take() throws InterruptedException {
|
||||
return queue.take();
|
||||
}
|
||||
@Override
|
||||
public void put(Message msg) throws InterruptedException {
|
||||
queue.put(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message take() throws InterruptedException {
|
||||
return queue.take();
|
||||
}
|
||||
}
|
||||
|
@ -11,9 +11,9 @@ import com.iluwatar.poison.pill.App;
|
||||
*/
|
||||
public class AppTest {
|
||||
|
||||
@Test
|
||||
public void test() {
|
||||
String[] args = {};
|
||||
App.main(args);
|
||||
}
|
||||
@Test
|
||||
public void test() {
|
||||
String[] args = {};
|
||||
App.main(args);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user