Java 11 migraiton: master-worker-pattern
This commit is contained in:
@@ -34,27 +34,25 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* <p>The <b><em>Master-Worker</em></b> pattern is used when the problem at hand can be solved by
|
||||
* dividing into
|
||||
* multiple parts which need to go through the same computation and may need to be aggregated to get
|
||||
* final result. Parallel processing is performed using a system consisting of a master and some
|
||||
* number of workers, where a master divides the work among the workers, gets the result back from
|
||||
* them and assimilates all the results to give final result. The only communication is between the
|
||||
* master and the worker - none of the workers communicate among one another and the user only
|
||||
* communicates with the master to get required job done.</p>
|
||||
* dividing into multiple parts which need to go through the same computation and may need to be
|
||||
* aggregated to get final result. Parallel processing is performed using a system consisting of a
|
||||
* master and some number of workers, where a master divides the work among the workers, gets the
|
||||
* result back from them and assimilates all the results to give final result. The only
|
||||
* communication is between the master and the worker - none of the workers communicate among one
|
||||
* another and the user only communicates with the master to get required job done.</p>
|
||||
* <p>In our example, we have generic abstract classes {@link MasterWorker}, {@link Master} and
|
||||
* {@link Worker} which
|
||||
* have to be extended by the classes which will perform the specific job at hand (in this case
|
||||
* finding transpose of matrix, done by {@link ArrayTransposeMasterWorker}, {@link
|
||||
* ArrayTransposeMaster} and {@link ArrayTransposeWorker}). The Master class divides the work into
|
||||
* parts to be given to the workers, collects the results from the workers and aggregates it when
|
||||
* all workers have responded before returning the solution. The Worker class extends the Thread
|
||||
* class to enable parallel processing, and does the work once the data has been received from the
|
||||
* Master. The MasterWorker contains a reference to the Master class, gets the input from the App
|
||||
* and passes it on to the Master. These 3 classes define the system which computes the result. We
|
||||
* also have 2 abstract classes {@link Input} and {@link Result}, which contain the input data and
|
||||
* result data respectively. The Input class also has an abstract method divideData which defines
|
||||
* how the data is to be divided into segments. These classes are extended by {@link ArrayInput} and
|
||||
* {@link ArrayResult}.</p>
|
||||
* {@link Worker} which have to be extended by the classes which will perform the specific job at
|
||||
* hand (in this case finding transpose of matrix, done by {@link ArrayTransposeMasterWorker},
|
||||
* {@link ArrayTransposeMaster} and {@link ArrayTransposeWorker}). The Master class divides the work
|
||||
* into parts to be given to the workers, collects the results from the workers and aggregates it
|
||||
* when all workers have responded before returning the solution. The Worker class extends the
|
||||
* Thread class to enable parallel processing, and does the work once the data has been received
|
||||
* from the Master. The MasterWorker contains a reference to the Master class, gets the input from
|
||||
* the App and passes it on to the Master. These 3 classes define the system which computes the
|
||||
* result. We also have 2 abstract classes {@link Input} and {@link Result}, which contain the input
|
||||
* data and result data respectively. The Input class also has an abstract method divideData which
|
||||
* defines how the data is to be divided into segments. These classes are extended by {@link
|
||||
* ArrayInput} and {@link ArrayResult}.</p>
|
||||
*/
|
||||
|
||||
public class App {
|
||||
@@ -68,12 +66,12 @@ public class App {
|
||||
*/
|
||||
|
||||
public static void main(String[] args) {
|
||||
ArrayTransposeMasterWorker mw = new ArrayTransposeMasterWorker();
|
||||
int rows = 10;
|
||||
int columns = 20;
|
||||
int[][] inputMatrix = ArrayUtilityMethods.createRandomIntMatrix(rows, columns);
|
||||
ArrayInput input = new ArrayInput(inputMatrix);
|
||||
ArrayResult result = (ArrayResult) mw.getResult(input);
|
||||
var mw = new ArrayTransposeMasterWorker();
|
||||
var rows = 10;
|
||||
var columns = 20;
|
||||
var inputMatrix = ArrayUtilityMethods.createRandomIntMatrix(rows, columns);
|
||||
var input = new ArrayInput(inputMatrix);
|
||||
var result = (ArrayResult) mw.getResult(input);
|
||||
if (result != null) {
|
||||
ArrayUtilityMethods.printMatrix(inputMatrix);
|
||||
ArrayUtilityMethods.printMatrix(result.data);
|
||||
|
@@ -25,6 +25,7 @@ package com.iluwatar.masterworker;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Class ArrayInput extends abstract class {@link Input} and contains data of type int[][].
|
||||
@@ -37,12 +38,12 @@ public class ArrayInput extends Input<int[][]> {
|
||||
}
|
||||
|
||||
static int[] makeDivisions(int[][] data, int num) {
|
||||
int initialDivision = data.length / num; //equally dividing
|
||||
int[] divisions = new int[num];
|
||||
var initialDivision = data.length / num; //equally dividing
|
||||
var divisions = new int[num];
|
||||
Arrays.fill(divisions, initialDivision);
|
||||
if (initialDivision * num != data.length) {
|
||||
int extra = data.length - initialDivision * num;
|
||||
int l = 0;
|
||||
var extra = data.length - initialDivision * num;
|
||||
var l = 0;
|
||||
//equally dividing extra among all parts
|
||||
while (extra > 0) {
|
||||
divisions[l] = divisions[l] + 1;
|
||||
@@ -58,22 +59,20 @@ public class ArrayInput extends Input<int[][]> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ArrayList<Input> divideData(int num) {
|
||||
public List<Input<int[][]>> divideData(int num) {
|
||||
if (this.data == null) {
|
||||
return null;
|
||||
} else {
|
||||
int[] divisions = makeDivisions(this.data, num);
|
||||
ArrayList<Input> result = new ArrayList<Input>(num);
|
||||
int rowsDone = 0; //number of rows divided so far
|
||||
for (int i = 0; i < num; i++) {
|
||||
int rows = divisions[i];
|
||||
var divisions = makeDivisions(this.data, num);
|
||||
var result = new ArrayList<Input<int[][]>>(num);
|
||||
var rowsDone = 0; //number of rows divided so far
|
||||
for (var i = 0; i < num; i++) {
|
||||
var rows = divisions[i];
|
||||
if (rows != 0) {
|
||||
int[][] divided = new int[rows][this.data[0].length];
|
||||
for (int j = 0; j < rows; j++) {
|
||||
divided[j] = this.data[rowsDone + j];
|
||||
}
|
||||
var divided = new int[rows][this.data[0].length];
|
||||
System.arraycopy(this.data, rowsDone, divided, 0, rows);
|
||||
rowsDone += rows;
|
||||
ArrayInput dividedInput = new ArrayInput(divided);
|
||||
var dividedInput = new ArrayInput(divided);
|
||||
result.add(dividedInput);
|
||||
} else {
|
||||
break; //rest of divisions will also be 0
|
||||
|
@@ -47,8 +47,8 @@ public class ArrayUtilityMethods {
|
||||
if (a1.length != a2.length) {
|
||||
return false;
|
||||
} else {
|
||||
boolean answer = false;
|
||||
for (int i = 0; i < a1.length; i++) {
|
||||
var answer = false;
|
||||
for (var i = 0; i < a1.length; i++) {
|
||||
if (a1[i] == a2[i]) {
|
||||
answer = true;
|
||||
} else {
|
||||
@@ -69,8 +69,8 @@ public class ArrayUtilityMethods {
|
||||
if (m1.length != m2.length) {
|
||||
return false;
|
||||
} else {
|
||||
boolean answer = false;
|
||||
for (int i = 0; i < m1.length; i++) {
|
||||
var answer = false;
|
||||
for (var i = 0; i < m1.length; i++) {
|
||||
if (arraysSame(m1[i], m2[i])) {
|
||||
answer = true;
|
||||
} else {
|
||||
@@ -88,9 +88,9 @@ public class ArrayUtilityMethods {
|
||||
* @return it (int[][]).
|
||||
*/
|
||||
public static int[][] createRandomIntMatrix(int rows, int columns) {
|
||||
int[][] matrix = new int[rows][columns];
|
||||
for (int i = 0; i < rows; i++) {
|
||||
for (int j = 0; j < columns; j++) {
|
||||
var matrix = new int[rows][columns];
|
||||
for (var i = 0; i < rows; i++) {
|
||||
for (var j = 0; j < columns; j++) {
|
||||
//filling cells in matrix
|
||||
matrix[i][j] = RANDOM.nextInt(10);
|
||||
}
|
||||
@@ -104,9 +104,9 @@ public class ArrayUtilityMethods {
|
||||
|
||||
public static void printMatrix(int[][] matrix) {
|
||||
//prints out int[][]
|
||||
for (int i = 0; i < matrix.length; i++) {
|
||||
for (int j = 0; j < matrix[0].length; j++) {
|
||||
LOGGER.info(matrix[i][j] + " ");
|
||||
for (var ints : matrix) {
|
||||
for (var j = 0; j < matrix[0].length; j++) {
|
||||
LOGGER.info(ints[j] + " ");
|
||||
}
|
||||
LOGGER.info("");
|
||||
}
|
||||
|
@@ -23,7 +23,7 @@
|
||||
|
||||
package com.iluwatar.masterworker;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* The abstract Input class, having 1 public field which contains input data, and abstract method
|
||||
@@ -40,5 +40,5 @@ public abstract class Input<T> {
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
public abstract ArrayList<Input> divideData(int num);
|
||||
public abstract List<Input<T>> divideData(int num);
|
||||
}
|
||||
|
@@ -40,7 +40,7 @@ public abstract class MasterWorker {
|
||||
|
||||
abstract Master setMaster(int numOfWorkers);
|
||||
|
||||
public Result getResult(Input input) {
|
||||
public Result<?> getResult(Input<?> input) {
|
||||
this.master.doWork(input);
|
||||
return this.master.getFinalResult();
|
||||
}
|
||||
|
@@ -27,7 +27,8 @@ import com.iluwatar.masterworker.ArrayResult;
|
||||
import com.iluwatar.masterworker.system.systemworkers.ArrayTransposeWorker;
|
||||
import com.iluwatar.masterworker.system.systemworkers.Worker;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Enumeration;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
/**
|
||||
* Class ArrayTransposeMaster extends abstract class {@link Master} and contains definition of
|
||||
@@ -41,35 +42,33 @@ public class ArrayTransposeMaster extends Master {
|
||||
|
||||
@Override
|
||||
ArrayList<Worker> setWorkers(int num) {
|
||||
ArrayList<Worker> ws = new ArrayList<Worker>(num);
|
||||
for (int i = 0; i < num; i++) {
|
||||
ws.add(new ArrayTransposeWorker(this, i + 1));
|
||||
//i+1 will be id
|
||||
}
|
||||
return ws;
|
||||
//i+1 will be id
|
||||
return IntStream.range(0, num)
|
||||
.mapToObj(i -> new ArrayTransposeWorker(this, i + 1))
|
||||
.collect(Collectors.toCollection(() -> new ArrayList<>(num)));
|
||||
}
|
||||
|
||||
@Override
|
||||
ArrayResult aggregateData() {
|
||||
// number of rows in final result is number of rows in any of obtained results from workers
|
||||
int rows = ((ArrayResult) this.getAllResultData()
|
||||
.get(this.getAllResultData().keys().nextElement())).data.length;
|
||||
int columns =
|
||||
0; //number of columns is sum of number of columns in all results obtained from workers
|
||||
for (Enumeration<Integer> e = this.getAllResultData().keys(); e.hasMoreElements(); ) {
|
||||
columns += ((ArrayResult) this.getAllResultData().get(e.nextElement())).data[0].length;
|
||||
var allResultData = this.getAllResultData();
|
||||
var rows = ((ArrayResult) allResultData.elements().nextElement()).data.length;
|
||||
var elements = allResultData.elements();
|
||||
var columns = 0; // columns = sum of number of columns in all results obtained from workers
|
||||
while (elements.hasMoreElements()) {
|
||||
columns += ((ArrayResult) elements.nextElement()).data[0].length;
|
||||
}
|
||||
int[][] resultData = new int[rows][columns];
|
||||
int columnsDone = 0; //columns aggregated so far
|
||||
for (int i = 0; i < this.getExpectedNumResults(); i++) {
|
||||
var resultData = new int[rows][columns];
|
||||
var columnsDone = 0; //columns aggregated so far
|
||||
var workers = this.getWorkers();
|
||||
for (var i = 0; i < this.getExpectedNumResults(); i++) {
|
||||
//result obtained from ith worker
|
||||
int[][] work =
|
||||
((ArrayResult) this.getAllResultData().get(this.getWorkers().get(i).getWorkerId())).data;
|
||||
for (int m = 0; m < work.length; m++) {
|
||||
var worker = workers.get(i);
|
||||
var workerId = worker.getWorkerId();
|
||||
var work = ((ArrayResult) allResultData.get(workerId)).data;
|
||||
for (var m = 0; m < work.length; m++) {
|
||||
//m = row number, n = columns number
|
||||
for (int n = 0; n < work[0].length; n++) {
|
||||
resultData[m][columnsDone + n] = work[m][n];
|
||||
}
|
||||
System.arraycopy(work[m], 0, resultData[m], columnsDone, work[0].length);
|
||||
}
|
||||
columnsDone += work[0].length;
|
||||
}
|
||||
|
@@ -26,8 +26,9 @@ package com.iluwatar.masterworker.system.systemmaster;
|
||||
import com.iluwatar.masterworker.Input;
|
||||
import com.iluwatar.masterworker.Result;
|
||||
import com.iluwatar.masterworker.system.systemworkers.Worker;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Dictionary;
|
||||
import java.util.Hashtable;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* The abstract Master class which contains private fields numOfWorkers (number of workers), workers
|
||||
@@ -38,24 +39,24 @@ import java.util.Hashtable;
|
||||
|
||||
public abstract class Master {
|
||||
private final int numOfWorkers;
|
||||
private final ArrayList<Worker> workers;
|
||||
private final List<Worker> workers;
|
||||
private final Dictionary<Integer, Result<?>> allResultData;
|
||||
private int expectedNumResults;
|
||||
private Hashtable<Integer, Result> allResultData;
|
||||
private Result finalResult;
|
||||
private Result<?> finalResult;
|
||||
|
||||
Master(int numOfWorkers) {
|
||||
this.numOfWorkers = numOfWorkers;
|
||||
this.workers = setWorkers(numOfWorkers);
|
||||
this.expectedNumResults = 0;
|
||||
this.allResultData = new Hashtable<Integer, Result>(numOfWorkers);
|
||||
this.allResultData = new Hashtable<>(numOfWorkers);
|
||||
this.finalResult = null;
|
||||
}
|
||||
|
||||
public Result getFinalResult() {
|
||||
public Result<?> getFinalResult() {
|
||||
return this.finalResult;
|
||||
}
|
||||
|
||||
Hashtable<Integer, Result> getAllResultData() {
|
||||
Dictionary<Integer, Result<?>> getAllResultData() {
|
||||
return this.allResultData;
|
||||
}
|
||||
|
||||
@@ -63,21 +64,21 @@ public abstract class Master {
|
||||
return this.expectedNumResults;
|
||||
}
|
||||
|
||||
ArrayList<Worker> getWorkers() {
|
||||
List<Worker> getWorkers() {
|
||||
return this.workers;
|
||||
}
|
||||
|
||||
abstract ArrayList<Worker> setWorkers(int num);
|
||||
abstract List<Worker> setWorkers(int num);
|
||||
|
||||
public void doWork(Input input) {
|
||||
public void doWork(Input<?> input) {
|
||||
divideWork(input);
|
||||
}
|
||||
|
||||
private void divideWork(Input input) {
|
||||
ArrayList<Input> dividedInput = input.divideData(numOfWorkers);
|
||||
private void divideWork(Input<?> input) {
|
||||
List<? extends Input<?>> dividedInput = input.divideData(numOfWorkers);
|
||||
if (dividedInput != null) {
|
||||
this.expectedNumResults = dividedInput.size();
|
||||
for (int i = 0; i < this.expectedNumResults; i++) {
|
||||
for (var i = 0; i < this.expectedNumResults; i++) {
|
||||
//ith division given to ith worker in this.workers
|
||||
this.workers.get(i).setReceivedData(this, dividedInput.get(i));
|
||||
this.workers.get(i).run();
|
||||
@@ -85,12 +86,12 @@ public abstract class Master {
|
||||
}
|
||||
}
|
||||
|
||||
public void receiveData(Result data, Worker w) {
|
||||
public void receiveData(Result<?> data, Worker w) {
|
||||
//check if can receive..if yes:
|
||||
collectResult(data, w.getWorkerId());
|
||||
}
|
||||
|
||||
private void collectResult(Result data, int workerId) {
|
||||
private void collectResult(Result<?> data, int workerId) {
|
||||
this.allResultData.put(workerId, data);
|
||||
if (this.allResultData.size() == this.expectedNumResults) {
|
||||
//all data received
|
||||
@@ -98,5 +99,5 @@ public abstract class Master {
|
||||
}
|
||||
}
|
||||
|
||||
abstract Result aggregateData();
|
||||
abstract Result<?> aggregateData();
|
||||
}
|
||||
|
@@ -41,12 +41,12 @@ public class ArrayTransposeWorker extends Worker {
|
||||
@Override
|
||||
ArrayResult executeOperation() {
|
||||
//number of rows in result matrix is equal to number of columns in input matrix and vice versa
|
||||
ArrayInput arrayInput = (ArrayInput) this.getReceivedData();
|
||||
final int rows = arrayInput.data[0].length;
|
||||
final int cols = arrayInput.data.length;
|
||||
int[][] resultData = new int[rows][cols];
|
||||
for (int i = 0; i < cols; i++) {
|
||||
for (int j = 0; j < rows; j++) {
|
||||
var arrayInput = (ArrayInput) this.getReceivedData();
|
||||
final var rows = arrayInput.data[0].length;
|
||||
final var cols = arrayInput.data.length;
|
||||
var resultData = new int[rows][cols];
|
||||
for (var i = 0; i < cols; i++) {
|
||||
for (var j = 0; j < rows; j++) {
|
||||
//flipping element positions along diagonal
|
||||
resultData[j][i] = arrayInput.data[i][j];
|
||||
}
|
||||
|
@@ -35,7 +35,7 @@ import com.iluwatar.masterworker.system.systemmaster.Master;
|
||||
public abstract class Worker extends Thread {
|
||||
private final Master master;
|
||||
private final int workerId;
|
||||
private Input receivedData;
|
||||
private Input<?> receivedData;
|
||||
|
||||
Worker(Master master, int id) {
|
||||
this.master = master;
|
||||
@@ -47,23 +47,23 @@ public abstract class Worker extends Thread {
|
||||
return this.workerId;
|
||||
}
|
||||
|
||||
Input getReceivedData() {
|
||||
Input<?> getReceivedData() {
|
||||
return this.receivedData;
|
||||
}
|
||||
|
||||
public void setReceivedData(Master m, Input i) {
|
||||
public void setReceivedData(Master m, Input<?> i) {
|
||||
//check if ready to receive..if yes:
|
||||
this.receivedData = i;
|
||||
}
|
||||
|
||||
abstract Result executeOperation();
|
||||
abstract Result<?> executeOperation();
|
||||
|
||||
private void sendToMaster(Result data) {
|
||||
private void sendToMaster(Result<?> data) {
|
||||
this.master.receiveData(data, this);
|
||||
}
|
||||
|
||||
public void run() { //from Thread class
|
||||
Result work = executeOperation();
|
||||
var work = executeOperation();
|
||||
sendToMaster(work);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user