diff --git a/eip-aggregator/README.md b/eip-aggregator/README.md
new file mode 100644
index 000000000..ec8454f63
--- /dev/null
+++ b/eip-aggregator/README.md
@@ -0,0 +1,33 @@
+---
+layout: pattern
+title: EIP Aggregator
+folder: eip-aggregator
+permalink: /patterns/eip-aggregator/
+categories: Enterprise integration
+tags:
+ - Java
+ - Difficulty-Intermittent
+ - Enterprise integration
+---
+
+## Intent
+Sometimes in enterprise systems there is a need to group incoming data in order to process it as a whole. For example
+you may need to gather offers and after defined number of offers has been received you would like to choose the one with
+the best parameters.
+
+Aggregator allows you to merge messages based on defined criteria and parameters. It gathers original messages,
+applies aggregation strategy and upon fulfilling given criteria, releasing merged messages.
+
+
+
+## Applicability
+Use the Aggregator pattern when
+
+* You need to combine multiple incoming messages
+* You want to process grouped data
+
+## Credits
+
+* [Gregor Hohpe, Bobby Woolf - Enterprise Integration Patterns](http://www.enterpriseintegrationpatterns.com/patterns/messaging/Aggregator.html)
+* [Apache Camel - Documentation](http://camel.apache.org/aggregator2.html)
+
diff --git a/eip-aggregator/etc/aggregator.gif b/eip-aggregator/etc/aggregator.gif
new file mode 100644
index 000000000..b06cdfb0c
Binary files /dev/null and b/eip-aggregator/etc/aggregator.gif differ
diff --git a/eip-aggregator/pom.xml b/eip-aggregator/pom.xml
new file mode 100644
index 000000000..7467cb3a8
--- /dev/null
+++ b/eip-aggregator/pom.xml
@@ -0,0 +1,63 @@
+
+
+
+ * Aggregator allows you to merge messages based on defined criteria and parameters. It gathers original messages, + * applies aggregation strategy and upon fulfilling given criteria, releasing merged messages. + *
+ * + */ +@SpringBootApplication +public class App { + + /** + * Program entry point. It starts Spring Boot application and using Apache Camel it auto-configures routes. + * + * @param args command line args + */ + public static void main(String[] args) throws Exception { + // Run Spring Boot application and obtain ApplicationContext + ConfigurableApplicationContext context = SpringApplication.run(App.class, args); + + // Get CamelContext from ApplicationContext + CamelContext camelContext = (CamelContext) context.getBean("camelContext"); + + // Add a new routes that will handle endpoints form SplitterRoute class. + camelContext.addRoutes(new RouteBuilder() { + + @Override + public void configure() throws Exception { + from("{{endpoint}}").log("ENDPOINT: ${body}"); + } + + }); + + // Add producer that will send test message to an entry point in WireTapRoute + String[] stringArray = {"Test item #1", "Test item #2", "Test item #3"}; + camelContext.createProducerTemplate().sendBody("{{entry}}", stringArray); + + SpringApplication.exit(context); + } +} diff --git a/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/AggregatorRoute.java b/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/AggregatorRoute.java new file mode 100644 index 000000000..78a0abd42 --- /dev/null +++ b/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/AggregatorRoute.java @@ -0,0 +1,37 @@ +package com.iluwatar.eip.aggregator.routes; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * Sample aggregator route definition. + * + *+ * It consumes messages out of the direct:entry entry point and forwards them to direct:endpoint. + * Route accepts messages containing String as a body, it aggregates the messages based on the settings and forwards + * them as CSV to the output chanel. + * + * Settings for the aggregation are: aggregate until 3 messages are bundled or wait 2000ms before sending bundled + * messages further. + *
+ * + * In this example input/output endpoints names are stored in application.properties file. + */ +@Component +public class AggregatorRoute extends RouteBuilder { + + @Autowired + private MessageAggregationStrategy aggregator; + + /** + * Configures the route + * @throws Exception in case of exception during configuration + */ + @Override + public void configure() throws Exception { + // Main route + from("{{entry}}").aggregate(constant(true), aggregator).completionSize(3).completionInterval(2000).to("{{endpoint}}"); + } +} diff --git a/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/MessageAggregationStrategy.java b/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/MessageAggregationStrategy.java new file mode 100644 index 000000000..a2c90c609 --- /dev/null +++ b/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/MessageAggregationStrategy.java @@ -0,0 +1,27 @@ +package com.iluwatar.eip.aggregator.routes; + +import org.apache.camel.Exchange; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.springframework.stereotype.Component; + +/** + * Aggregation strategy joining bodies of messages. If message is first one oldMessage is null. All changes are + * made on IN messages. + */ +@Component +public class MessageAggregationStrategy implements AggregationStrategy { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } + + String in1 = (String) oldExchange.getIn().getBody(); + String in2 = (String) newExchange.getIn().getBody(); + + oldExchange.getIn().setBody(in1 + ";" + in2); + + return oldExchange; + } +} diff --git a/eip-aggregator/src/main/resources/application.properties b/eip-aggregator/src/main/resources/application.properties new file mode 100644 index 000000000..cb879e6e2 --- /dev/null +++ b/eip-aggregator/src/main/resources/application.properties @@ -0,0 +1,2 @@ +entry=direct:entry +endpoint=direct:endpoint diff --git a/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/AppTest.java b/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/AppTest.java new file mode 100644 index 000000000..5eee0986f --- /dev/null +++ b/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/AppTest.java @@ -0,0 +1,15 @@ +package com.iluwatar.eip.aggregator; + +import org.junit.Test; + +/** + * Test for App class + */ +public class AppTest { + + @Test + public void testMain() throws Exception { + String[] args = {}; + App.main(args); + } +} diff --git a/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/routes/AggregatorRouteTest.java b/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/routes/AggregatorRouteTest.java new file mode 100644 index 000000000..449c4aad0 --- /dev/null +++ b/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/routes/AggregatorRouteTest.java @@ -0,0 +1,62 @@ +package com.iluwatar.eip.aggregator.routes; + +import org.apache.camel.EndpointInject; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.SpringApplicationConfiguration; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import static org.junit.Assert.assertEquals; + +/** + * Test class for AggregatorRoute. + *+ * In order for it to work we have to mock endpoints we want to read/write to. To mock those we need to substitute + * original endpoint names to mocks. + *
+ */ +@RunWith(SpringJUnit4ClassRunner.class) +@SpringApplicationConfiguration(classes = AggregatorRouteTest.class) +@ActiveProfiles("test") +@EnableAutoConfiguration +@ComponentScan +public class AggregatorRouteTest { + + @EndpointInject(uri = "{{entry}}") + private ProducerTemplate entry; + + @EndpointInject(uri = "{{endpoint}}") + private MockEndpoint endpoint; + + /** + * Test if endpoint receives three separate messages. + * @throws Exception in case of en exception during the test + */ + @Test + @DirtiesContext + public void testSplitter() throws Exception { + + // Three items in one entry message + entry.sendBody("TEST1"); + entry.sendBody("TEST2"); + entry.sendBody("TEST3"); + entry.sendBody("TEST4"); + entry.sendBody("TEST5"); + + // Endpoint should have three different messages in the end order of the messages is not important + endpoint.expectedMessageCount(2); + endpoint.assertIsSatisfied(); + + String body = (String) endpoint.getReceivedExchanges().get(0).getIn().getBody(); + assertEquals(3, body.split(";").length); + + String body2 = (String) endpoint.getReceivedExchanges().get(1).getIn().getBody(); + assertEquals(2, body2.split(";").length); + } +} diff --git a/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/routes/MessageAggregationStrategyTest.java b/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/routes/MessageAggregationStrategyTest.java new file mode 100644 index 000000000..f4685e334 --- /dev/null +++ b/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/routes/MessageAggregationStrategyTest.java @@ -0,0 +1,42 @@ +package com.iluwatar.eip.aggregator.routes; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultExchange; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests MessageAggregationStrategy + */ +public class MessageAggregationStrategyTest { + + @Test + public void testAggregate() { + MessageAggregationStrategy mas = new MessageAggregationStrategy(); + Exchange oldExchange = new DefaultExchange((CamelContext) null); + oldExchange.getIn().setBody("TEST1"); + + Exchange newExchange = new DefaultExchange((CamelContext) null); + newExchange.getIn().setBody("TEST2"); + + Exchange output = mas.aggregate(oldExchange, newExchange); + String outputBody = (String) output.getIn().getBody(); + assertEquals("TEST1;TEST2", outputBody); + } + + @Test + public void testAggregateOldNull() { + MessageAggregationStrategy mas = new MessageAggregationStrategy(); + + Exchange newExchange = new DefaultExchange((CamelContext) null); + newExchange.getIn().setBody("TEST2"); + + Exchange output = mas.aggregate(null, newExchange); + String outputBody = (String) output.getIn().getBody(); + + assertEquals(newExchange, output); + assertEquals("TEST2", outputBody); + } +} diff --git a/eip-aggregator/src/test/resources/application-test.properties b/eip-aggregator/src/test/resources/application-test.properties new file mode 100644 index 000000000..f657ea5a1 --- /dev/null +++ b/eip-aggregator/src/test/resources/application-test.properties @@ -0,0 +1,2 @@ +entry=direct:entry +endpoint=mock:endpoint diff --git a/pom.xml b/pom.xml index 0c03909de..2d84d5064 100644 --- a/pom.xml +++ b/pom.xml @@ -149,6 +149,7 @@