Need a way to process invalid message among aggregated message

furiabhavesh
Hi,

I am reading records from a CSV file and doing a batch insert into a
database. Here's the sample code:

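// Imports assume Camel 2.x and Spring JDBC package names; adjust for other versions.
import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.dataformat.bindy.csv.BindyCsvDataFormat;
import org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository;
import org.apache.camel.processor.idempotent.FileIdempotentRepository;
import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.IdempotentRepository;
import org.springframework.jdbc.CannotGetJdbcConnectionException;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.SingleConnectionDataSource;

public class CliaRouteBuilder extends RouteBuilder {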

    @Override
    public void configure() throws Exception {
        final DataFormat bindyObj = new BindyCsvDataFormat(Clia.class);
        final String datasource_name = "clia";

        onException(CannotGetJdbcConnectionException.class)
            .maximumRedeliveries(3)
            .redeliveryDelay(2000)
            .useExponentialBackOff();

        from("file:camel/input/"+datasource_name+"/?noop=true")
            .routeId("fileMessageFrom"+datasource_name+"Folder")
            .split(body().tokenize("\n"))
            .streaming()
            .choice()
                .when(body().contains("CITY_NAME"))
                    // log() takes a Simple expression, so the body is referenced as ${body}
                    .log("Ignoring message because a header row is detected - ${body}")
                    .to("direct:trash")
                .otherwise()
                    .to("direct:individual"+datasource_name+"Record");

        from("direct:individual"+datasource_name+"Record")
            .routeId("individual"+datasource_name+"RowRecord")
            .process(new UniqueHashGenerator())
            .idempotentConsumer(header("msgHash"), getIdempotentRepository(datasource_name))
            .unmarshal(bindyObj)
            .aggregate(constant(true), new CliaAggregator())
                .completionSize(50)
                .completionTimeout(2000)
                .aggregationRepository(getAggregationRepository())
                .to("sql:insert into clia(prvdr_ctgry_sbtyp_cd, prvdr_ctgry_cd) "
                    + "values (:#category_subtype_code, :#category_code)?batch=true")
            .end();
    }

    // JDBC-backed aggregation repository with recovery enabled;
    // exchanges that exhaust their redeliveries are sent to direct:trash.
    private static AggregationRepository getAggregationRepository() {
        SingleConnectionDataSource ds =
            new SingleConnectionDataSource(DB_URL, DB_USER, DB_PASS, true);
        DataSourceTransactionManager txManager = new DataSourceTransactionManager(ds);

        JdbcAggregationRepository repo =
            new JdbcAggregationRepository(txManager, "aggregation", ds);
        repo.setUseRecovery(true);
        repo.setMaximumRedeliveries(3);
        repo.setDeadLetterUri("direct:trash");
        repo.setRecoveryInterval(2000);

        return repo;
    }

    // File-backed idempotent repository, one store file per data source.
    private static IdempotentRepository getIdempotentRepository(String name) {
        Map<String, Object> repoMap = new HashMap<>();
        File fileStore = new File("camel/idempotent-repos/" + name + ".log");
        return new FileIdempotentRepository(fileStore, repoMap);
    }
}

When I aggregate multiple rows from the CSV file, a few records may fail
with a DataIntegrityViolationException because of DB constraints.
There is no point in retrying those records, since the failure is a data
issue and not a network or connection failure.
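
To illustrate that distinction, here is a minimal plain-Java sketch of the
retry behaviour I have in mind. It is not Camel API: RedeliverySketch and the
two exception types are hypothetical stand-ins (TransientFailure for something
like CannotGetJdbcConnectionException, DataFailure for something like
DataIntegrityViolationException), and the redelivery delay and back-off are
left out for brevity.

```java
import java.util.function.Supplier;

// Hypothetical sketch, not Camel API: redeliver only failures that can heal on retry.
final class RedeliverySketch {

    // Stand-in for a transient problem such as CannotGetJdbcConnectionException.
    static final class TransientFailure extends RuntimeException {
        TransientFailure(String message) { super(message); }
    }

    // Stand-in for a data problem such as DataIntegrityViolationException.
    static final class DataFailure extends RuntimeException {
        DataFailure(String message) { super(message); }
    }

    // Runs the task; a TransientFailure is retried up to maxRedeliveries times,
    // while a DataFailure propagates immediately because retrying cannot fix it.
    static <T> T callWithRedelivery(Supplier<T> task, int maxRedeliveries) {
        int redeliveries = 0;
        while (true) {
            try {
                return task.get();
            } catch (TransientFailure e) {
                if (redeliveries >= maxRedeliveries) {
                    throw e;                      // retries exhausted
                }
                redeliveries++;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice with a transient error, then succeeds on the third attempt.
        String outcome = callWithRedelivery(() -> {
            if (++calls[0] < 3) {
                throw new TransientFailure("connection refused");
            }
            return "inserted";
        }, 3);
        System.out.println(outcome + " after " + calls[0] + " attempts");
    }
}
```

With maxRedeliveries set to 3 the task runs at most four times in total,
which is how I understand maximumRedeliveries(3) in the route above.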

Since the exception is set at the Exchange level, is there a way to
identify the erroneous record within the aggregated message and still
process the remaining rows?
Or is there any other strategy that would help mitigate this type of
issue?
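
To make the question concrete, the behaviour I am looking for could be
sketched in plain Java roughly as follows. This is only an illustration under
assumptions, not Camel API: BatchFallbackSketch, Result and DataIssueException
are hypothetical names, and the sketch assumes the batch insert is atomic (a
failed batch stores nothing), so replaying rows one by one cannot create
duplicates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, not Camel API: isolate bad rows when a batch insert fails.
final class BatchFallbackSketch {

    // Stand-in for a data problem such as DataIntegrityViolationException.
    static final class DataIssueException extends RuntimeException {
        DataIssueException(String message) { super(message); }
    }

    // Rows that were stored and rows that were rejected as bad data.
    static final class Result<T> {
        final List<T> inserted = new ArrayList<>();
        final List<T> rejected = new ArrayList<>();
    }

    // Assumes batchInsert is atomic: if it throws, none of its rows were stored.
    // Any other exception type (e.g. a connection failure) is not caught here.
    static <T> Result<T> insertWithFallback(List<T> batch, Consumer<List<T>> batchInsert) {
        Result<T> result = new Result<>();
        try {
            batchInsert.accept(batch);            // fast path: one batch statement
            result.inserted.addAll(batch);
        } catch (DataIssueException batchFailure) {
            // Slow path: replay each row alone so only the bad ones are rejected.
            for (T row : batch) {
                try {
                    batchInsert.accept(Collections.singletonList(row));
                    result.inserted.add(row);
                } catch (DataIssueException rowFailure) {
                    result.rejected.add(row);     // e.g. hand over to direct:trash
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> table = new ArrayList<>();   // stands in for the clia table
        Consumer<List<String>> insert = rows -> {
            for (String row : rows) {
                if (row.startsWith("?")) {        // simulated constraint violation
                    throw new DataIssueException("bad row: " + row);
                }
            }
            table.addAll(rows);                   // all rows valid: store them together
        };
        Result<String> result = insertWithFallback(Arrays.asList("01", "??", "22"), insert);
        System.out.println("inserted=" + result.inserted + " rejected=" + result.rejected);
    }
}
```

Replaying row by row costs one statement per record, but only for batches
that actually contain a bad row.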

Regards,
Bhavesh Furia