Added priority queue design pattern (#888)

* added priority queue design pattern

* Minor Refactored, fixed review comments
This commit is contained in:
Ranjeet
2019-08-31 23:40:35 +05:30
committed by Ilkka Seppälä
parent 11c0550559
commit 7f6067f19f
10 changed files with 616 additions and 7 deletions

View File

@@ -0,0 +1,60 @@
/**
* The MIT License
* Copyright (c) 2014 Ilkka Seppälä
* <p>
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
* <p>
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
* <p>
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.priority.queue;
/**
* Prioritize requests sent to services so that requests with a higher priority are received and
* processed more quickly than those of a lower priority.
* This pattern is useful in applications that offer different service level guarantees
* to individual clients.
* Example :Send multiple message with different priority to worker queue.
* Worker execute higher priority message first
* @see "https://docs.microsoft.com/en-us/previous-versions/msp-n-p/dn589794(v=pandp.10)"
*/
public class Application {
/**
* main entry
*/
public static void main(String[] args) throws Exception {
QueueManager queueManager = new QueueManager(100);
// push some message to queue
// Low Priority message
for (int i = 0; i < 100; i++) {
queueManager.publishMessage(new Message("Low Message Priority", 0));
}
// High Priority message
for (int i = 0; i < 100; i++) {
queueManager.publishMessage(new Message("High Message Priority", 1));
}
// run worker
Worker worker = new Worker(queueManager);
worker.run();
}
}

View File

@@ -0,0 +1,50 @@
/**
* The MIT License
* Copyright (c) 2014 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.priority.queue;
/**
* Message bean
*/
public class Message implements Comparable<Message> {
private final String message;
private final int priority; // define message priority in queue
public Message(String message, int priority) {
this.message = message;
this.priority = priority;
}
@Override
public int compareTo(Message o) {
return priority - o.priority;
}
@Override
public String toString() {
return "Message{"
+ "message='" + message + '\''
+ ", priority=" + priority
+ '}';
}
}

View File

@@ -0,0 +1,178 @@
/**
* The MIT License
* Copyright (c) 2014 Ilkka Seppälä
* <p>
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
* <p>
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
* <p>
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.priority.queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Arrays.copyOf;
/**
* Keep high Priority message on top using maxHeap.
*
* @param <T> : DataType to push in Queue
*/
public class PriorityMessageQueue<T extends Comparable> {
private static final Logger LOGGER = LoggerFactory.getLogger(PriorityMessageQueue.class);
private int size = 0;
private int capacity;
private T[] queue;
public PriorityMessageQueue(T[] queue) {
this.queue = queue;
this.capacity = queue.length;
}
/**
* Remove top message from queue
*/
public T remove() {
if (isEmpty()) {
return null;
}
T root = queue[0];
queue[0] = queue[size - 1];
size--;
maxHeapifyDown();
return root;
}
/**
* Add message to queue
*/
public void add(T t) {
ensureCapacity();
queue[size] = t;
size++;
maxHeapifyUp();
}
/**
* Check queue size
*/
public boolean isEmpty() {
return size == 0;
}
private void maxHeapifyDown() {
int index = 0;
while (hasLeftChild(index)) {
int smallerIndex = leftChildIndex(index);
if (hasRightChild(index) && right(index).compareTo(left(index)) > 0) {
smallerIndex = rightChildIndex(index);
}
if (queue[index].compareTo(queue[smallerIndex]) > 0) {
break;
} else {
swap(index, smallerIndex);
}
index = smallerIndex;
}
}
private void maxHeapifyUp() {
int index = size - 1;
while (hasParent(index) && parent(index).compareTo(queue[index]) < 0) {
swap(parentIndex(index), index);
index = parentIndex(index);
}
}
// index
private int parentIndex(int pos) {
return (pos - 1) / 2;
}
private int leftChildIndex(int parentPos) {
return 2 * parentPos + 1;
}
private int rightChildIndex(int parentPos) {
return 2 * parentPos + 2;
}
// value
private T parent(int childIndex) {
return queue[parentIndex(childIndex)];
}
private T left(int parentIndex) {
return queue[leftChildIndex(parentIndex)];
}
private T right(int parentIndex) {
return queue[rightChildIndex(parentIndex)];
}
// check
private boolean hasLeftChild(int index) {
return leftChildIndex(index) < size;
}
private boolean hasRightChild(int index) {
return rightChildIndex(index) < size;
}
private boolean hasParent(int index) {
return parentIndex(index) >= 0;
}
private void swap(int fpos, int tpos) {
T tmp = queue[fpos];
queue[fpos] = queue[tpos];
queue[tpos] = tmp;
}
private void ensureCapacity() {
if (size == capacity) {
capacity = capacity * 2;
queue = copyOf(queue, capacity);
}
}
/**
* For debug .. print current state of queue
*/
public void print() {
for (int i = 0; i <= size / 2; i++) {
LOGGER.info(" PARENT : " + queue[i] + " LEFT CHILD : "
+ left(i) + " RIGHT CHILD :" + right(i));
}
}
}

View File

@@ -0,0 +1,57 @@
/**
* The MIT License
* Copyright (c) 2014 Ilkka Seppälä
* <p>
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
* <p>
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
* <p>
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.priority.queue;
/**
* Manage priority queue
*/
public class QueueManager {
/*
Priority message
*/
private final PriorityMessageQueue<Message> messagePriorityMessageQueue;
public QueueManager(int initialCapacity) {
messagePriorityMessageQueue = new PriorityMessageQueue<Message>(new Message[initialCapacity]);
}
/**
* Publish message to queue
*/
public void publishMessage(Message message) {
messagePriorityMessageQueue.add(message);
}
/**
* recive message from queue
*/
public Message receiveMessage() {
if (messagePriorityMessageQueue.isEmpty()) {
return null;
}
return messagePriorityMessageQueue.remove();
}
}

View File

@@ -0,0 +1,63 @@
/**
* The MIT License
* Copyright (c) 2014 Ilkka Seppälä
* <p>
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
* <p>
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
* <p>
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.priority.queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Message Worker
*/
public class Worker {
private static final Logger LOGGER = LoggerFactory.getLogger(Worker.class);
private final QueueManager queueManager;
public Worker(QueueManager queueManager) {
this.queueManager = queueManager;
}
/**
* Keep checking queue for message
*/
public void run() throws Exception {
while (true) {
Message message = queueManager.receiveMessage();
if (message == null) {
LOGGER.info("No Message ... waiting");
Thread.sleep(200);
} else {
processMessage(message);
}
}
}
/**
* Process message
*/
private void processMessage(Message message) {
LOGGER.info(message.toString());
}
}

View File

@@ -0,0 +1,73 @@
/**
* The MIT License
* Copyright (c) 2014 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.priority.queue;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test case for order of messages
*/
public class PriorityMessageQueueTest {
@Test
public void remove() {
PriorityMessageQueue<String> stringPriorityMessageQueue = new PriorityMessageQueue<>(new String[2]);
String pushMessage = "test";
stringPriorityMessageQueue.add(pushMessage);
assertEquals(stringPriorityMessageQueue.remove(), pushMessage);
}
@Test
public void add() {
PriorityMessageQueue<Integer> stringPriorityMessageQueue = new PriorityMessageQueue<>(new Integer[2]);
stringPriorityMessageQueue.add(1);
stringPriorityMessageQueue.add(5);
stringPriorityMessageQueue.add(10);
stringPriorityMessageQueue.add(3);
assertTrue(stringPriorityMessageQueue.remove() == 10);
}
@Test
public void isEmpty() {
PriorityMessageQueue<Integer> stringPriorityMessageQueue = new PriorityMessageQueue<>(new Integer[2]);
assertTrue(stringPriorityMessageQueue.isEmpty());
stringPriorityMessageQueue.add(1);
stringPriorityMessageQueue.remove();
assertTrue(stringPriorityMessageQueue.isEmpty());
}
@Test
public void testEnsureSize() {
PriorityMessageQueue<Integer> stringPriorityMessageQueue = new PriorityMessageQueue<>(new Integer[2]);
assertTrue(stringPriorityMessageQueue.isEmpty());
stringPriorityMessageQueue.add(1);
stringPriorityMessageQueue.add(2);
stringPriorityMessageQueue.add(2);
stringPriorityMessageQueue.add(3);
assertTrue(stringPriorityMessageQueue.remove() == 3);
}
}

View File

@@ -0,0 +1,53 @@
/**
* The MIT License
* Copyright (c) 2014 Ilkka Seppälä
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.iluwatar.priority.queue;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Check queue manager
*/
public class QueueManagerTest {
@Test
public void publishMessage() {
QueueManager queueManager = new QueueManager(2);
Message testMessage = new Message("Test Message", 1);
queueManager.publishMessage(testMessage);
Message recivedMessage = queueManager.receiveMessage();
assertEquals(testMessage, recivedMessage);
}
@Test
public void receiveMessage() {
QueueManager queueManager = new QueueManager(2);
Message testMessage1 = new Message("Test Message 1", 1);
queueManager.publishMessage(testMessage1);
Message testMessage2 = new Message("Test Message 2", 2);
queueManager.publishMessage(testMessage2);
Message recivedMessage = queueManager.receiveMessage();
assertEquals(testMessage2, recivedMessage);
}
}