---
layout: pattern
title: Poison Pill
folder: poison-pill
permalink: /patterns/poison-pill/
categories: Behavioral
language: en
tags:
- Cloud distributed
- Reactive
---
## Intent

Poison Pill is a known, predefined data item that allows a separate, distributed consumption
process to shut down gracefully.

## Explanation
Real world example

> Let's think about a message queue with one producer and one consumer. The producer keeps pushing
> new messages to the queue and the consumer keeps reading them. Finally, when it's time to
> shut down gracefully, the producer sends the poison pill message.

In plain words

> Poison Pill is a known message structure that ends the message exchange.

**Programmatic Example**

Let's define the message structure first. There's an interface `Message` and an implementation
`SimpleMessage`.

```java
public interface Message {

  ...

  enum Headers {
    DATE, SENDER
  }

  void addHeader(Headers header, String value);

  String getHeader(Headers header);

  Map<Headers, String> getHeaders();

  void setBody(String body);

  String getBody();
}

public class SimpleMessage implements Message {

  private final Map<Headers, String> headers = new HashMap<>();
  private String body;

  @Override
  public void addHeader(Headers header, String value) {
    headers.put(header, value);
  }

  @Override
  public String getHeader(Headers header) {
    return headers.get(header);
  }

  @Override
  public Map<Headers, String> getHeaders() {
    return Collections.unmodifiableMap(headers);
  }

  @Override
  public void setBody(String body) {
    this.body = body;
  }

  @Override
  public String getBody() {
    return body;
  }
}
```
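
The producer and consumer code in this example refers to a `Message.POISON_PILL` constant whose declaration is not shown. As a minimal sketch, and only an assumption about how such a sentinel could look, the constant can be an anonymous `Message` whose methods all throw, so that a consumer that forgets to check for the pill fails loudly instead of processing it as data. The `poison()` helper and the exception message are hypothetical names chosen for illustration.

```java
import java.util.Map;

interface Message {

  // Assumed declaration: one shared sentinel instance that consumers compare against.
  Message POISON_PILL = new Message() {

    @Override
    public void addHeader(Headers header, String value) {
      throw poison();
    }

    @Override
    public String getHeader(Headers header) {
      throw poison();
    }

    @Override
    public Map<Headers, String> getHeaders() {
      throw poison();
    }

    @Override
    public void setBody(String body) {
      throw poison();
    }

    @Override
    public String getBody() {
      throw poison();
    }

    // Hypothetical helper: the pill carries no data, so every accessor is rejected.
    private RuntimeException poison() {
      return new UnsupportedOperationException("Poison pill carries no data");
    }
  };

  enum Headers {
    DATE, SENDER
  }

  void addHeader(Headers header, String value);

  String getHeader(Headers header);

  Map<Headers, String> getHeaders();

  void setBody(String body);

  String getBody();
}
```

Because the sentinel is a single instance that does not override `equals`, a check such as `Message.POISON_PILL.equals(msg)` only matches the pill itself and never a regular message.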

Messages are passed through a message queue. The related types are `MqPublishPoint`,
`MqSubscribePoint` and `MessageQueue`. `SimpleMessageQueue` implements all of these interfaces.

```java
public interface MqPublishPoint {

  void put(Message msg) throws InterruptedException;
}

public interface MqSubscribePoint {

  Message take() throws InterruptedException;
}

public interface MessageQueue extends MqPublishPoint, MqSubscribePoint {
}

public class SimpleMessageQueue implements MessageQueue {

  private final BlockingQueue<Message> queue;

  public SimpleMessageQueue(int bound) {
    queue = new ArrayBlockingQueue<>(bound);
  }

  @Override
  public void put(Message msg) throws InterruptedException {
    queue.put(msg);
  }

  @Override
  public Message take() throws InterruptedException {
    return queue.take();
  }
}
```
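
Since `SimpleMessageQueue` wraps a bounded `ArrayBlockingQueue`, `put` blocks while the queue is full, and that applies to the poison pill as much as to any other message. The sketch below uses only JDK classes and the non-blocking `offer` call to make the full-queue case visible without actually blocking. `BoundedQueueSketch` and `offerTwice` are hypothetical names, not part of the example project.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper class used only for illustration.
class BoundedQueueSketch {

  /** Returns whether the first and the second offer to a queue of capacity 1 succeeded. */
  static boolean[] offerTwice() {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
    boolean first = queue.offer("message"); // accepted: the queue had room
    boolean second = queue.offer("pill");   // rejected: the queue is full, put() would block here
    return new boolean[] {first, second};
  }

  public static void main(String[] args) {
    boolean[] result = offerTwice();
    System.out.println(result[0] + " " + result[1]); // prints "true false"
  }
}
```

In practice this means a shutdown request may wait until the consumer has drained enough of the backlog for the pill to fit into the queue.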

Next we need a message `Producer` and a `Consumer`. Internally they use the message queue from
above. Note that when `Producer` stops, it sends out the poison pill to inform `Consumer` that the
messaging has finished.

```java
public class Producer {

  ...

  public void send(String body) {
    if (isStopped) {
      // Arguments follow the placeholder order: producer name first, then the message body.
      throw new IllegalStateException(String.format(
          "Producer %s was stopped and failed to deliver requested message [%s].", name, body));
    }
    var msg = new SimpleMessage();
    msg.addHeader(Headers.DATE, new Date().toString());
    msg.addHeader(Headers.SENDER, name);
    msg.setBody(body);

    try {
      queue.put(msg);
    } catch (InterruptedException e) {
      // allow thread to exit
      LOGGER.error("Exception caught.", e);
    }
  }

  public void stop() {
    isStopped = true;
    try {
      // The pill is queued after all regular messages, so the consumer sees it last.
      queue.put(Message.POISON_PILL);
    } catch (InterruptedException e) {
      // allow thread to exit
      LOGGER.error("Exception caught.", e);
    }
  }
}

public class Consumer {

  ...

  public void consume() {
    while (true) {
      try {
        var msg = queue.take();
        // Stop as soon as the pill arrives; never read its headers or body.
        if (Message.POISON_PILL.equals(msg)) {
          LOGGER.info("Consumer {} receive request to terminate.", name);
          break;
        }
        var sender = msg.getHeader(Headers.SENDER);
        var body = msg.getBody();
        LOGGER.info("Message [{}] from [{}] received by [{}]", body, sender, name);
      } catch (InterruptedException e) {
        // allow thread to exit
        LOGGER.error("Exception caught.", e);
        return;
      }
    }
  }
}
```
Finally, we are ready to present the whole example in action.
```java
var queue = new SimpleMessageQueue(10000);

final var producer = new Producer("PRODUCER_1", queue);
final var consumer = new Consumer("CONSUMER_1", queue);

new Thread(consumer::consume).start();

new Thread(() -> {
  producer.send("hand shake");
  producer.send("some very important information");
  producer.send("bye!");
  producer.stop();
}).start();
```
Program output:
```
Message [hand shake] from [PRODUCER_1] received by [CONSUMER_1]
Message [some very important information] from [PRODUCER_1] received by [CONSUMER_1]
Message [bye!] from [PRODUCER_1] received by [CONSUMER_1]
Consumer CONSUMER_1 receive request to terminate.
```
## Class diagram
![alt text](./etc/poison-pill.png "Poison Pill")
## Applicability
Use the Poison Pill idiom when:
* You need to send a signal from one thread or process to another to tell it to terminate.
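
The example in this document uses a single consumer. When several consumer threads share one queue, a common variation is to enqueue one pill per consumer, because each consumer removes the pill it reads. The following is a minimal sketch of that variation using only JDK classes. The class name `MultiConsumerShutdown`, the `run` method and the `Object` sentinel are assumptions made for illustration and are not part of the example project.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class used only for illustration.
class MultiConsumerShutdown {

  // Hypothetical sentinel: a unique object that no regular message can be identical to.
  static final Object POISON_PILL = new Object();

  /** Starts the consumers, sends the messages and returns how many real messages were processed. */
  static int run(int consumers, int messages) throws InterruptedException {
    BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    AtomicInteger processed = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(consumers);

    for (int i = 0; i < consumers; i++) {
      new Thread(() -> {
        try {
          while (true) {
            Object msg = queue.take();
            if (msg == POISON_PILL) { // identity check: only the sentinel matches
              break;
            }
            processed.incrementAndGet();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt and exit
        } finally {
          done.countDown();
        }
      }).start();
    }

    for (int i = 0; i < messages; i++) {
      queue.put("message " + i);
    }
    // One pill per consumer: each consumer stops after taking exactly one.
    for (int i = 0; i < consumers; i++) {
      queue.put(POISON_PILL);
    }

    done.await(); // wait until every consumer has terminated
    return processed.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(run(3, 10)); // prints 10
  }
}
```

Because the pills are queued after all regular messages and the queue is first-in, first-out, every message is taken before any consumer sees a pill.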
## Real world examples
* [akka.actor.PoisonPill](http://doc.akka.io/docs/akka/2.1.4/java/untyped-actors.html)