Overview
Aggregations implement the Aggregator EIP -- they collect multiple exchanges and combine them into one. Use them to batch records before loading, collect results from a split, or group data by a key.
IM pre-registers a set of aggregation strategy beans at startup. Reference them by bean name in the strategyRef attribute of the <aggregate> element.
Pre-Registered Aggregation Beans
|
Bean Name |
Description |
|---|---|
|
|
Aggregates exchange bodies into a |
|
|
Copies a header from new exchange to old exchange to preserve pricing parameter ID |
|
|
Aggregates into a list and propagates exceptions to the new exchange |
|
|
Aggregates record counts |
|
|
Collects all bodies into a |
|
|
Collects all exchanges into a |
|
|
Keeps only the latest exchange (Camel) |
|
|
Keeps the original exchange (Camel) |
|
|
Concatenates bodies into a single String (Camel) |
|
|
Aggregates all incoming messages into a ZIP file (Camel) |
Disable auto-registration:
integration.aggregations.enabled=false
Examples
Collect records into a batch before loading
The most common pattern -- collect N records from a split and load them as a batch:
<routes xmlns="http://camel.apache.org/schema/spring">
<route id="batchImport">
<from uri="file:{{inbound.path}}"/>
<to uri="pfx-csv:unmarshal?delimiter=,"/>
<split>
<simple>${body}</simple>
<aggregate strategyRef="aggregateToList" completionSize="500">
<correlationExpression>
<constant>true</constant>
</correlationExpression>
<to uri="pfx-api:loaddata?objectType=P&mapper=productMapper"/>
</aggregate>
</split>
<onCompletion onCompleteOnly="true">
<to uri="pfx-api:internalCopy?label=Product"/>
</onCompletion>
</route>
</routes>
Aggregate with time-based completion
Collect records for up to 5 seconds, then process whatever was gathered:
<aggregate strategyRef="aggregateToList" completionTimeout="5000">
<correlationExpression>
<simple>${header.batchKey}</simple>
</correlationExpression>
<to uri="pfx-api:loaddata?objectType=DM&dsUniqueName=MyDS&mapper=myMapper"/>
</aggregate>
<onCompletion onCompleteOnly="true">
<to uri="pfx-api:flush?objectType=DM&dsUniqueName=MyDS"/>
</onCompletion>
Important: Always wrap
pfx-api:flushin<onCompletion onCompleteOnly="true">. This ensures flush runs only after the entire route succeeds -- if loading fails mid-way, data won't be flushed in a broken state.
Collect files into a ZIP
Use zipAggregationStrategy to combine multiple CSV exports into a single ZIP:
<routes xmlns="http://camel.apache.org/schema/spring">
<route id="zipExports">
<from uri="timer://zip?repeatCount=1"/>
<to uri="pfx-api:fetch?objectType=P&filter=allProducts"/>
<to uri="pfx-csv:marshal"/>
<setHeader name="CamelFileName"><constant>products.csv</constant></setHeader>
<aggregate strategyRef="zipAggregationStrategy" completionSize="1">
<correlationExpression><constant>zip</constant></correlationExpression>
<setHeader name="CamelFileName"><constant>export.zip</constant></setHeader>
<to uri="file:{{outbound.path}}"/>
</aggregate>
</route>
</routes>
Keep only the latest value per key (de-duplicate)
<aggregate strategyRef="useLatestAggregationStrategy"
completionTimeout="1000"
completionSize="100">
<correlationExpression>
<simple>${body[sku]}</simple>
</correlationExpression>
<to uri="pfx-api:integrate?objectType=P&mapper=productMapper"/>
</aggregate>
Custom aggregation strategy in Groovy
For logic pre-registered beans don't cover, write a class in classes/ implementing AggregationStrategy:
// classes/SumByCategory.groovy
import org.apache.camel.Exchange
import org.apache.camel.AggregationStrategy
class SumByCategory implements AggregationStrategy {
@Override
Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
def row = newExchange.in.body as Map
if (oldExchange == null) {
newExchange.in.body = [(row.category): (row.amount as BigDecimal)]
return newExchange
}
def sums = oldExchange.in.body as Map
sums[row.category] = (sums[row.category] ?: 0) + (row.amount as BigDecimal)
oldExchange.in.body = sums
return oldExchange
}
}
<aggregate strategyRef="sumByCategory" completionSize="1000">
<correlationExpression><constant>true</constant></correlationExpression>
<to uri="direct:loadCategorySums"/>
</aggregate>
Completion Conditions
An aggregate must have at least one completion condition:
|
Attribute |
Description |
|---|---|
|
|
Complete when N exchanges collected |
|
|
Complete after N milliseconds of inactivity |
|
|
Complete every N milliseconds regardless of count |
|
|
Complete when a Camel expression is true |
See Also
-
Classes (Groovy) -- Write custom aggregation strategies in Groovy
-
Camel Aggregate EIP -- Full parameter reference