Fix issue #179: Leader Followers Pattern (#1189)

* add leader followers pattern

* use var and streams instead in App::execute

* use logger instead of printing to system output stream
This commit is contained in:
Zhang WH
2020-03-27 03:14:44 +08:00
committed by GitHub
parent 6ce33ed6df
commit be1c0b8143
15 changed files with 737 additions and 0 deletions

View File

@ -0,0 +1,93 @@
/*
* The MIT License
* Copyright © 2014-2019 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.leaderfollowers;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Leader/Followers pattern is a concurrency pattern. This pattern behaves like a taxi stand where
* one of the threads acts as leader thread which listens for event from event sources,
* de-multiplexes, dispatches and handles the event. It promotes the follower to be the new leader.
* When processing completes the thread joins the followers queue, if there are no followers then it
* becomes the leader and cycle repeats again.
*
* <p>In this example, one of the workers becomes Leader and listens on the {@link TaskSet} for
* work. {@link TaskSet} basically acts as the source of input events for the {@link Worker}, who
* are spawned and controlled by the {@link WorkCenter} . When {@link Task} arrives then the leader
* takes the work and calls the {@link TaskHandler}. It also calls the {@link WorkCenter} to
* promotes one of the followers to be the new leader, who can then process the next work and so
* on.
*
* <p>The pros for this pattern are:
* It enhances CPU cache affinity and eliminates unbound allocation and data buffer sharing between
* threads by reading the request into buffer space allocated on the stack of the leader or by using
* the Thread-Specific Storage pattern [22] to allocate memory. It minimizes locking overhead by not
* exchanging data between threads, thereby reducing thread synchronization. In bound handle/thread
* associations, the leader thread dispatches the event based on the I/O handle. It can minimize
* priority inversion because no extra queuing is introduced in the server. It does not require a
* context switch to handle each event, reducing the event dispatching latency. Note that promoting
* a follower thread to fulfill the leader role requires a context switch. Programming simplicity:
* The Leader/Followers pattern simplifies the programming of concurrency models where multiple
* threads can receive requests, process responses, and de-multiplex connections using a shared
* handle set.
*/
public class App {
/**
* The main method for the leader followers pattern.
*/
public static void main(String[] args) throws InterruptedException {
var taskSet = new TaskSet();
var taskHandler = new TaskHandler();
var workCenter = new WorkCenter();
workCenter.createWorkers(4, taskSet, taskHandler);
execute(workCenter, taskSet);
}
/**
* Start the work, dispatch tasks and stop the thread pool at last.
*/
private static void execute(WorkCenter workCenter, TaskSet taskSet) throws InterruptedException {
var workers = workCenter.getWorkers();
var exec = Executors.newFixedThreadPool(workers.size());
workers.forEach(exec::submit);
Thread.sleep(1000);
addTasks(taskSet);
exec.awaitTermination(2, TimeUnit.SECONDS);
exec.shutdownNow();
}
/**
* Add tasks.
*/
private static void addTasks(TaskSet taskSet) throws InterruptedException {
var rand = new Random();
for (var i = 0; i < 5; i++) {
var time = Math.abs(rand.nextInt(1000));
taskSet.addTask(new Task(time));
}
}
}

View File

@ -0,0 +1,51 @@
/*
* The MIT License
* Copyright © 2014-2019 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.leaderfollowers;
/**
* A unit of work to be processed by the Workers.
*/
public class Task {
private final int time;
private boolean finished;
public Task(int time) {
this.time = time;
}
public int getTime() {
return time;
}
public void setFinished() {
this.finished = true;
}
public boolean isFinished() {
return this.finished;
}
}

View File

@ -0,0 +1,46 @@
/*
* The MIT License
* Copyright © 2014-2019 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.leaderfollowers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The TaskHandler is used by the {@link Worker} to process the newly arrived task.
*/
public class TaskHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(TaskHandler.class);
/**
* This interface handles one task at a time.
*/
public void handleTask(Task task) throws InterruptedException {
var time = task.getTime();
Thread.sleep(time);
LOGGER.info("It takes " + time + " milliseconds to finish the task");
task.setFinished();
}
}

View File

@ -0,0 +1,47 @@
/*
* The MIT License
* Copyright © 2014-2019 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.leaderfollowers;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* A TaskSet is a collection of the tasks, the leader receives task from here.
*/
public class TaskSet {
private BlockingQueue<Task> queue = new ArrayBlockingQueue<>(100);
public void addTask(Task task) throws InterruptedException {
queue.put(task);
}
public Task getTask() throws InterruptedException {
return queue.take();
}
public int getSize() {
return queue.size();
}
}

View File

@ -0,0 +1,76 @@
/*
* The MIT License
* Copyright © 2014-2019 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.leaderfollowers;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* A WorkCenter contains a leader and a list of idle workers. The leader is responsible for
* receiving work when it arrives. This class also provides a mechanism to promote a new leader. A
* worker once he completes his task will add himself back to the center.
*/
public class WorkCenter {
private Worker leader;
private List<Worker> workers = new CopyOnWriteArrayList<>();
/**
* Create workers and set leader.
*/
public void createWorkers(int numberOfWorkers, TaskSet taskSet, TaskHandler taskHandler) {
for (var id = 1; id <= numberOfWorkers; id++) {
var worker = new Worker(id, this, taskSet, taskHandler);
workers.add(worker);
}
promoteLeader();
}
public void addWorker(Worker worker) {
workers.add(worker);
}
public void removeWorker(Worker worker) {
workers.remove(worker);
}
public Worker getLeader() {
return leader;
}
/**
* Promote a leader.
*/
public void promoteLeader() {
Worker leader = null;
if (workers.size() > 0) {
leader = workers.get(0);
}
this.leader = leader;
}
public List<Worker> getWorkers() {
return workers;
}
}

View File

@ -0,0 +1,96 @@
/*
* The MIT License
* Copyright © 2014-2019 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.leaderfollowers;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Worker implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(Worker.class);
private final long id;
private final WorkCenter workCenter;
private final TaskSet taskSet;
private final TaskHandler taskHandler;
/**
* Constructor to create a worker which will take work from the work center.
*/
public Worker(long id, WorkCenter workCenter, TaskSet taskSet, TaskHandler taskHandler) {
super();
this.id = id;
this.workCenter = workCenter;
this.taskSet = taskSet;
this.taskHandler = taskHandler;
}
/**
* The leader thread listens for task. When task arrives, it promotes one of the followers to be
* the new leader. Then it handles the task and add himself back to work center.
*/
@Override
public void run() {
while (!Thread.interrupted()) {
try {
if (workCenter.getLeader() != null && !workCenter.getLeader().equals(this)) {
synchronized (workCenter) {
workCenter.wait();
}
continue;
}
final Task task = taskSet.getTask();
synchronized (workCenter) {
workCenter.removeWorker(this);
workCenter.promoteLeader();
workCenter.notifyAll();
}
taskHandler.handleTask(task);
LOGGER.info("The Worker with the ID " + id + " completed the task");
workCenter.addWorker(this);
} catch (InterruptedException e) {
LOGGER.warn("Worker interrupted");
return;
}
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof Worker)) {
return false;
}
var worker = (Worker) o;
return id == worker.id;
}
@Override
public int hashCode() {
return Objects.hash(id);
}
}

View File

@ -0,0 +1,41 @@
/*
* The MIT License
* Copyright © 2014-2019 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.leaderfollowers;
import org.junit.Test;
/**
*
* Application test
*
*/
public class AppTest {
@Test
public void test() throws InterruptedException {
String[] args = {};
App.main(args);
}
}

View File

@ -0,0 +1,42 @@
/*
* The MIT License
* Copyright © 2014-2019 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.leaderfollowers;
import org.junit.Assert;
import org.junit.Test;
/**
* Tests for TaskHandler
*/
public class TaskHandlerTest {
@Test
public void testHandleTask() throws InterruptedException {
var taskHandler = new TaskHandler();
var handle = new Task(100);
taskHandler.handleTask(handle);
Assert.assertTrue(handle.isFinished());
}
}

View File

@ -0,0 +1,50 @@
/*
* The MIT License
* Copyright © 2014-2019 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.leaderfollowers;
import org.junit.Assert;
import org.junit.Test;
/**
* Tests for TaskSet
*/
public class TaskSetTest {
@Test
public void testAddTask() throws InterruptedException {
var taskSet = new TaskSet();
taskSet.addTask(new Task(10));
Assert.assertTrue(taskSet.getSize() == 1);
}
@Test
public void testGetTask() throws InterruptedException {
var taskSet = new TaskSet();
taskSet.addTask(new Task(100));
Task task = taskSet.getTask();
Assert.assertTrue(task.getTime() == 100);
Assert.assertTrue(taskSet.getSize() == 0);
}
}

View File

@ -0,0 +1,62 @@
/*
* The MIT License
* Copyright © 2014-2019 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.leaderfollowers;
import org.junit.Assert;
import org.junit.Test;
/**
* Tests for WorkCenter
*/
public class WorkCenterTest {
@Test
public void testCreateWorkers() {
var taskSet = new TaskSet();
var taskHandler = new TaskHandler();
var workCenter = new WorkCenter();
workCenter.createWorkers(5, taskSet, taskHandler);
Assert.assertEquals(workCenter.getWorkers().size(), 5);
Assert.assertEquals(workCenter.getWorkers().get(0), workCenter.getLeader());
}
@Test
public void testNullLeader() {
var workCenter = new WorkCenter();
workCenter.promoteLeader();
Assert.assertNull(workCenter.getLeader());
}
@Test
public void testPromoteLeader() {
var taskSet = new TaskSet();
var taskHandler = new TaskHandler();
var workCenter = new WorkCenter();
workCenter.createWorkers(5, taskSet, taskHandler);
workCenter.removeWorker(workCenter.getLeader());
workCenter.promoteLeader();
Assert.assertEquals(workCenter.getWorkers().size(), 4);
Assert.assertEquals(workCenter.getWorkers().get(0), workCenter.getLeader());
}
}