From 6e0bf59e5a7e2a768f0ddc923178e8153ba3c0ce Mon Sep 17 00:00:00 2001 From: "adam.kaczmmarek@gmail.com" Date: Sat, 4 Nov 2017 22:38:51 +0100 Subject: [PATCH 1/3] #114 Aggregator pattern; tests; description --- eip-aggregator/README.md | 33 +++++++++ eip-aggregator/etc/aggregator.gif | Bin 0 -> 2413 bytes eip-aggregator/pom.xml | 63 ++++++++++++++++++ .../java/com/iluwatar/eip/aggregator/App.java | 51 ++++++++++++++ .../aggregator/routes/AggregatorRoute.java | 37 ++++++++++ .../routes/MessageAggregationStrategy.java | 27 ++++++++ .../src/main/resources/application.properties | 2 + .../com/iluwatar/eip/aggregator/AppTest.java | 15 +++++ .../routes/AggregatorRouteTest.java | 62 +++++++++++++++++ .../MessageAggregationStrategyTest.java | 42 ++++++++++++ .../resources/application-test.properties | 2 + pom.xml | 1 + 12 files changed, 335 insertions(+) create mode 100644 eip-aggregator/README.md create mode 100644 eip-aggregator/etc/aggregator.gif create mode 100644 eip-aggregator/pom.xml create mode 100644 eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/App.java create mode 100644 eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/AggregatorRoute.java create mode 100644 eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/MessageAggregationStrategy.java create mode 100644 eip-aggregator/src/main/resources/application.properties create mode 100644 eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/AppTest.java create mode 100644 eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/routes/AggregatorRouteTest.java create mode 100644 eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/routes/MessageAggregationStrategyTest.java create mode 100644 eip-aggregator/src/test/resources/application-test.properties diff --git a/eip-aggregator/README.md b/eip-aggregator/README.md new file mode 100644 index 000000000..ec8454f63 --- /dev/null +++ b/eip-aggregator/README.md @@ -0,0 +1,33 @@ +--- +layout: pattern +title: EIP Aggregator +folder: eip-aggregator +permalink: /patterns/eip-aggregator/ +categories: Enterprise integration +tags: + - Java + - Difficulty-Intermittent + - Enterprise integration +--- + +## Intent +Sometimes in enterprise systems there is a need to group incoming data in order to process it as a whole. For example +you may need to gather offers and after defined number of offers has been received you would like to choose the one with +the best parameters. + +Aggregator allows you to merge messages based on defined criteria and parameters. It gathers original messages, +applies aggregation strategy and upon fulfilling given criteria, releasing merged messages. + +![alt text](./etc/aggregator.gif "Splitter") + +## Applicability +Use the Aggregator pattern when + +* You need to combine multiple incoming messages +* You want to process grouped data + +## Credits + +* [Gregor Hohpe, Bobby Woolf - Enterprise Integration Patterns](http://www.enterpriseintegrationpatterns.com/patterns/messaging/Aggregator.html) +* [Apache Camel - Documentation](http://camel.apache.org/aggregator2.html) + diff --git a/eip-aggregator/etc/aggregator.gif b/eip-aggregator/etc/aggregator.gif new file mode 100644 index 0000000000000000000000000000000000000000..b06cdfb0c920bd2e8f9257ebcae452e81dac6acd GIT binary patch literal 2413 zcmV-z36l0lNk%w1VZs510J8u9|NsC0|7OhpnKR5YfPjGiGXOI)%m4rY|1+5Y00000 z00000000000000000000A^8La6aWYSEC2ui0Kx%=000F45Xniay*TU5yZ>M)j$~<` zXsWJk>%MR-&vb3yIC|`R?*G7`a7ZlLc7$ZIXKXs3(5Q5}IVi77s&>omdcRey_y8`O z&*=2aDjuuf@VGq9ZePjsynfIB)$MnEf`f#GB7cBchKr1if{2NDj+2y?V32^4mYbZN zM3{D(o};9tFrZkXrmL*2AgF<^va_@mumq~Ly1S#OxxB!^lSp^J!pF#jJWR>W&dSfx z(tO6#*4JIt*xTGr+TGycKHuWy=BMZB>NDhT%k4q$TJr0Tx$pS+^N7se{O~x4Dd|@a znuP-c5)KH!P+>xb0RWE6^u$+1iUkYZQwY)F!iEz6117@NOG?BTSAiV@kOxvjx#2M|o1aaNuXl01k`FxVh7h&Ve?sR#h73UeKo= zrJ~e|Gagy7I?lq$$F{B8w{YXarNb2J%dZR3u2Cz5YFQwD^?s5|xUk{Ff&sJj==5V^ z7J;*BeLNzvy8{9;Yu?Pcv**vCLyI0w@DkIzc7aBWV%h9u$yX^W9!e8@P2NDbbIV z?nis~_wWUL|Kb&|Q})v_mej94)SMmh1<2k1d|uSY7+&~I_YZ3eLLtx+36A5QfE3aUBaS(Omf{vde&V8vFcN9xkx9z9qjNnz$z&e@CRe19NLJa|lJjKwB9tRa zIc1n)Zbl{*P1>YonNfN<<&ABUb|rWk%?T%$Y=-uwje+L*=7(n5!r~Dt5_ZEqi!w;0 ziFpdTCw+~|b@JwhC+ivj+U?L6^0Pc`IwV9y@BX1xQ=jX~OzOEIUl` z%FwRKqI%G?uD1K^uSsw~ZMC^kYNZl5;CsS7=4wmoxyqhN0YcCPrLHai`rB{7GcoI~ zv$6hgg$DDMn2(Ybk9jApZeB_Q7AEk!FtPx9dq}p!Ds=F`+XCFOt}9Q_6vWWF3NOej zstG2>JSe#_Zr85R1_>gM{BpOOHnm}6F-vd&$qGZ%G^gxVP4mrS9K^B4Yx4XK*E^S} zuML2ld=S*=GGuZHUZu>@)K8;LGTc_z{O$#0-0*w za<{zptogXz5~yrN?J}v9=TrCp!_nfM_~&*w-S^;KGgnwLH8DSHg3_+ zebXb4{N~rI`q^!JK7gIpCUvoTWsE{WQ-K7{hA;+}%zf~SicOA&HV1OfX53R=&f0Xr z3S}?_6AKyRJP0!oE)78=B;m_UsJ+fb@O%{e9=%W$KpChISTm$y?Lv0Lr7cj0|Kg$j z?iQjUR_TQ$Numdr2&_o|DaKKK@nS`UD4+3Jv5fgkni#p_j24n6d}l=C5e3FZp19FF za6BUx(Njk@-ce9_wBjC-(nqfR(P!n0;~NIQ}-b$E&+>Wa9fMylkHxe4SD7bz!8 zW(1Ss(<%uPfj5p+E;}(1tox z4l{e8Iv1)7hg!7%pb$-HoYpBrK2fwM8qI-hq%~4<8Rets328uFKvI^P6f!79ib|Ko zDNae^rryIT6L^r*fz=eJ`*NC1Wkyud=@btQ9jh)qXIwrb3PB zk7mlKp<)%NU`1+Hu}apR7PX^hE$djRn$@U!Z>`M}U0aJ9*15t}u4>h4kk)8Yx4Jd2 zetgl#h{l#xM1YO#SSO3bye^g=h_x(H0U}no z-W0Q;^{gfwOWDe**0Y)o>}mzuS-(p5v7&t{0d2ck+`e|StsO*Z1v^{2epR!y70hsp z>)Yibx3}{DDlBm!Yuvy>H@JCl?ro)7SEX(@NYWJrcjo=`b2aV1{X_CTyuZSiGe+*94&m#MOas#m#d z)z22ortEzxTQSg59v>C28AvgZYdlpX6B$>BE$vRV`&I4!^}&@)Yifr~<*3EQRVLfONpS0AiK8UCZt>zuS+04N9ZK^lRTr2-t)(sAGt!@2dW&hbnoQ}15 z`KarmdRfH`w)R_xykt2;S+Om?a*%rqByNxQ&)yF7xTOuETUS-KTElWTJsjP17?{8f zv$IpvEscBQyI5A0cf9TG?|&Qo;0RB+!WYi)0Am)t4UhOr>5XiOTO6i@PV&V!4(3r; zdB#iSxX9z0aV>Yb;3Iz}oc)aJl)F6R9*^?5UruwFpZd@huX)OgZEAGu{N)cPu90&Z f{_~<6{pd(fy3&`<^rk!g=}?cl)TeHa2mk;(KgY=c literal 0 HcmV?d00001 diff --git a/eip-aggregator/pom.xml b/eip-aggregator/pom.xml new file mode 100644 index 000000000..7467cb3a8 --- /dev/null +++ b/eip-aggregator/pom.xml @@ -0,0 +1,63 @@ + + + + 4.0.0 + eip-aggregator + + com.iluwatar + java-design-patterns + 1.18.0-SNAPSHOT + + + + + org.springframework.boot + spring-boot-starter-web + + + + org.apache.camel + camel-core + ${camel.version} + + + + org.apache.camel + camel-spring-boot + ${camel.version} + + + + + org.springframework.boot + spring-boot-starter-test + + + + org.apache.camel + camel-test-spring + ${camel.version} + + + + \ No newline at end of file diff --git a/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/App.java b/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/App.java new file mode 100644 index 000000000..fbff1a28a --- /dev/null +++ b/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/App.java @@ -0,0 +1,51 @@ +package com.iluwatar.eip.aggregator; + +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; + +/** + * Sometimes in enterprise systems there is a need to group incoming data in order to process it as a whole. For example + * you may need to gather offers and after defined number of offers has been received you would like to choose the one + * with the best parameters. + * + *

+ * Aggregator allows you to merge messages based on defined criteria and parameters. It gathers original messages, + * applies aggregation strategy and upon fulfilling given criteria, releasing merged messages. + *

+ * + */ +@SpringBootApplication +public class App { + + /** + * Program entry point. It starts Spring Boot application and using Apache Camel it auto-configures routes. + * + * @param args command line args + */ + public static void main(String[] args) throws Exception { + // Run Spring Boot application and obtain ApplicationContext + ConfigurableApplicationContext context = SpringApplication.run(App.class, args); + + // Get CamelContext from ApplicationContext + CamelContext camelContext = (CamelContext) context.getBean("camelContext"); + + // Add a new routes that will handle endpoints form SplitterRoute class. + camelContext.addRoutes(new RouteBuilder() { + + @Override + public void configure() throws Exception { + from("{{endpoint}}").log("ENDPOINT: ${body}"); + } + + }); + + // Add producer that will send test message to an entry point in WireTapRoute + String[] stringArray = {"Test item #1", "Test item #2", "Test item #3"}; + camelContext.createProducerTemplate().sendBody("{{entry}}", stringArray); + + SpringApplication.exit(context); + } +} diff --git a/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/AggregatorRoute.java b/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/AggregatorRoute.java new file mode 100644 index 000000000..78a0abd42 --- /dev/null +++ b/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/AggregatorRoute.java @@ -0,0 +1,37 @@ +package com.iluwatar.eip.aggregator.routes; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * Sample aggregator route definition. + * + *

+ * It consumes messages out of the direct:entry entry point and forwards them to direct:endpoint. + * Route accepts messages containing String as a body, it aggregates the messages based on the settings and forwards + * them as CSV to the output chanel. + * + * Settings for the aggregation are: aggregate until 3 messages are bundled or wait 2000ms before sending bundled + * messages further. + *

+ * + * In this example input/output endpoints names are stored in application.properties file. + */ +@Component +public class AggregatorRoute extends RouteBuilder { + + @Autowired + private MessageAggregationStrategy aggregator; + + /** + * Configures the route + * @throws Exception in case of exception during configuration + */ + @Override + public void configure() throws Exception { + // Main route + from("{{entry}}").aggregate(constant(true), aggregator).completionSize(3).completionInterval(2000).to("{{endpoint}}"); + } +} diff --git a/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/MessageAggregationStrategy.java b/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/MessageAggregationStrategy.java new file mode 100644 index 000000000..a2c90c609 --- /dev/null +++ b/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/MessageAggregationStrategy.java @@ -0,0 +1,27 @@ +package com.iluwatar.eip.aggregator.routes; + +import org.apache.camel.Exchange; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.springframework.stereotype.Component; + +/** + * Aggregation strategy joining bodies of messages. If message is first one oldMessage is null. All changes are + * made on IN messages. + */ +@Component +public class MessageAggregationStrategy implements AggregationStrategy { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } + + String in1 = (String) oldExchange.getIn().getBody(); + String in2 = (String) newExchange.getIn().getBody(); + + oldExchange.getIn().setBody(in1 + ";" + in2); + + return oldExchange; + } +} diff --git a/eip-aggregator/src/main/resources/application.properties b/eip-aggregator/src/main/resources/application.properties new file mode 100644 index 000000000..cb879e6e2 --- /dev/null +++ b/eip-aggregator/src/main/resources/application.properties @@ -0,0 +1,2 @@ +entry=direct:entry +endpoint=direct:endpoint diff --git a/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/AppTest.java b/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/AppTest.java new file mode 100644 index 000000000..5eee0986f --- /dev/null +++ b/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/AppTest.java @@ -0,0 +1,15 @@ +package com.iluwatar.eip.aggregator; + +import org.junit.Test; + +/** + * Test for App class + */ +public class AppTest { + + @Test + public void testMain() throws Exception { + String[] args = {}; + App.main(args); + } +} diff --git a/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/routes/AggregatorRouteTest.java b/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/routes/AggregatorRouteTest.java new file mode 100644 index 000000000..449c4aad0 --- /dev/null +++ b/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/routes/AggregatorRouteTest.java @@ -0,0 +1,62 @@ +package com.iluwatar.eip.aggregator.routes; + +import org.apache.camel.EndpointInject; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.SpringApplicationConfiguration; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import static org.junit.Assert.assertEquals; + +/** + * Test class for AggregatorRoute. + *

+ * In order for it to work we have to mock endpoints we want to read/write to. To mock those we need to substitute + * original endpoint names to mocks. + *

+ */ +@RunWith(SpringJUnit4ClassRunner.class) +@SpringApplicationConfiguration(classes = AggregatorRouteTest.class) +@ActiveProfiles("test") +@EnableAutoConfiguration +@ComponentScan +public class AggregatorRouteTest { + + @EndpointInject(uri = "{{entry}}") + private ProducerTemplate entry; + + @EndpointInject(uri = "{{endpoint}}") + private MockEndpoint endpoint; + + /** + * Test if endpoint receives three separate messages. + * @throws Exception in case of en exception during the test + */ + @Test + @DirtiesContext + public void testSplitter() throws Exception { + + // Three items in one entry message + entry.sendBody("TEST1"); + entry.sendBody("TEST2"); + entry.sendBody("TEST3"); + entry.sendBody("TEST4"); + entry.sendBody("TEST5"); + + // Endpoint should have three different messages in the end order of the messages is not important + endpoint.expectedMessageCount(2); + endpoint.assertIsSatisfied(); + + String body = (String) endpoint.getReceivedExchanges().get(0).getIn().getBody(); + assertEquals(3, body.split(";").length); + + String body2 = (String) endpoint.getReceivedExchanges().get(1).getIn().getBody(); + assertEquals(2, body2.split(";").length); + } +} diff --git a/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/routes/MessageAggregationStrategyTest.java b/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/routes/MessageAggregationStrategyTest.java new file mode 100644 index 000000000..f4685e334 --- /dev/null +++ b/eip-aggregator/src/test/java/com/iluwatar/eip/aggregator/routes/MessageAggregationStrategyTest.java @@ -0,0 +1,42 @@ +package com.iluwatar.eip.aggregator.routes; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultExchange; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests MessageAggregationStrategy + */ +public class MessageAggregationStrategyTest { + + @Test + public void testAggregate() { + MessageAggregationStrategy mas = new MessageAggregationStrategy(); + Exchange oldExchange = new DefaultExchange((CamelContext) null); + oldExchange.getIn().setBody("TEST1"); + + Exchange newExchange = new DefaultExchange((CamelContext) null); + newExchange.getIn().setBody("TEST2"); + + Exchange output = mas.aggregate(oldExchange, newExchange); + String outputBody = (String) output.getIn().getBody(); + assertEquals("TEST1;TEST2", outputBody); + } + + @Test + public void testAggregateOldNull() { + MessageAggregationStrategy mas = new MessageAggregationStrategy(); + + Exchange newExchange = new DefaultExchange((CamelContext) null); + newExchange.getIn().setBody("TEST2"); + + Exchange output = mas.aggregate(null, newExchange); + String outputBody = (String) output.getIn().getBody(); + + assertEquals(newExchange, output); + assertEquals("TEST2", outputBody); + } +} diff --git a/eip-aggregator/src/test/resources/application-test.properties b/eip-aggregator/src/test/resources/application-test.properties new file mode 100644 index 000000000..f657ea5a1 --- /dev/null +++ b/eip-aggregator/src/test/resources/application-test.properties @@ -0,0 +1,2 @@ +entry=direct:entry +endpoint=mock:endpoint diff --git a/pom.xml b/pom.xml index 0c03909de..2d84d5064 100644 --- a/pom.xml +++ b/pom.xml @@ -149,6 +149,7 @@ partial-response eip-wire-tap eip-splitter + eip-aggregator From c45e9a1faf24dd58929b957b7b3e3f4c8ca8de04 Mon Sep 17 00:00:00 2001 From: codinghog Date: Tue, 7 Nov 2017 07:24:31 +0100 Subject: [PATCH 2/3] #114 Fixed checkstyle issue --- .../com/iluwatar/eip/aggregator/routes/AggregatorRoute.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/AggregatorRoute.java b/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/AggregatorRoute.java index 78a0abd42..8e09d4e95 100644 --- a/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/AggregatorRoute.java +++ b/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/AggregatorRoute.java @@ -32,6 +32,8 @@ public class AggregatorRoute extends RouteBuilder { @Override public void configure() throws Exception { // Main route - from("{{entry}}").aggregate(constant(true), aggregator).completionSize(3).completionInterval(2000).to("{{endpoint}}"); + from("{{entry}}").aggregate(constant(true), aggregator) + .completionSize(3).completionInterval(2000) + .to("{{endpoint}}"); } } From 87ee97a1126b170b764eca90787ca221161706f8 Mon Sep 17 00:00:00 2001 From: codinghog Date: Tue, 7 Nov 2017 07:37:12 +0100 Subject: [PATCH 3/3] #114 Fixed checkstyle issue --- .../java/com/iluwatar/eip/aggregator/routes/AggregatorRoute.java | 1 - 1 file changed, 1 deletion(-) diff --git a/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/AggregatorRoute.java b/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/AggregatorRoute.java index 8e09d4e95..5aba0ef27 100644 --- a/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/AggregatorRoute.java +++ b/eip-aggregator/src/main/java/com/iluwatar/eip/aggregator/routes/AggregatorRoute.java @@ -1,7 +1,6 @@ package com.iluwatar.eip.aggregator.routes; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.processor.aggregate.AggregationStrategy; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;