The camel-jdbc-aggregator component allows together with Camel to provide persistent support for the Aggregator.
JdbcAggregationRepository is an AggregationRepository which on the fly persists the aggregated messages. This ensures that you will not loose messages, as the default aggregator will use an in memory only AggregationRepository.
It has the following options:
Mandatory: The javax.sql.DataSource to use for accessing the database.
Mandatory: The name of the repository.
Mandatory: The org.springframework.transaction.PlatformTransactionManager to mange transactions for the database. The TransactionManager must be able to support databases.
A org.springframework.jdbc.support.lob.LobHandler to handle Lob types in the database. Use this option to use a vendor specific LobHandler, for example when using Oracle.
Whether the get operation should return the old existing Exchange if any existed. By default this option is false to optimize as we do not need the old exchange when aggregating.
Whether or not recovery is enabled. This option is by default true. When enabled the Camel Aggregator automatic recover failed aggregated exchange and have them resubmitted.
If recovery is enabled then a background task is run every x'th time to scan for failed exchanges to recover and resubmit. By default this interval is 5000 millis.
Allows you to limit the maximum number of redelivery attempts for a recovered exchange. If enabled then the Exchange will be moved to the dead letter channel if all redelivery attempts failed. By default this option is disabled. If this option is used then the deadLetterUri option must also be provided.
An endpoint uri for a Dead Letter Channel where exhausted recovered Exchanges will be moved. If this option is used then the maximumRedeliveries option must also be provided.
What is preserved when persisting
JdbcAggregationRepository will only preserve any Serializable compatible data types. If a data type is not such a type its dropped and a WARN is logged. And it only persists the Message body and the Message headers. The Exchange properties are not persisted.
The JdbcAggregationRepository will by default recover any failed Exchange. It does this by having a background tasks that scans for failed Exchanges in the persistent store. You can use the checkInterval option to set how often this task runs. The recovery works as transactional which ensures that Camel will try to recover and redeliver the failed Exchange. Any Exchange which was found to be recovered will be restored from the persistent store and resubmitted and send out again.
The following headers is set when an Exchange is being recovered/redelivered:
Is set to true to indicate the Exchange is being redelivered.
The redelivery attempt, starting from 1.
Only when an Exchange has been successfully processed it will be marked as complete which happens when the confirm method is invoked on the AggregationRepository. This means if the same Exchange fails again it will be kept retried until it success.
You can use option maximumRedeliveries to limit the maximum number of redelivery attempts for a given recovered Exchange. You must also set the deadLetterUri option so Camel knows where to send the Exchange when the maximumRedeliveries was hit.
You can see some examples in the unit tests of camel-jdbc-aggregagor, for example this test.
To be operational, each aggregator uses two table: the aggregation and completed one. By convention the completed has the same name as the aggregation one suffixed with "_COMPLETED". The name must be configured in the Spring bean with the RepositoryName property. In the following example aggregation will be used.
The table structure definition of both table are identical: in both case a String value is used as key (id) whereas a Blob contains the exchange serialized in byte array.
However one difference should be remembered: the id field does not have the same content depending on the table.
In the aggregation table id holds the correlation Id used by the component to aggregate the messages. In the completed table, id holds the id of the exchange stored in corresponding the blob field.
Here is the SQL query used to create the tables, just replace "aggregation" with your aggregator repository name.
CREATE TABLE aggregation (
id varchar(255) NOT NULL,
exchange blob NOT NULL,
constraint aggregation_pk PRIMARY KEY (id)
CREATE TABLE aggregation_completed (
id varchar(255) NOT NULL,
exchange blob NOT NULL,
constraint aggregation_completed_pk PRIMARY KEY (id)
Since they can contain any type of payload, Exchanges are not serializable by design. It is converted into a byte array to be stored in a database BLOB field. All those conversions are handled by the JdbcCodec class. One detail of the code requires your attention: the ClassLoadingAwareObjectInputStream.
The ClassLoadingAwareObjectInputStream has been reused from the Apache ActiveMQ project. It wraps an ObjectInputStream and use it with the ContextClassLoader rather than the currentThread one. The benefit is to be able to load classes exposed by other bundles. This allows the exchange body and headers to have custom types object references.
A Spring PlatformTransactionManager is required to orchestrate transaction.
The start method verify the connection of the database and the presence of the required tables. If anything is wrong it will fail during starting.
Depending on the targeted environment, the aggregator might need some configuration. As you already know, each aggregator should have its own repository (with the corresponding pair of table created in the database) and a data source. If the default lobHandler is not adapted to your database system, it can be injected with the lobHandler property.
Here is the declaration for Oracle:
<bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler"><property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/></bean><bean id="nativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/><bean id="repo" class="org.apache.camel.component.jdbc.aggregationRepository.JdbcAggregationRepository"><property name="transactionManager" ref="transactionManager"/><property name="repositoryName" value="aggregation"/><property name="dataSource" ref="dataSource"/><!-- Only with Oracle, else use default --><property name="lobHandler" ref="lobHandler"/></bean>