From 1387e2bc96b7f2c4b0e175c2b074aea2f3bd0d29 Mon Sep 17 00:00:00 2001 From: Chandana Amarnath Date: Mon, 28 Nov 2016 20:13:18 +0530 Subject: [PATCH] Fixed all the code changes after review --- queue-load-leveling/README.md | 2 +- .../java/org/queue/load/leveling/App.java | 51 ++++++++++++++----- .../java/org/queue/load/leveling/Message.java | 13 +---- .../org/queue/load/leveling/MessageQueue.java | 2 +- .../queue/load/leveling/ServiceExecutor.java | 6 +-- .../queue/load/leveling/TaskGenerator.java | 16 +++--- .../java/org/queue/load/leveling/AppTest.java | 2 +- .../queue/load/leveling/MessageQueueTest.java | 8 +-- .../org/queue/load/leveling/MessageTest.java | 5 -- 9 files changed, 58 insertions(+), 47 deletions(-) diff --git a/queue-load-leveling/README.md b/queue-load-leveling/README.md index ad65dad91..1179e5985 100644 --- a/queue-load-leveling/README.md +++ b/queue-load-leveling/README.md @@ -31,4 +31,4 @@ for both the task and the service. ## Credits -* [Design Pattern: Queue-Based Load Leveling Pattern](https://msdn.microsoft.com/en-us/library/dn589783.aspx) \ No newline at end of file +* [Microsoft Cloud Design Patterns: Queue-Based Load Leveling Pattern](https://msdn.microsoft.com/en-us/library/dn589783.aspx) \ No newline at end of file diff --git a/queue-load-leveling/src/main/java/org/queue/load/leveling/App.java b/queue-load-leveling/src/main/java/org/queue/load/leveling/App.java index 0a3f949bd..19f8939a4 100644 --- a/queue-load-leveling/src/main/java/org/queue/load/leveling/App.java +++ b/queue-load-leveling/src/main/java/org/queue/load/leveling/App.java @@ -23,6 +23,10 @@ package org.queue.load.leveling; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,37 +62,56 @@ public class App { private static final Logger LOGGER = LoggerFactory.getLogger(App.class); + //Executor shut down time limit. + private static final int SHUTDOWN_TIME = 15; + /** * Program entry point * * @param args command line args */ public static void main(String[] args) { + + // An Executor that provides methods to manage termination and methods that can + // produce a Future for tracking progress of one or more asynchronous tasks. + ExecutorService executor = null; + try { // Create a MessageQueue object. MessageQueue msgQueue = new MessageQueue(); - LOGGER.info("All the TaskGenerators started."); + LOGGER.info("Submitting TaskGenerators and ServiceExecutor threads."); // Create three TaskGenerator threads. Each of them will submit different number of jobs. Runnable taskRunnable1 = new TaskGenerator(msgQueue, 5); Runnable taskRunnable2 = new TaskGenerator(msgQueue, 1); Runnable taskRunnable3 = new TaskGenerator(msgQueue, 2); - Thread taskGenerator1 = new Thread(taskRunnable1, "Task_Generator_1"); - Thread taskGenerator2 = new Thread(taskRunnable2, "Task_Generator_2"); - Thread taskGenerator3 = new Thread(taskRunnable3, "Task_Generator_3"); - - taskGenerator1.start(); - taskGenerator2.start(); - taskGenerator3.start(); - - LOGGER.info("Service Executor started."); - - // First create e service which will process all the jobs. + // Create e service which should process the submitted jobs. Runnable srvRunnable = new ServiceExecutor(msgQueue); - Thread srvExec = new Thread(srvRunnable, "Service_Executor_Thread"); - srvExec.start(); + + // Create a ThreadPool of 2 threads and + // submit all Runnable task for execution to executor.. + executor = Executors.newFixedThreadPool(2); + executor.submit(taskRunnable1); + executor.submit(taskRunnable2); + executor.submit(taskRunnable3); + + // submitting serviceExecutor thread to the Executor service. + executor.submit(srvRunnable); + + // Initiates an orderly shutdown. + LOGGER.info("Intiating shutdown. Executor will shutdown only after all the Threads are completed."); + executor.shutdown(); + + // Wait for SHUTDOWN_TIME seconds for all the threads to complete + // their tasks and then shut down the executor and then exit. + if ( !executor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS) ) { + LOGGER.info("Executor was shut down and Exiting."); + executor.shutdownNow(); + } + } catch (InterruptedException ie) { + LOGGER.error(ie.getMessage()); } catch (Exception e) { LOGGER.error(e.getMessage()); } diff --git a/queue-load-leveling/src/main/java/org/queue/load/leveling/Message.java b/queue-load-leveling/src/main/java/org/queue/load/leveling/Message.java index cffbc7af3..1f4aa8249 100644 --- a/queue-load-leveling/src/main/java/org/queue/load/leveling/Message.java +++ b/queue-load-leveling/src/main/java/org/queue/load/leveling/Message.java @@ -27,12 +27,8 @@ package org.queue.load.leveling; * */ public class Message { - private String msg; - - // Empty constructor. - public Message() { - } - + private final String msg; + // Parameter constructor. public Message(String msg) { super(); @@ -44,11 +40,6 @@ public class Message { return msg; } - // Set Method for attribute msg. - public void setMsg(String msg) { - this.msg = msg; - } - @Override public String toString() { return msg; diff --git a/queue-load-leveling/src/main/java/org/queue/load/leveling/MessageQueue.java b/queue-load-leveling/src/main/java/org/queue/load/leveling/MessageQueue.java index 7cadcd2e6..797226e0a 100644 --- a/queue-load-leveling/src/main/java/org/queue/load/leveling/MessageQueue.java +++ b/queue-load-leveling/src/main/java/org/queue/load/leveling/MessageQueue.java @@ -38,7 +38,7 @@ public class MessageQueue { private static final Logger LOGGER = LoggerFactory.getLogger(App.class); - private BlockingQueue blkQueue; + private final BlockingQueue blkQueue; // Default constructor when called creates Blocking Queue object. public MessageQueue() { diff --git a/queue-load-leveling/src/main/java/org/queue/load/leveling/ServiceExecutor.java b/queue-load-leveling/src/main/java/org/queue/load/leveling/ServiceExecutor.java index 107188cf0..02eb43b89 100644 --- a/queue-load-leveling/src/main/java/org/queue/load/leveling/ServiceExecutor.java +++ b/queue-load-leveling/src/main/java/org/queue/load/leveling/ServiceExecutor.java @@ -36,7 +36,7 @@ public class ServiceExecutor implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(App.class); - private MessageQueue msgQueue; + private final MessageQueue msgQueue; public ServiceExecutor(MessageQueue msgQueue) { this.msgQueue = msgQueue; @@ -53,12 +53,12 @@ public class ServiceExecutor implements Runnable { if (null != msg) { LOGGER.info(msg.toString() + " is served."); } else { - LOGGER.info("ServiceExecutor: All tasks are executed. Waiting."); + LOGGER.info("Service Executor: Waiting for Messages to serve .. "); } Thread.sleep(1000); } - } catch (InterruptedException ie) { + } catch (InterruptedException ie) { LOGGER.error(ie.getMessage()); } catch (Exception e) { LOGGER.error(e.getMessage()); diff --git a/queue-load-leveling/src/main/java/org/queue/load/leveling/TaskGenerator.java b/queue-load-leveling/src/main/java/org/queue/load/leveling/TaskGenerator.java index 469881f24..211354e53 100644 --- a/queue-load-leveling/src/main/java/org/queue/load/leveling/TaskGenerator.java +++ b/queue-load-leveling/src/main/java/org/queue/load/leveling/TaskGenerator.java @@ -37,10 +37,10 @@ public class TaskGenerator implements Task, Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(App.class); // MessageQueue reference using which we will submit our messages. - private MessageQueue msgQueue; + private final MessageQueue msgQueue; // Total message count that a TaskGenerator will submit. - private int msgCount; + private final int msgCount; // Parameterized constructor. public TaskGenerator(MessageQueue msgQueue, int msgCount) { @@ -64,16 +64,18 @@ public class TaskGenerator implements Task, Runnable { * After every message submission TaskGenerator thread will sleep for 1 second. */ public void run() { + + int count = this.msgCount; + try { - while (this.msgCount > 0) { - String statusMsg = "Message-" + this.msgCount + " submitted by " + Thread.currentThread().getName(); - Message newMessage = new Message(statusMsg); - this.submit(newMessage); + while (count > 0) { + String statusMsg = "Message-" + count + " submitted by " + Thread.currentThread().getName(); + this.submit(new Message(statusMsg)); LOGGER.info(statusMsg); // reduce the message count. - this.msgCount--; + count--; // Make the current thread to sleep after every Message submission. Thread.sleep(1000); diff --git a/queue-load-leveling/src/test/java/org/queue/load/leveling/AppTest.java b/queue-load-leveling/src/test/java/org/queue/load/leveling/AppTest.java index fc4e5dbe1..dbf0c1269 100644 --- a/queue-load-leveling/src/test/java/org/queue/load/leveling/AppTest.java +++ b/queue-load-leveling/src/test/java/org/queue/load/leveling/AppTest.java @@ -27,7 +27,7 @@ import java.io.IOException; import org.junit.Test; /** - * Tests that Caching example runs without errors. + * Application Test */ public class AppTest { @Test diff --git a/queue-load-leveling/src/test/java/org/queue/load/leveling/MessageQueueTest.java b/queue-load-leveling/src/test/java/org/queue/load/leveling/MessageQueueTest.java index 2c1c8e8bc..2b2110a56 100644 --- a/queue-load-leveling/src/test/java/org/queue/load/leveling/MessageQueueTest.java +++ b/queue-load-leveling/src/test/java/org/queue/load/leveling/MessageQueueTest.java @@ -22,9 +22,10 @@ */ package org.queue.load.leveling; -import org.junit.Test; import static org.junit.Assert.assertEquals; +import org.junit.Test; + /** * * Test case for submitting and retrieving messages from Blocking Queue. @@ -36,13 +37,12 @@ public class MessageQueueTest { public void messageQueueTest() { MessageQueue msgQueue = new MessageQueue(); - Message msg = new Message("MessageQueue Test"); // submit message - msgQueue.submitMsg(msg); + msgQueue.submitMsg(new Message("MessageQueue Test")); // retrieve message - assertEquals(msg.getMsg(), msgQueue.retrieveMsg().getMsg()); + assertEquals(msgQueue.retrieveMsg().getMsg(), "MessageQueue Test"); } } diff --git a/queue-load-leveling/src/test/java/org/queue/load/leveling/MessageTest.java b/queue-load-leveling/src/test/java/org/queue/load/leveling/MessageTest.java index 1d8613a68..72a0b7406 100644 --- a/queue-load-leveling/src/test/java/org/queue/load/leveling/MessageTest.java +++ b/queue-load-leveling/src/test/java/org/queue/load/leveling/MessageTest.java @@ -39,10 +39,5 @@ public class MessageTest { String testMsg = "Message Test"; Message msg = new Message(testMsg); assertEquals(msg.getMsg(), testMsg); - - // Default constructor and setter method test. - Message simpleMsg = new Message(); - simpleMsg.setMsg(testMsg); - assertEquals(simpleMsg.getMsg(), testMsg); } }