pfx-event Component
Summary: Reference for Pricefx event handling — pull and push event processing, event-to-route mapping, and event configuration.
Overview
The pfx-event component handles events generated by the Pricefx platform. It supports two primary operations:
- fetch (consumer): Polls the Pricefx server for new events, processes them, and acknowledges them back.
- sendCustom (producer): Sends custom user-defined events to other Integration Manager instances via the messaging infrastructure.
The component is registered under the scheme pfx-event and belongs to the CORE category. The fetch method creates a scheduled poll consumer; sendCustom creates a producer.
In provisioned IM instances, event processing is configured entirely through application properties. The PerEventTypeEventRoute bean auto-configures separate input/output route pairs for each event type based on the integration.events.event-to-route-mapping.* properties.
URI Format
pfx-event:method[?options]
Supported Methods
| Method | Direction | Description |
|---|---|---|
| fetch | Consumer | Poll Pricefx for unprocessed events and route them |
| sendCustom | Producer | Send a custom event to the messaging infrastructure |
Endpoint Parameters
Core Parameters
| Parameter | Type | Default | Required | Description |
|---|---|---|---|---|
| method | | | Yes | API method: fetch or sendCustom |
| connection | | | No | Pricefx connection name (must be a bean in the Spring context) |
| eventTypes | | | Yes (fetch) | Comma-separated list of event types to fetch (e.g., PADATALOAD_COMPLETED,CALCULATION_COMPLETED_CFS) |
| maxEvents | | | No | Maximum number of events downloaded from the server in one poll |
| jsonParser | | gson | No | JSON parser for event messages. Allowed values: gson, jackson |
| maxBackoffDelay | | 300000 | No | Maximum delay in milliseconds between poll attempts when backoff is active (default: 5 minutes). Since 8.0.0 |
| eventType | | | Yes (sendCustom) | Event type for the custom event to send. Must be in UPPER_SNAKE_CASE (e.g., MY_DISTRIBUTION_EVENT) |
| inputSource | | body | No | Source of data in the exchange: body, header, or property |
| inputSourceName | | | No | Name of the header or property when inputSource is header or property |
Scheduler Parameters (fetch only)
The fetch consumer extends ScheduledPollEndpoint and inherits all scheduled polling parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| initialDelay | | | Milliseconds before the first poll starts |
| delay | | | Milliseconds between polls |
| useFixedDelay | | | Use fixed delay (true) vs. fixed rate (false) |
| greedy | | | Run immediately again if previous poll found messages |
| startScheduler | | | Auto-start the scheduler |
| timeUnit | | | Time unit for initialDelay and delay |
| sendEmptyMessageWhenIdle | | | Send an empty message when no events are polled |
| backoffMultiplier | | | Number of polls to skip after consecutive idles/errors |
| backoffErrorThreshold | | | Error count before backoff kicks in |
| backoffIdleThreshold | | | Idle count before backoff kicks in |
| runLoggingLevel | | | Logging level for poll start/complete log lines |
| synchronous | | | Force synchronous processing only |
| bridgeErrorHandler | | | Bridge consumer exceptions to the Camel routing Error Handler |
| exchangePattern | | | Exchange pattern for consumer-created exchanges |
System Properties
| Property | Default | Description |
|---|---|---|
| integration.pfx-event.cache-folder | (required) | Directory where fetched events are cached as files before processing |
| | (optional) | Directory where processed events are archived |
Event Validation
The component validates that each event type is only used by one fetch consumer in the Camel context. If the same event type appears in multiple pfx-event:fetch endpoints, an IllegalArgumentException is thrown at startup:
Event type PADATALOAD_COMPLETED is already used by another route with pfx-event component
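The uniqueness rule can be pictured as a shared registry that each fetch endpoint reports its event types to at startup. The Java sketch below is illustrative only: the class and method names are hypothetical and not taken from the component's source. Only the exception type and the message text come from the description above.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of pfx-event: tracks which event types
// have already been claimed by a fetch endpoint.
final class EventTypeRegistrySketch {
    private final Set<String> claimed = new HashSet<>();

    // Registers the comma-separated eventTypes value of one fetch endpoint.
    // Throws if any of the types was already claimed earlier.
    void register(String eventTypesOption) {
        for (String raw : eventTypesOption.split(",")) {
            String type = raw.trim();
            if (type.isEmpty()) {
                continue; // tolerate stray commas
            }
            if (!claimed.add(type)) {
                throw new IllegalArgumentException(
                    "Event type " + type
                        + " is already used by another route with pfx-event component");
            }
        }
    }
}
```

Registering PADATALOAD_COMPLETED from two different endpoints would make the second call fail, which mirrors the startup error shown above.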
Fetch Processing Flow
When the fetch consumer polls:
- Retry events are processed first -- any events stored locally from previous failed processing attempts.
- New events are fetched from the Pricefx server using the EVT API with a filter for unprocessed events matching the configured event types, sorted by ID.
- For each event:
  - An exchange is created with the parsed event message as the body.
  - Headers event.typedId and event.name are set.
  - The downstream processor processes the exchange.
  - On success: the event is acknowledged (httpResponse=200, retriesLeft=0) and optionally archived.
  - On failure: the event is acknowledged as failed (httpResponse=0, retriesLeft=0) and stored to an error directory for later inspection.
- Events are acknowledged in batches of 100 via the Pricefx massedit API.
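The batching in the last step amounts to splitting the list of pending acknowledgements into chunks of at most 100. The following Java sketch shows one way to do that. It is an assumption about the shape of the logic, not the component's code, and the class and method names are hypothetical. The actual massedit call is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits pending acknowledgements into fixed-size batches.
final class AckBatchingSketch {
    // Batch size stated in the processing flow above.
    static final int BATCH_SIZE = 100;

    // Returns consecutive sublists of at most batchSize elements,
    // preserving the original order. The last batch may be smaller.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }
}
```

With 250 processed events, this would produce three batches of 100, 100, and 50 entries, each of which would correspond to one acknowledgement request.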
SendCustom Processing Flow
When the sendCustom producer processes an exchange:
- The payload is extracted from the exchange based on inputSource/inputSourceName (defaults to body).
- A user event is created with the configured eventType and the payload.
- The event is published to the messaging infrastructure with a routing key based on the instance name.
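The payload selection in the first step can be sketched as a small lookup keyed on inputSource. This Java example is a simplified illustration with hypothetical names. It models headers and exchange properties as plain maps rather than a Camel Exchange, and it assumes the accepted inputSource values are body, header, and property, which the source only implies.

```java
import java.util.Map;

// Hypothetical helper: picks the event payload from the body, a header,
// or an exchange property, depending on inputSource.
final class PayloadSourceSketch {
    static Object selectPayload(String inputSource,
                                String inputSourceName,
                                Object body,
                                Map<String, Object> headers,
                                Map<String, Object> properties) {
        // The flow above says the source defaults to the body.
        if (inputSource == null || inputSource.equals("body")) {
            return body;
        }
        switch (inputSource) {
            case "header":
                return headers.get(inputSourceName);
            case "property": // assumed value, inferred from the parameter description
                return properties.get(inputSourceName);
            default:
                throw new IllegalArgumentException("Unsupported inputSource: " + inputSource);
        }
    }
}
```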
Event Route Auto-Configuration (PerEventTypeEventRoute)
In provisioned IM instances, the PerEventTypeEventRoute bean provides automatic event routing. It extends EventRoute and creates a separate input/output route pair for each event type defined in event-to-route-mapping.
How It Works
- For each event type in the mapping, a dedicated cache subdirectory is created: <cacheFolder>/<eventType>/.
- An input route (scheduler-based) fetches events of that specific type from Pricefx and writes them to the cache directory.
- An output route (file consumer) reads events from the cache directory, parses the JSON body, determines the target route from the eventTypeToRouteMapping, and forwards via recipientList.
Route Naming
For a bean named events and event type PADATALOAD_COMPLETED:
- Input route ID: eventsPADATALOAD_COMPLETEDInputEventRoute
- Output route ID: eventsPADATALOAD_COMPLETEDOutputEventRoute
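Judging from this example, route IDs appear to be the bean name, the event type, and a fixed suffix concatenated without separators. The Java sketch below reproduces that pattern. The helper class is hypothetical, and the pattern is inferred from the single example above rather than confirmed against the implementation.

```java
// Hypothetical helper: builds route IDs following the pattern seen in the example.
final class RouteIdSketch {
    static String inputRouteId(String beanName, String eventType) {
        return beanName + eventType + "InputEventRoute";
    }

    static String outputRouteId(String beanName, String eventType) {
        return beanName + eventType + "OutputEventRoute";
    }
}
```

Knowing the pattern can help when searching logs or management tooling for the routes that belong to one event type.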
Custom Event Types
Custom event types use the CUSTOM. prefix in the mapping key. During routing, the prefix is stripped. For example, the mapping key CUSTOM.MY_EVENT matches events with eventType = MY_EVENT.
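The prefix handling can be sketched as a lookup that normalizes each mapping key before comparing it with the incoming event type. This Java example is a minimal illustration with hypothetical names. The target URI direct:myEvent is made up for the example, and the real lookup may be implemented differently.

```java
import java.util.Map;

// Hypothetical helper: resolves the target route for an event type,
// ignoring the CUSTOM. prefix on mapping keys.
final class CustomEventMappingSketch {
    static final String CUSTOM_PREFIX = "CUSTOM.";

    // Returns the event type that a mapping key matches.
    static String routedEventType(String mappingKey) {
        return mappingKey.startsWith(CUSTOM_PREFIX)
            ? mappingKey.substring(CUSTOM_PREFIX.length())
            : mappingKey;
    }

    // Returns the mapped endpoint URI, or null when no key matches.
    static String resolveRoute(Map<String, String> mapping, String eventType) {
        for (Map.Entry<String, String> entry : mapping.entrySet()) {
            if (routedEventType(entry.getKey()).equals(eventType)) {
                return entry.getValue();
            }
        }
        return null;
    }
}
```

A mapping entry CUSTOM.MY_EVENT=direct:myEvent would then be selected for an event whose eventType is MY_EVENT.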
Cache and Archive Directories
- Cache folder: resolved from the integration.events.cache-folder property.
- Temporary files during download use the temp-event-download/ subdirectory prefix.
- Archive folder: when archiveFolder is set, processed files are moved there instead of being deleted.
- Error folder: failed events are moved to <cacheFolder>/<eventType>/error/ by default.
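The directory layout described here can be expressed with java.nio.file paths. The Java sketch below only derives the per-event-type cache and error locations from the patterns given above. The class and method names are hypothetical, and it does not create or move any files.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: derives per-event-type directories from the cache folder.
final class CachePathsSketch {
    // <cacheFolder>/<eventType>/
    static Path cacheDir(String cacheFolder, String eventType) {
        return Paths.get(cacheFolder, eventType);
    }

    // <cacheFolder>/<eventType>/error/
    static Path errorDir(String cacheFolder, String eventType) {
        return cacheDir(cacheFolder, eventType).resolve("error");
    }
}
```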
JSON Parsing
Two JSON parser implementations are available:
- gson (default) -- uses Google Gson
- jackson -- uses Jackson ObjectMapper
The parser is selected via the jsonParserName property on the route bean or the jsonParser parameter on the endpoint.
Data Extraction
When extractDataFromEvent is true, the route extracts the first element from the data array within the event body, rather than passing the entire event map.
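The effect of this flag can be sketched on a parsed event represented as a map. The Java example below is an assumption-laden illustration. The class name is hypothetical, and falling back to the whole event when data is missing or empty is a choice made for the sketch, not documented behavior.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper: returns either the whole event or the first data element.
final class DataExtractionSketch {
    static Object extract(Map<String, Object> event, boolean extractDataFromEvent) {
        if (!extractDataFromEvent) {
            return event; // pass the entire event map through
        }
        Object data = event.get("data");
        if (data instanceof List && !((List<?>) data).isEmpty()) {
            return ((List<?>) data).get(0); // first element of the data array
        }
        // Assumption for this sketch: keep the full event if there is nothing to extract.
        return event;
    }
}
```

With extraction enabled, a downstream route would read fields such as targetName directly from the body instead of navigating through data[0].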
Application Properties Reference
Pull Events (PerEventTypeEventRoute)
integration.events.enabled=true
integration.events.schedulerDelay=5000
integration.events.delay=10000
integration.events.maxEvents=1000
integration.events.connection=pricefx
integration.events.initial-delay=1000
integration.events.event-to-route-mapping.PADATALOAD_COMPLETED=direct:eventPADataLoadCompleted
integration.events.event-to-route-mapping.CALCULATION_COMPLETED_CFS=direct:eventCFSCompleted
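| Property | Default | Description |
|---|---|---|
| integration.events.enabled | | Enable or disable event processing |
| integration.events.schedulerDelay | | Polling interval for fetching events from Pricefx (ms) |
| integration.events.delay | | File consumer read interval from cache directory (ms) |
| integration.events.maxEvents | | Maximum events fetched per poll |
| integration.events.connection | | Name of the Pricefx connection bean |
| integration.events.initial-delay | | Initial delay before the first poll (ms) |
| integration.events.event-to-route-mapping.* | | Maps an event type to a Camel endpoint URI |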
Stuck Events Monitoring
| Property | Default | Description |
|---|---|---|
| | | Enable stuck event checking |
| | | Check interval (default: 2 hours) |
| | | When an event is considered stuck |
| | | Time unit: |
Common Event Types
| Event Type | Description |
|---|---|
| PADATALOAD_COMPLETED | Price Analyzer data load has completed |
| CALCULATION_COMPLETED_CFS | CFS calculation has completed |
| CUSTOM.<name> | Custom user-defined event (prefix stripped during routing) |
Usage Examples
Fetch events and route by type
<routes xmlns="http://camel.apache.org/schema/spring">
  <!-- Poll Pricefx for both event types and dispatch on the eventType field -->
  <route id="eventFetch">
    <from uri="pfx-event:fetch
               ?eventTypes=PADATALOAD_COMPLETED,CALCULATION_COMPLETED_CFS
               &amp;maxEvents=500
               &amp;delay=10000
               &amp;initialDelay=5000"/>
    <choice>
      <when>
        <simple>${body[eventType]} == 'PADATALOAD_COMPLETED'</simple>
        <to uri="direct:handleDataLoad"/>
      </when>
      <when>
        <simple>${body[eventType]} == 'CALCULATION_COMPLETED_CFS'</simple>
        <to uri="direct:handleCalculation"/>
      </when>
    </choice>
  </route>
  <route id="handleDataLoad">
    <from uri="direct:handleDataLoad"/>
    <log message="Data load completed: ${body[data][0][targetName]}"/>
    <to uri="pfx-api:truncate?targetName=DMF.Product"/>
  </route>
  <route id="handleCalculation">
    <from uri="direct:handleCalculation"/>
    <log message="Calculation done: ${body[data][0][targetName]}"/>
    <to uri="pfx-csv:marshal"/>
    <to uri="file:{{outbound.path}}?fileName=results.csv"/>
  </route>
</routes>
Send a custom event
<routes xmlns="http://camel.apache.org/schema/spring">
  <route id="sendCustomEvent">
    <from uri="direct:triggerEvent"/>
    <to uri="pfx-event:sendCustom?eventType=MY_DISTRIBUTION_EVENT"/>
  </route>
</routes>
Send event payload from a header
<routes xmlns="http://camel.apache.org/schema/spring">
  <route id="sendFromHeader">
    <from uri="direct:triggerEvent"/>
    <!-- The payload is taken from this header instead of the message body -->
    <setHeader name="eventPayload">
      <constant>{"status": "done"}</constant>
    </setHeader>
    <to uri="pfx-event:sendCustom?eventType=MY_EVENT&amp;inputSource=header&amp;inputSourceName=eventPayload"/>
  </route>
</routes>
Error Handling
- Duplicate event type usage across routes raises IllegalArgumentException at startup.
- A missing integration.pfx-event.cache-folder property raises IllegalArgumentException at startup.
- Failed event processing stores the event to <cacheFolder>/<eventType>/error/ and acknowledges it as failed on the Pricefx server.
- Event fetch or acknowledge failures raise NonRecoverableException.
Exponential Backoff
Available since: 8.0.0
When a fetch poll fails (e.g., HTTP 401 banned user, 500 server error), the consumer applies exponential backoff to avoid flooding logs and overloading a failing Pricefx instance.
- After each consecutive error, the poll delay doubles: min(originalDelay * 2^errors, maxBackoffDelay).
- On the first successful poll, the delay resets to the original value.
- The backoff WARN log includes the full exception cause chain and endpoint context (method, eventTypes, connection).
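The delay formula above can be checked with a few lines of Java. This is a sketch of the arithmetic only, with hypothetical class and method names. It is not the consumer's actual scheduling code, and the overflow guard is an addition for the example.

```java
// Hypothetical helper: computes min(originalDelay * 2^errors, maxBackoffDelay).
final class BackoffDelaySketch {
    static long nextDelay(long originalDelayMs, int consecutiveErrors, long maxBackoffDelayMs) {
        if (originalDelayMs <= 0 || maxBackoffDelayMs <= 0 || consecutiveErrors < 0) {
            throw new IllegalArgumentException("delays must be positive and errors non-negative");
        }
        long delay = originalDelayMs;
        // Double once per consecutive error, stopping as soon as the cap is reached
        // so the multiplication can never overflow.
        for (int i = 0; i < consecutiveErrors && delay < maxBackoffDelayMs; i++) {
            delay = (delay > maxBackoffDelayMs / 2) ? maxBackoffDelayMs : delay * 2;
        }
        return Math.min(delay, maxBackoffDelayMs);
    }
}
```

With delay=5000 and maxBackoffDelay=60000, consecutive failures would yield 10000, 20000, 40000, and then 60000 ms, and zero errors (after a successful poll) gives back the original 5000 ms.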
Without backoff, persistent errors cause thousands of identical exceptions per minute. After enough repetitions, HotSpot's -XX:+OmitStackTraceInFastThrow optimization strips the stack trace, leaving only an unhelpful NullPointerException: null in the log.
The maxBackoffDelay parameter (default: 5 minutes) caps the maximum delay. Override per-route:
<from uri="pfx-event:fetch?eventTypes=PADATALOAD_COMPLETED&amp;delay=5000&amp;maxBackoffDelay=60000"/>
EventListener (Push Events)
The EventListener class provides an alternative push-based model using an embedded Jetty HTTP endpoint. Events are POSTed to the listener and written to a cache folder for processing by the output route.
Note: Push events require an exposed HTTP port, which is primarily relevant for non-provisioned setups. In provisioned IM instances, the pull-based PerEventTypeEventRoute approach described above is the standard pattern.
See Also
- Routes -- route configuration and the Events section
- Properties -- application property reference
- Pricefx Events Documentation -- Confluence page