pfx-event Component

pfx-event Component

Summary: Reference for Pricefx event handling — pull and push event processing, event-to-route mapping, and event configuration.


Overview

The pfx-event component handles events generated by the Pricefx platform. It supports two primary operations:

  • fetch (consumer): Polls the Pricefx server for new events, processes them, and acknowledges them back.

  • sendCustom (producer): Sends custom user-defined events to other Integration Manager instances via the messaging infrastructure.

The component is registered under the scheme pfx-event and belongs to the CORE category. The fetch method creates a scheduled poll consumer; sendCustom creates a producer.

In provisioned IM instances, event processing is configured entirely through application properties. The PerEventTypeEventRoute bean auto-configures separate input/output route pairs for each event type based on the integration.events.event-to-route-mapping.* properties.

URI Format

pfx-event:method[?options]

Supported Methods

Method

Direction

Description

fetch

Consumer

Poll Pricefx for unprocessed events and route them

sendCustom

Producer

Send a custom event to the messaging infrastructure

Endpoint Parameters

Core Parameters

Parameter

Type

Default

Required

Description

method

String


Yes

API method: fetch or sendCustom

connection

String

pricefx

No

Pricefx connection name (must be a bean in the Spring context)

eventTypes

String


Yes (fetch)

Comma-separated list of event types to fetch (e.g., PADATALOAD_COMPLETED,CALCULATION_COMPLETED_CFS)

maxEvents

Integer

1000

No

Maximum number of events downloaded from the server in one poll

jsonParser

String

gson

No

JSON parser for event messages. Allowed values: gson, jackson

maxBackoffDelay

long

300000

No

Maximum delay in milliseconds between poll attempts when backoff is active (default: 5 minutes). Since 8.0.0

eventType

String


Yes (sendCustom)

Event type for the custom event to send. Must be in UPPER_SNAKE_CASE (e.g., MY_DISTRIBUTION_EVENT)

inputSource

String


No

Source of data in the exchange: header, property, or body

inputSourceName

String


No

Name of the header or property when inputSource is header or property

Scheduler Parameters (fetch only)

The fetch consumer extends ScheduledPollEndpoint and inherits all scheduled polling parameters:

Parameter

Type

Default

Description

initialDelay

Integer

1000

Milliseconds before the first poll starts

delay

Integer

500

Milliseconds between polls

useFixedDelay

boolean

true

Use fixed delay (true) vs. fixed rate (false)

greedy

boolean

false

Run immediately again if previous poll found messages

startScheduler

boolean

true

Auto-start the scheduler

timeUnit

TimeUnit

MILLISECONDS

Time unit for initialDelay and delay

sendEmptyMessageWhenIdle

boolean

false

Send an empty message when no events are polled

backoffMultiplier

Integer


Number of polls to skip after consecutive idles/errors

backoffErrorThreshold

Integer


Error count before backoff kicks in

backoffIdleThreshold

Integer


Idle count before backoff kicks in

runLoggingLevel

LoggingLevel

TRACE

Logging level for poll start/complete log lines

synchronous

boolean

false

Force synchronous processing only

bridgeErrorHandler

boolean

false

Bridge consumer exceptions to the Camel routing Error Handler

exchangePattern

ExchangePattern


Exchange pattern for consumer-created exchanges

System Properties

Property

Default

Description

integration.pfx-event.cache-folder

(required)

Directory where fetched events are cached as files before processing

integration.pfx-event.archive-folder

(optional)

Directory where processed events are archived

Event Validation

The component validates that each event type is only used by one fetch consumer in the Camel context. If the same event type appears in multiple pfx-event:fetch endpoints, an IllegalArgumentException is thrown at startup:

Event type PADATALOAD_COMPLETED is already used by another route with pfx-event component

Fetch Processing Flow

When the fetch consumer polls:

  1. Retry events are processed first -- any events stored locally from previous failed processing attempts.

  2. New events are fetched from the Pricefx server using the EVT API with a filter for unprocessed events matching the configured event types, sorted by ID.

  3. For each event:

    • An exchange is created with the parsed event message as the body.

    • Headers event.typedId and event.name are set.

    • The downstream processor processes the exchange.

    • On success: the event is acknowledged (httpResponse=200, retriesLeft=0) and optionally archived.

    • On failure: the event is acknowledged as failed (httpResponse=0, retriesLeft=0) and stored to an error directory for later inspection.

  4. Events are acknowledged in batches of 100 via the Pricefx massedit API.

SendCustom Processing Flow

When the sendCustom producer processes an exchange:

  1. The payload is extracted from the exchange based on inputSource/inputSourceName (defaults to body).

  2. A user event is created with the configured eventType and the payload.

  3. The event is published to the messaging infrastructure with a routing key based on the instance name.

Event Route Auto-Configuration (PerEventTypeEventRoute)

In provisioned IM instances, the PerEventTypeEventRoute bean provides automatic event routing. It extends EventRoute and creates a separate input/output route pair for each event type defined in event-to-route-mapping.

How It Works

  1. For each event type in the mapping, a dedicated cache subdirectory is created: <cacheFolder>/<eventType>/.

  2. An input route (scheduler-based) fetches events of that specific type from Pricefx and writes them to the cache directory.

  3. An output route (file consumer) reads events from the cache directory, parses the JSON body, determines the target route from the eventTypeToRouteMapping, and forwards via recipientList.

Route Naming

For a bean named events and event type PADATALOAD_COMPLETED:

  • Input route ID: eventsPADATALOAD_COMPLETEDInputEventRoute

  • Output route ID: eventsPADATALOAD_COMPLETEDOutputEventRoute

Custom Event Types

Custom event types use the CUSTOM. prefix in the mapping key. During routing, the prefix is stripped. For example, the mapping key CUSTOM.MY_EVENT matches events with eventType = MY_EVENT.

Cache and Archive Directories

  • Cache folder: resolved from integration.events.cache-folder property.

  • Temporary files during download use the temp-event-download/ subdirectory prefix.

  • Archive folder: when archiveFolder is set, processed files are moved there instead of being deleted.

  • Error folder: failed events are moved to <cacheFolder>/<eventType>/error/ by default.

JSON Parsing

Two JSON parser implementations are available:

  • gson (default) -- uses Google Gson

  • jackson -- uses Jackson ObjectMapper

Configured via the jsonParserName property on the route bean or jsonParser parameter on the endpoint.

Data Extraction

When extractDataFromEvent is true, the route extracts the first element from the data array within the event body, rather than passing the entire event map.

Application Properties Reference

Pull Events (PerEventTypeEventRoute)

integration.events.enabled=true
integration.events.schedulerDelay=5000
integration.events.delay=10000
integration.events.maxEvents=1000
integration.events.connection=pricefx
integration.events.initial-delay=1000
integration.events.event-to-route-mapping.PADATALOAD_COMPLETED=direct:eventPADataLoadCompleted
integration.events.event-to-route-mapping.CALCULATION_COMPLETED_CFS=direct:eventCFSCompleted

Property

Default

Description

integration.events.enabled

true

Enable or disable event processing

integration.events.schedulerDelay

5000

Polling interval for fetching events from Pricefx (ms)

integration.events.delay

10000

File consumer read interval from cache directory (ms)

integration.events.maxEvents

1000

Maximum events fetched per poll

integration.events.connection

pricefx

Name of the Pricefx connection bean

integration.events.initial-delay

1000

Initial delay before the first poll (ms)

integration.events.event-to-route-mapping.<TYPE>


Maps an event type to a Camel endpoint URI

Stuck Events Monitoring

Property

Default

Description

integration.events.check-stuck-processing.enabled

true

Enable stuck event checking

integration.events.check-stuck-processing.scheduled-ms

7200000

Check interval (default: 2 hours)

integration.events.check-stuck-processing.timeout-duration

24

When an event is considered stuck

integration.events.check-stuck-processing.timeout-unit

h

Time unit: d, h, m, s

Common Event Types

Event Type

Description

PADATALOAD_COMPLETED

Price Analyzer data load has completed

CALCULATION_COMPLETED_CFS

CFS calculation has completed

CUSTOM.<name>

Custom user-defined event (prefix stripped during routing)

Usage Examples

Fetch events and route by type

XML
<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="eventFetch">
        <from uri="pfx-event:fetch
            ?eventTypes=PADATALOAD_COMPLETED,CALCULATION_COMPLETED_CFS
            &amp;maxEvents=500
            &amp;delay=10000
            &amp;initialDelay=5000"/>
        <choice>
            <when>
                <simple>${body[eventType]} == 'PADATALOAD_COMPLETED'</simple>
                <to uri="direct:handleDataLoad"/>
            </when>
            <when>
                <simple>${body[eventType]} == 'CALCULATION_COMPLETED_CFS'</simple>
                <to uri="direct:handleCalculation"/>
            </when>
        </choice>
    </route>

    <route id="handleDataLoad">
        <from uri="direct:handleDataLoad"/>
        <log message="Data load completed: ${body[data][0][targetName]}"/>
        <to uri="pfx-api:truncate?targetName=DMF.Product"/>
    </route>

    <route id="handleCalculation">
        <from uri="direct:handleCalculation"/>
        <log message="Calculation done: ${body[data][0][targetName]}"/>
        <to uri="pfx-csv:marshal"/>
        <to uri="file:{{outbound.path}}?fileName=results.csv"/>
    </route>
</routes>

Send a custom event

XML
<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="sendCustomEvent">
        <from uri="direct:triggerEvent"/>
        <to uri="pfx-event:sendCustom?eventType=MY_DISTRIBUTION_EVENT"/>
    </route>
</routes>

Send event payload from a header

XML
<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="sendFromHeader">
        <from uri="direct:triggerEvent"/>
        <setHeader name="eventPayload">
            <constant>{"status": "done"}</constant>
        </setHeader>
        <to uri="pfx-event:sendCustom?eventType=MY_EVENT&amp;inputSource=header&amp;inputSourceName=eventPayload"/>
    </route>
</routes>

Error Handling

  • Duplicate event type usage across routes raises IllegalArgumentException at startup.

  • Missing integration.pfx-event.cache-folder property raises IllegalArgumentException at startup.

  • Failed event processing stores the event to <cacheFolder>/<eventType>/error/ and acknowledges it as failed on the Pricefx server.

  • Event fetch or acknowledge failures raise NonRecoverableException.

Exponential Backoff

Available since: 8.0.0

When a fetch poll fails (e.g., HTTP 401 banned user, 500 server error), the consumer applies exponential backoff to avoid flooding logs and overloading a failing Pricefx instance.

  • After each consecutive error, the poll delay doubles: min(originalDelay * 2^errors, maxBackoffDelay).

  • On the first successful poll, the delay resets to the original value.

  • The backoff WARN log includes the full exception cause chain and endpoint context (method, eventTypes, connection).

Without backoff, persistent errors cause thousands of identical exceptions per minute. After enough repetitions, HotSpot's -XX:+OmitStackTraceInFastThrow optimization strips the stack trace, producing unhelpful NullPointerException: null.

The maxBackoffDelay parameter (default: 5 minutes) caps the maximum delay. Override per-route:

XML
<from uri="pfx-event:fetch?eventTypes=PADATALOAD_COMPLETED&amp;delay=5000&amp;maxBackoffDelay=60000"/>

EventListener (Push Events)

The EventListener class provides an alternative push-based model using an embedded Jetty HTTP endpoint. Events are POSTed to the listener and written to a cache folder for processing by the output route.

Note: Push events require an exposed HTTP port, which is primarily relevant for non-provisioned setups. In provisioned IM instances, the pull-based PerEventTypeEventRoute approach described above is the standard pattern.

See Also