#114 Aggregator pattern; tests; description
This commit is contained in:
parent
a2a13758e0
commit
6e0bf59e5a
33
eip-aggregator/README.md
Normal file
33
eip-aggregator/README.md
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
---
|
||||||
|
layout: pattern
|
||||||
|
title: EIP Aggregator
|
||||||
|
folder: eip-aggregator
|
||||||
|
permalink: /patterns/eip-aggregator/
|
||||||
|
categories: Enterprise integration
|
||||||
|
tags:
|
||||||
|
- Java
|
||||||
|
- Difficulty-Intermittent
|
||||||
|
- Enterprise integration
|
||||||
|
---
|
||||||
|
|
||||||
|
## Intent
|
||||||
|
Sometimes in enterprise systems there is a need to group incoming data in order to process it as a whole. For example
|
||||||
|
you may need to gather offers and after defined number of offers has been received you would like to choose the one with
|
||||||
|
the best parameters.
|
||||||
|
|
||||||
|
Aggregator allows you to merge messages based on defined criteria and parameters. It gathers original messages,
|
||||||
|
applies aggregation strategy and upon fulfilling given criteria, releasing merged messages.
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
## Applicability
|
||||||
|
Use the Aggregator pattern when
|
||||||
|
|
||||||
|
* You need to combine multiple incoming messages
|
||||||
|
* You want to process grouped data
|
||||||
|
|
||||||
|
## Credits
|
||||||
|
|
||||||
|
* [Gregor Hohpe, Bobby Woolf - Enterprise Integration Patterns](http://www.enterpriseintegrationpatterns.com/patterns/messaging/Aggregator.html)
|
||||||
|
* [Apache Camel - Documentation](http://camel.apache.org/aggregator2.html)
|
||||||
|
|
BIN
eip-aggregator/etc/aggregator.gif
Normal file
BIN
eip-aggregator/etc/aggregator.gif
Normal file
Binary file not shown.
After Width: | Height: | Size: 2.4 KiB |
63
eip-aggregator/pom.xml
Normal file
63
eip-aggregator/pom.xml
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<!--
|
||||||
|
The MIT License
|
||||||
|
Copyright (c) 2014-2016 Ilkka Seppälä
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
in the Software without restriction, including without limitation the rights
|
||||||
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
copies of the Software, and to permit persons to whom the Software is
|
||||||
|
furnished to do so, subject to the following conditions:
|
||||||
|
The above copyright notice and this permission notice shall be included in
|
||||||
|
all copies or substantial portions of the Software.
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||||
|
THE SOFTWARE.
|
||||||
|
-->
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<artifactId>eip-aggregator</artifactId>
|
||||||
|
<parent>
|
||||||
|
<groupId>com.iluwatar</groupId>
|
||||||
|
<artifactId>java-design-patterns</artifactId>
|
||||||
|
<version>1.18.0-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-web</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.camel</groupId>
|
||||||
|
<artifactId>camel-core</artifactId>
|
||||||
|
<version>${camel.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.camel</groupId>
|
||||||
|
<artifactId>camel-spring-boot</artifactId>
|
||||||
|
<version>${camel.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- Testing -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.boot</groupId>
|
||||||
|
<artifactId>spring-boot-starter-test</artifactId>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.camel</groupId>
|
||||||
|
<artifactId>camel-test-spring</artifactId>
|
||||||
|
<version>${camel.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
</dependencies>
|
||||||
|
</project>
|
@ -0,0 +1,51 @@
|
|||||||
|
package com.iluwatar.eip.aggregator;
|
||||||
|
|
||||||
|
import org.apache.camel.CamelContext;
|
||||||
|
import org.apache.camel.builder.RouteBuilder;
|
||||||
|
import org.springframework.boot.SpringApplication;
|
||||||
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||||
|
import org.springframework.context.ConfigurableApplicationContext;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sometimes in enterprise systems there is a need to group incoming data in order to process it as a whole. For example
|
||||||
|
* you may need to gather offers and after defined number of offers has been received you would like to choose the one
|
||||||
|
* with the best parameters.
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* Aggregator allows you to merge messages based on defined criteria and parameters. It gathers original messages,
|
||||||
|
* applies aggregation strategy and upon fulfilling given criteria, releasing merged messages.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@SpringBootApplication
|
||||||
|
public class App {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Program entry point. It starts Spring Boot application and using Apache Camel it auto-configures routes.
|
||||||
|
*
|
||||||
|
* @param args command line args
|
||||||
|
*/
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
// Run Spring Boot application and obtain ApplicationContext
|
||||||
|
ConfigurableApplicationContext context = SpringApplication.run(App.class, args);
|
||||||
|
|
||||||
|
// Get CamelContext from ApplicationContext
|
||||||
|
CamelContext camelContext = (CamelContext) context.getBean("camelContext");
|
||||||
|
|
||||||
|
// Add a new routes that will handle endpoints form SplitterRoute class.
|
||||||
|
camelContext.addRoutes(new RouteBuilder() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure() throws Exception {
|
||||||
|
from("{{endpoint}}").log("ENDPOINT: ${body}");
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
|
||||||
|
// Add producer that will send test message to an entry point in WireTapRoute
|
||||||
|
String[] stringArray = {"Test item #1", "Test item #2", "Test item #3"};
|
||||||
|
camelContext.createProducerTemplate().sendBody("{{entry}}", stringArray);
|
||||||
|
|
||||||
|
SpringApplication.exit(context);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,37 @@
|
|||||||
|
package com.iluwatar.eip.aggregator.routes;
|
||||||
|
|
||||||
|
import org.apache.camel.builder.RouteBuilder;
|
||||||
|
import org.apache.camel.processor.aggregate.AggregationStrategy;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sample aggregator route definition.
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* It consumes messages out of the <i>direct:entry</i> entry point and forwards them to <i>direct:endpoint</i>.
|
||||||
|
* Route accepts messages containing String as a body, it aggregates the messages based on the settings and forwards
|
||||||
|
* them as CSV to the output chanel.
|
||||||
|
*
|
||||||
|
* Settings for the aggregation are: aggregate until 3 messages are bundled or wait 2000ms before sending bundled
|
||||||
|
* messages further.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* In this example input/output endpoints names are stored in <i>application.properties</i> file.
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class AggregatorRoute extends RouteBuilder {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private MessageAggregationStrategy aggregator;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configures the route
|
||||||
|
* @throws Exception in case of exception during configuration
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void configure() throws Exception {
|
||||||
|
// Main route
|
||||||
|
from("{{entry}}").aggregate(constant(true), aggregator).completionSize(3).completionInterval(2000).to("{{endpoint}}");
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,27 @@
|
|||||||
|
package com.iluwatar.eip.aggregator.routes;
|
||||||
|
|
||||||
|
import org.apache.camel.Exchange;
|
||||||
|
import org.apache.camel.processor.aggregate.AggregationStrategy;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Aggregation strategy joining bodies of messages. If message is first one <i>oldMessage</i> is null. All changes are
|
||||||
|
* made on IN messages.
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class MessageAggregationStrategy implements AggregationStrategy {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
|
||||||
|
if (oldExchange == null) {
|
||||||
|
return newExchange;
|
||||||
|
}
|
||||||
|
|
||||||
|
String in1 = (String) oldExchange.getIn().getBody();
|
||||||
|
String in2 = (String) newExchange.getIn().getBody();
|
||||||
|
|
||||||
|
oldExchange.getIn().setBody(in1 + ";" + in2);
|
||||||
|
|
||||||
|
return oldExchange;
|
||||||
|
}
|
||||||
|
}
|
2
eip-aggregator/src/main/resources/application.properties
Normal file
2
eip-aggregator/src/main/resources/application.properties
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
entry=direct:entry
|
||||||
|
endpoint=direct:endpoint
|
@ -0,0 +1,15 @@
|
|||||||
|
package com.iluwatar.eip.aggregator;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for App class
|
||||||
|
*/
|
||||||
|
public class AppTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMain() throws Exception {
|
||||||
|
String[] args = {};
|
||||||
|
App.main(args);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,62 @@
|
|||||||
|
package com.iluwatar.eip.aggregator.routes;
|
||||||
|
|
||||||
|
import org.apache.camel.EndpointInject;
|
||||||
|
import org.apache.camel.ProducerTemplate;
|
||||||
|
import org.apache.camel.component.mock.MockEndpoint;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||||
|
import org.springframework.boot.test.SpringApplicationConfiguration;
|
||||||
|
import org.springframework.context.annotation.ComponentScan;
|
||||||
|
import org.springframework.test.annotation.DirtiesContext;
|
||||||
|
import org.springframework.test.context.ActiveProfiles;
|
||||||
|
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test class for <i>AggregatorRoute</i>.
|
||||||
|
* <p>
|
||||||
|
* In order for it to work we have to mock endpoints we want to read/write to. To mock those we need to substitute
|
||||||
|
* original endpoint names to mocks.
|
||||||
|
* </p>
|
||||||
|
*/
|
||||||
|
@RunWith(SpringJUnit4ClassRunner.class)
|
||||||
|
@SpringApplicationConfiguration(classes = AggregatorRouteTest.class)
|
||||||
|
@ActiveProfiles("test")
|
||||||
|
@EnableAutoConfiguration
|
||||||
|
@ComponentScan
|
||||||
|
public class AggregatorRouteTest {
|
||||||
|
|
||||||
|
@EndpointInject(uri = "{{entry}}")
|
||||||
|
private ProducerTemplate entry;
|
||||||
|
|
||||||
|
@EndpointInject(uri = "{{endpoint}}")
|
||||||
|
private MockEndpoint endpoint;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test if endpoint receives three separate messages.
|
||||||
|
* @throws Exception in case of en exception during the test
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
@DirtiesContext
|
||||||
|
public void testSplitter() throws Exception {
|
||||||
|
|
||||||
|
// Three items in one entry message
|
||||||
|
entry.sendBody("TEST1");
|
||||||
|
entry.sendBody("TEST2");
|
||||||
|
entry.sendBody("TEST3");
|
||||||
|
entry.sendBody("TEST4");
|
||||||
|
entry.sendBody("TEST5");
|
||||||
|
|
||||||
|
// Endpoint should have three different messages in the end order of the messages is not important
|
||||||
|
endpoint.expectedMessageCount(2);
|
||||||
|
endpoint.assertIsSatisfied();
|
||||||
|
|
||||||
|
String body = (String) endpoint.getReceivedExchanges().get(0).getIn().getBody();
|
||||||
|
assertEquals(3, body.split(";").length);
|
||||||
|
|
||||||
|
String body2 = (String) endpoint.getReceivedExchanges().get(1).getIn().getBody();
|
||||||
|
assertEquals(2, body2.split(";").length);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,42 @@
|
|||||||
|
package com.iluwatar.eip.aggregator.routes;
|
||||||
|
|
||||||
|
import org.apache.camel.CamelContext;
|
||||||
|
import org.apache.camel.Exchange;
|
||||||
|
import org.apache.camel.impl.DefaultExchange;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests MessageAggregationStrategy
|
||||||
|
*/
|
||||||
|
public class MessageAggregationStrategyTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAggregate() {
|
||||||
|
MessageAggregationStrategy mas = new MessageAggregationStrategy();
|
||||||
|
Exchange oldExchange = new DefaultExchange((CamelContext) null);
|
||||||
|
oldExchange.getIn().setBody("TEST1");
|
||||||
|
|
||||||
|
Exchange newExchange = new DefaultExchange((CamelContext) null);
|
||||||
|
newExchange.getIn().setBody("TEST2");
|
||||||
|
|
||||||
|
Exchange output = mas.aggregate(oldExchange, newExchange);
|
||||||
|
String outputBody = (String) output.getIn().getBody();
|
||||||
|
assertEquals("TEST1;TEST2", outputBody);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAggregateOldNull() {
|
||||||
|
MessageAggregationStrategy mas = new MessageAggregationStrategy();
|
||||||
|
|
||||||
|
Exchange newExchange = new DefaultExchange((CamelContext) null);
|
||||||
|
newExchange.getIn().setBody("TEST2");
|
||||||
|
|
||||||
|
Exchange output = mas.aggregate(null, newExchange);
|
||||||
|
String outputBody = (String) output.getIn().getBody();
|
||||||
|
|
||||||
|
assertEquals(newExchange, output);
|
||||||
|
assertEquals("TEST2", outputBody);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,2 @@
|
|||||||
|
entry=direct:entry
|
||||||
|
endpoint=mock:endpoint
|
1
pom.xml
1
pom.xml
@ -149,6 +149,7 @@
|
|||||||
<module>partial-response</module>
|
<module>partial-response</module>
|
||||||
<module>eip-wire-tap</module>
|
<module>eip-wire-tap</module>
|
||||||
<module>eip-splitter</module>
|
<module>eip-splitter</module>
|
||||||
|
<module>eip-aggregator</module>
|
||||||
</modules>
|
</modules>
|
||||||
|
|
||||||
<dependencyManagement>
|
<dependencyManagement>
|
||||||
|
Loading…
x
Reference in New Issue
Block a user