#271 implements producer-consumer

This commit is contained in:
hongsw
2015-10-28 09:55:05 +08:00
parent a2b8359ab5
commit 07faa2f625
11 changed files with 299 additions and 0 deletions

View File

@ -0,0 +1,57 @@
package com.iluwatar.producer.consumer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Producer Consumer Design pattern is a classic concurrency or threading pattern which reduces
* coupling between Producer and Consumer by separating Identification of work with Execution of
* Work.
* <p>
* In producer consumer design pattern a shared queue is used to control the flow and this
* separation allows you to code producer and consumer separately. It also addresses the issue of
* different timing require to produce item or consuming item. by using producer consumer pattern
* both Producer and Consumer Thread can work with different speed.
*
*/
public class App {
/**
* Program entry point
*
* @param args command line args
*/
public static void main(String[] args) {
ItemQueue queue = new ItemQueue();
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 2; i++) {
final Producer producer = new Producer("Producer_" + i, queue);
executorService.submit(() -> {
while (true) {
producer.produce();
}
});
};
for (int i = 0; i < 3; i++) {
final Consumer consumer = new Consumer("Consumer_" + i, queue);
executorService.submit(() -> {
while (true) {
consumer.consume();
}
});
}
executorService.shutdown();
try {
executorService.awaitTermination(10, TimeUnit.SECONDS);
executorService.shutdownNow();
} catch (InterruptedException e) {
System.out.println("Error waiting for ExecutorService shutdown");
}
}
}

View File

@ -0,0 +1,24 @@
package com.iluwatar.producer.consumer;
/**
* Class responsible for consume the {@link Item} produced by {@link Producer}
*/
public class Consumer {
private final ItemQueue queue;
private final String name;
public Consumer(String name, ItemQueue queue) {
this.name = name;
this.queue = queue;
}
public void consume() throws InterruptedException {
Item item = queue.take();
System.out.println(String.format("Consumer [%s] consume item [%s] produced by [%s]", name,
item.getId(), item.getProducer()));
}
}

View File

@ -0,0 +1,27 @@
package com.iluwatar.producer.consumer;
/**
* Class take part of an {@link Producer}-{@link Consumer} exchange.
*/
public class Item {
private String producer;
private int id;
public Item(String producer, int id) {
this.id = id;
this.producer = producer;
}
public int getId() {
return id;
}
public String getProducer() {
return producer;
}
}

View File

@ -0,0 +1,27 @@
package com.iluwatar.producer.consumer;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Class as a channel for {@link Producer}-{@link Consumer} exchange.
*/
public class ItemQueue {
private LinkedBlockingQueue<Item> queue;
public ItemQueue() {
queue = new LinkedBlockingQueue<Item>(5);
}
public void put(Item item) throws InterruptedException {
queue.put(item);
}
public Item take() throws InterruptedException {
return queue.take();
}
}

View File

@ -0,0 +1,29 @@
package com.iluwatar.producer.consumer;
import java.util.Random;
/**
* Class responsible for producing unit of work that can be expressed as {@link Item} and submitted
* to queue
*/
public class Producer {
private final ItemQueue queue;
private final String name;
private int itemId = 0;
public Producer(String name, ItemQueue queue) {
this.name = name;
this.queue = queue;
}
public void produce() throws InterruptedException {
Item item = new Item(name, itemId++);
queue.put(item);
Random random = new Random();
Thread.sleep(random.nextInt(2000));
}
}