Aggregations

Overview

Aggregations implement the Aggregator EIP -- they collect multiple exchanges and combine them into one. Use them to batch records before loading, collect results from a split, or group data by a key.

IM pre-registers a set of aggregation strategy beans at startup. Reference them by bean name in the strategyRef attribute of the <aggregate> element.
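
Every <aggregate> block has the same shape, whichever strategy you pick. It needs a strategy reference, a correlation expression that decides which exchanges belong together, at least one completion condition, and the steps to run on each completed group. The following minimal sketch shows these parts. The customerId header and the direct:processBatch endpoint are hypothetical placeholders:

```xml
<!-- strategyRef: bean name of a registered AggregationStrategy -->
<!-- completionSize: complete a group once it holds 100 exchanges -->
<aggregate strategyRef="groupedBodyAggregationStrategy" completionSize="100">
    <!-- Exchanges with the same customerId (hypothetical header) form one group -->
    <correlationExpression>
        <simple>${header.customerId}</simple>
    </correlationExpression>
    <!-- Runs once per completed group; the body is the combined result,
         here a List of the collected bodies -->
    <to uri="direct:processBatch"/>
</aggregate>
```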


Pre-Registered Aggregation Beans

  • aggregateToList -- Aggregates exchange bodies into a List
  • enrichPricingParameterIdAggregationStrategy -- Copies a header from the new exchange to the old exchange to preserve the pricing parameter ID
  • propagateFailureAggregationToListStrategy -- Aggregates into a List and propagates exceptions to the new exchange
  • recordsCountAggregation -- Aggregates record counts
  • groupedBodyAggregationStrategy -- Collects all bodies into a List<Object> (Camel built-in)
  • groupedExchangeAggregationStrategy -- Collects all exchanges into a List<Exchange> (Camel built-in)
  • useLatestAggregationStrategy -- Keeps only the latest exchange (Camel built-in)
  • useOriginalAggregationStrategy -- Keeps the original exchange (Camel built-in)
  • stringAggregationStrategy -- Concatenates bodies into a single String (Camel built-in)
  • zipAggregationStrategy -- Aggregates all incoming messages into a ZIP file (Camel built-in)

To disable auto-registration of these beans, set:

integration.aggregations.enabled=false

Examples

Collect records into a batch before loading

The most common pattern -- collect N records from a split and load them as a batch:

XML
<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="batchImport">
        <from uri="file:{{inbound.path}}"/>
        <to uri="pfx-csv:unmarshal?delimiter=,"/>
        <split>
            <simple>${body}</simple>
            <aggregate strategyRef="aggregateToList" completionSize="500">
                <correlationExpression>
                    <constant>true</constant>
                </correlationExpression>
                <to uri="pfx-api:loaddata?objectType=P&amp;mapper=productMapper"/>
            </aggregate>
        </split>
        <onCompletion onCompleteOnly="true">
            <to uri="pfx-api:internalCopy?label=Product"/>
        </onCompletion>
    </route>
</routes>
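
With only completionSize="500", standard Camel keeps a final group of fewer than 500 records open until another completion condition fires. As a result, the last partial batch may never be loaded. One way to close that batch is to also complete the group on the splitter's last element, which carries the exchange property CamelSplitComplete=true. The sketch below assumes IM uses the standard Camel splitter and aggregator:

```xml
<split>
    <simple>${body}</simple>
    <!-- eagerCheckCompletion="true" evaluates the predicate against the incoming
         split exchange, which is where the splitter sets CamelSplitComplete -->
    <aggregate strategyRef="aggregateToList" completionSize="500" eagerCheckCompletion="true">
        <correlationExpression>
            <constant>true</constant>
        </correlationExpression>
        <!-- Complete the open batch when the last split element arrives,
             even if the batch holds fewer than 500 records -->
        <completionPredicate>
            <simple>${exchangeProperty.CamelSplitComplete}</simple>
        </completionPredicate>
        <to uri="pfx-api:loaddata?objectType=P&amp;mapper=productMapper"/>
    </aggregate>
</split>
```

In standard Camel, size- and predicate-triggered completions run on the calling thread (unless parallelProcessing is enabled). The last batch should therefore be loaded before the split returns. A completionTimeout would also release the final batch, but it does so on a background thread, which may run after the route's onCompletion.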

Aggregate with time-based completion

Complete a group once no new record with the same batchKey header has arrived for 5 seconds, then process whatever was gathered:

XML
<aggregate strategyRef="aggregateToList" completionTimeout="5000">
    <correlationExpression>
        <simple>${header.batchKey}</simple>
    </correlationExpression>
    <to uri="pfx-api:loaddata?objectType=DM&amp;dsUniqueName=MyDS&amp;mapper=myMapper"/>
</aggregate>
<onCompletion onCompleteOnly="true">
    <to uri="pfx-api:flush?objectType=DM&amp;dsUniqueName=MyDS"/>
</onCompletion>

Important: Always wrap pfx-api:flush in <onCompletion onCompleteOnly="true">. This ensures flush runs only after the entire route succeeds -- if loading fails mid-way, data won't be flushed in a broken state.
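
completionTimeout is an inactivity timeout. Every new record with the same batchKey restarts the 5-second countdown, so a steady stream of records can keep one group open for a long time. If batches must be released on a fixed schedule instead, you can use completionInterval. The following sketch uses the same header and endpoint as the example above:

```xml
<!-- Completes every open batchKey group every 5 seconds,
     however recently its last record arrived -->
<aggregate strategyRef="aggregateToList" completionInterval="5000">
    <correlationExpression>
        <simple>${header.batchKey}</simple>
    </correlationExpression>
    <to uri="pfx-api:loaddata?objectType=DM&amp;dsUniqueName=MyDS&amp;mapper=myMapper"/>
</aggregate>
```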

Collect files into a ZIP

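Use zipAggregationStrategy to package CSV exports into a ZIP. This route produces a single CSV (products.csv) and completes with completionSize="1", so the resulting export.zip holds one entry. To combine several exports into one archive, the group must collect more than one exchange before it completes: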

XML
<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="zipExports">
        <from uri="timer://zip?repeatCount=1"/>
        <to uri="pfx-api:fetch?objectType=P&amp;filter=allProducts"/>
        <to uri="pfx-csv:marshal"/>
        <setHeader name="CamelFileName"><constant>products.csv</constant></setHeader>
        <aggregate strategyRef="zipAggregationStrategy" completionSize="1">
            <correlationExpression><constant>zip</constant></correlationExpression>
            <setHeader name="CamelFileName"><constant>export.zip</constant></setHeader>
            <to uri="file:{{outbound.path}}"/>
        </aggregate>
    </route>
</routes>
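
To zip several files, a common Camel pattern polls a directory and completes the group once every file from that poll has been added. The completionFromBatchConsumer option relies on the batch properties that the file consumer sets. The sketch below assumes the pre-registered bean behaves like Camel's default ZipAggregationStrategy, and {{csv.path}} is a hypothetical property:

```xml
<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="zipCsvFolder">
        <!-- Pick up every .csv file in the hypothetical csv.path directory -->
        <from uri="file:{{csv.path}}?include=.*\.csv"/>
        <!-- Each file becomes one ZIP entry named after the source file;
             the group completes after the last file of the poll is added -->
        <aggregate strategyRef="zipAggregationStrategy"
                   completionFromBatchConsumer="true"
                   eagerCheckCompletion="true">
            <correlationExpression><constant>zip</constant></correlationExpression>
            <setHeader name="CamelFileName"><constant>export.zip</constant></setHeader>
            <to uri="file:{{outbound.path}}"/>
        </aggregate>
    </route>
</routes>
```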

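Keep only the latest value per key (de-duplicate)

Group records by SKU and forward only the most recent record for each SKU. A group completes when no new record for that SKU arrives for 1 second, or when 100 records for that SKU have been collected, whichever comes first: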

XML
<aggregate strategyRef="useLatestAggregationStrategy"
           completionTimeout="1000"
           completionSize="100">
    <correlationExpression>
        <simple>${body[sku]}</simple>
    </correlationExpression>
    <to uri="pfx-api:integrate?objectType=P&amp;mapper=productMapper"/>
</aggregate>

Custom aggregation strategy in Groovy

For logic that the pre-registered beans don't cover, write a Groovy class in classes/ that implements Camel's AggregationStrategy interface:

Groovy
// classes/SumByCategory.groovy
import org.apache.camel.Exchange
import org.apache.camel.AggregationStrategy

// Sums the "amount" field of incoming Map rows per "category" value
class SumByCategory implements AggregationStrategy {

    @Override
    Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        def row = newExchange.in.body as Map
        if (oldExchange == null) {
            // First exchange of the group: start a new category -> sum map
            newExchange.in.body = [(row.category): (row.amount as BigDecimal)]
            return newExchange
        }
        // Later exchanges: add this row's amount to the running sum for its category
        def sums = oldExchange.in.body as Map
        sums[row.category] = (sums[row.category] ?: 0) + (row.amount as BigDecimal)
        oldExchange.in.body = sums
        return oldExchange
    }
}
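The route then references the strategy by bean name. The snippet below assumes the class is available as a bean named sumByCategory (see Classes (Groovy)):

XML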
<aggregate strategyRef="sumByCategory" completionSize="1000">
    <correlationExpression><constant>true</constant></correlationExpression>
    <to uri="direct:loadCategorySums"/>
</aggregate>

Completion Conditions

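An aggregate must define at least one completion condition. If several are set, the group completes as soon as any one of them is met:

  • completionSize -- Complete when N exchanges have been collected in the group
  • completionTimeout -- Complete when no new exchange has arrived in the group for N milliseconds
  • completionInterval -- Complete all open groups every N milliseconds, regardless of how many exchanges they hold
  • completionPredicate -- Complete when a Camel predicate evaluates to true; set it as a <completionPredicate> child element rather than an attribute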
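
Completion conditions can be combined, and whichever is met first completes the group. Note that completionPredicate is written as a child element, not an attribute. The sketch below assumes a hypothetical lastRecord header that marks the final record, and a hypothetical direct:loadBatch route:

```xml
<!-- Completes at 1000 records, after 10 seconds without new records,
     or when a record with lastRecord=true arrives, whichever comes first -->
<aggregate strategyRef="aggregateToList"
           completionSize="1000"
           completionTimeout="10000"
           eagerCheckCompletion="true">
    <correlationExpression>
        <constant>true</constant>
    </correlationExpression>
    <!-- With eagerCheckCompletion="true" the predicate sees the incoming
         exchange's headers instead of the aggregated exchange -->
    <completionPredicate>
        <simple>${header.lastRecord} == true</simple>
    </completionPredicate>
    <to uri="direct:loadBatch"/>
</aggregate>
```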


See Also

  • Classes (Groovy) -- Write custom aggregation strategies in Groovy

  • Camel Aggregate EIP -- Full parameter reference