2015-08-13 23:54:40 +02:00
|
|
|
---
|
|
|
|
layout: pattern
|
|
|
|
title: Poison Pill
|
|
|
|
folder: poison-pill
|
2015-08-15 18:03:05 +02:00
|
|
|
permalink: /patterns/poison-pill/
|
2019-12-13 21:09:28 +02:00
|
|
|
categories: Behavioral
|
2021-05-19 10:49:05 -06:00
|
|
|
language: en
|
2015-12-28 15:52:44 +02:00
|
|
|
tags:
|
2019-12-13 21:09:28 +02:00
|
|
|
- Cloud distributed
|
2016-07-21 09:27:48 +03:00
|
|
|
- Reactive
|
2015-08-13 23:54:40 +02:00
|
|
|
---
|
|
|
|
|
2016-01-03 21:14:30 +01:00
|
|
|
## Intent
|
2020-08-30 19:34:10 +03:00
|
|
|
|
|
|
|
Poison Pill is known predefined data item that allows to provide graceful shutdown for separate
|
|
|
|
distributed consumption process.
|
2020-07-19 20:23:12 +03:00
|
|
|
|
|
|
|
## Explanation
|
|
|
|
|
|
|
|
Real world example
|
|
|
|
|
2020-08-30 19:34:10 +03:00
|
|
|
> Let's think about a message queue with one producer and one consumer. The producer keeps pushing
|
|
|
|
> new messages in the queue and the consumer keeps reading them. Finally when it's time to
|
|
|
|
> gracefully shut down the producer sends the poison pill message.
|
2020-07-19 20:23:12 +03:00
|
|
|
|
|
|
|
In plain words
|
|
|
|
|
|
|
|
> Poison Pill is a known message structure that ends the message exchange.
|
|
|
|
|
|
|
|
**Programmatic Example**
|
|
|
|
|
2020-08-30 19:34:10 +03:00
|
|
|
Let's define the message structure first. There's interface `Message` and implementation
|
|
|
|
`SimpleMessage`.
|
2020-07-19 20:23:12 +03:00
|
|
|
|
|
|
|
```java
|
|
|
|
public interface Message {
|
|
|
|
|
2020-08-30 19:34:10 +03:00
|
|
|
...
|
2020-07-19 20:23:12 +03:00
|
|
|
|
|
|
|
enum Headers {
|
|
|
|
DATE, SENDER
|
|
|
|
}
|
|
|
|
|
|
|
|
void addHeader(Headers header, String value);
|
|
|
|
|
|
|
|
String getHeader(Headers header);
|
|
|
|
|
|
|
|
Map<Headers, String> getHeaders();
|
|
|
|
|
|
|
|
void setBody(String body);
|
|
|
|
|
|
|
|
String getBody();
|
|
|
|
}
|
|
|
|
|
|
|
|
public class SimpleMessage implements Message {
|
|
|
|
|
2020-07-30 20:28:47 +03:00
|
|
|
private final Map<Headers, String> headers = new HashMap<>();
|
2020-07-19 20:23:12 +03:00
|
|
|
private String body;
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void addHeader(Headers header, String value) {
|
|
|
|
headers.put(header, value);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public String getHeader(Headers header) {
|
|
|
|
return headers.get(header);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Map<Headers, String> getHeaders() {
|
|
|
|
return Collections.unmodifiableMap(headers);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void setBody(String body) {
|
|
|
|
this.body = body;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public String getBody() {
|
|
|
|
return body;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
```
|
|
|
|
|
2020-08-30 19:34:10 +03:00
|
|
|
To pass messages we are using message queues. Here we define the types related to the message queue:
|
|
|
|
`MqPublishPoint`, `MqSubscribePoint` and `MessageQueue`. `SimpleMessageQueue` implements all these
|
|
|
|
interfaces.
|
2020-07-19 20:23:12 +03:00
|
|
|
|
|
|
|
```java
|
|
|
|
public interface MqPublishPoint {
|
|
|
|
|
|
|
|
void put(Message msg) throws InterruptedException;
|
|
|
|
}
|
|
|
|
|
|
|
|
public interface MqSubscribePoint {
|
|
|
|
|
|
|
|
Message take() throws InterruptedException;
|
|
|
|
}
|
|
|
|
|
|
|
|
public interface MessageQueue extends MqPublishPoint, MqSubscribePoint {
|
|
|
|
}
|
|
|
|
|
|
|
|
public class SimpleMessageQueue implements MessageQueue {
|
|
|
|
|
|
|
|
private final BlockingQueue<Message> queue;
|
|
|
|
|
|
|
|
public SimpleMessageQueue(int bound) {
|
|
|
|
queue = new ArrayBlockingQueue<>(bound);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void put(Message msg) throws InterruptedException {
|
|
|
|
queue.put(msg);
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Message take() throws InterruptedException {
|
|
|
|
return queue.take();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
```
|
|
|
|
|
2020-08-30 19:34:10 +03:00
|
|
|
Next we need message `Producer` and `Consumer`. Internally they use the message queues from above.
|
|
|
|
It's important to notice that when `Producer` stops, it sends out the poison pill to inform
|
|
|
|
`Consumer` that the messaging has finished.
|
2020-07-19 20:23:12 +03:00
|
|
|
|
|
|
|
```java
|
|
|
|
public class Producer {
|
2020-08-30 19:34:10 +03:00
|
|
|
|
|
|
|
...
|
2020-07-19 20:23:12 +03:00
|
|
|
|
|
|
|
public void send(String body) {
|
|
|
|
if (isStopped) {
|
|
|
|
throw new IllegalStateException(String.format(
|
|
|
|
"Producer %s was stopped and fail to deliver requested message [%s].", body, name));
|
|
|
|
}
|
|
|
|
var msg = new SimpleMessage();
|
|
|
|
msg.addHeader(Headers.DATE, new Date().toString());
|
|
|
|
msg.addHeader(Headers.SENDER, name);
|
|
|
|
msg.setBody(body);
|
|
|
|
|
|
|
|
try {
|
|
|
|
queue.put(msg);
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
// allow thread to exit
|
|
|
|
LOGGER.error("Exception caught.", e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
public void stop() {
|
|
|
|
isStopped = true;
|
|
|
|
try {
|
|
|
|
queue.put(Message.POISON_PILL);
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
// allow thread to exit
|
|
|
|
LOGGER.error("Exception caught.", e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
public class Consumer {
|
|
|
|
|
2020-08-30 19:34:10 +03:00
|
|
|
...
|
2020-07-19 20:23:12 +03:00
|
|
|
|
|
|
|
public void consume() {
|
|
|
|
while (true) {
|
|
|
|
try {
|
|
|
|
var msg = queue.take();
|
|
|
|
if (Message.POISON_PILL.equals(msg)) {
|
|
|
|
LOGGER.info("Consumer {} receive request to terminate.", name);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
var sender = msg.getHeader(Headers.SENDER);
|
|
|
|
var body = msg.getBody();
|
|
|
|
LOGGER.info("Message [{}] from [{}] received by [{}]", body, sender, name);
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
// allow thread to exit
|
|
|
|
LOGGER.error("Exception caught.", e);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
```
|
|
|
|
|
|
|
|
Finally we are ready to present the whole example in action.
|
|
|
|
|
|
|
|
```java
|
|
|
|
var queue = new SimpleMessageQueue(10000);
|
|
|
|
|
|
|
|
final var producer = new Producer("PRODUCER_1", queue);
|
|
|
|
final var consumer = new Consumer("CONSUMER_1", queue);
|
|
|
|
|
|
|
|
new Thread(consumer::consume).start();
|
|
|
|
|
|
|
|
new Thread(() -> {
|
|
|
|
producer.send("hand shake");
|
|
|
|
producer.send("some very important information");
|
|
|
|
producer.send("bye!");
|
|
|
|
producer.stop();
|
|
|
|
}).start();
|
|
|
|
```
|
2015-08-13 23:54:40 +02:00
|
|
|
|
2020-08-30 19:34:10 +03:00
|
|
|
Program output:
|
|
|
|
|
|
|
|
```
|
|
|
|
Message [hand shake] from [PRODUCER_1] received by [CONSUMER_1]
|
|
|
|
Message [some very important information] from [PRODUCER_1] received by [CONSUMER_1]
|
|
|
|
Message [bye!] from [PRODUCER_1] received by [CONSUMER_1]
|
|
|
|
Consumer CONSUMER_1 receive request to terminate.
|
|
|
|
```
|
|
|
|
|
2019-12-07 20:01:13 +02:00
|
|
|
## Class diagram
|
2020-08-30 19:34:10 +03:00
|
|
|
|
2015-08-13 23:54:40 +02:00
|
|
|

|
|
|
|
|
2016-01-03 21:14:30 +01:00
|
|
|
## Applicability
|
2015-08-13 23:54:40 +02:00
|
|
|
|
2020-08-30 19:34:10 +03:00
|
|
|
Use the Poison Pill idiom when:
|
|
|
|
|
|
|
|
* There's a need to send signal from one thread/process to another to terminate.
|
2015-08-13 23:54:40 +02:00
|
|
|
|
2016-01-03 21:14:30 +01:00
|
|
|
## Real world examples
|
2015-08-13 23:54:40 +02:00
|
|
|
|
2015-08-15 18:03:05 +02:00
|
|
|
* [akka.actor.PoisonPill](http://doc.akka.io/docs/akka/2.1.4/java/untyped-actors.html)
|