Possible Bug - Splitter - parallelProcessing()

classic Classic list List threaded Threaded
1 message Options
SP
Reply | Threaded
Open this post in threaded view
|

Possible Bug - Splitter - parallelProcessing()

SP
The camel version I am using is: 2.24.1 but I also tried this with version
2.23.4

I have the following route that I simplified for the purpose of this
report. This simplified route was tested and, in terms of the bug, behaves
the same way as the the original route.

from("timer:some-timer?period=5s")
        //This query select all the rows/values from tableA that are
not in tableB
        .to("sql:select a.fieldA from tableA a left join tableB b on
(a.fieldA=b.fieldB) where fieldB is
null;?dataSource=dataSource&outputType=SelectList")
        .log(LoggingLevel.INFO, "LOG1 - MAIN ROUTE - BEFORE SPLIT")
        .to("direct:split")
        .to("direct:change-datasource");

from("direct:split")
        .split(body()).stopOnException().parallelProcessing()
            .log(LoggingLevel.INFO, "LOG2 - SPLIT ROUTE - BEFORE INSERT")
            .to("sql:insert into tableB (fieldB) values
(:#${body[fieldA]});?dataSource=dataSource")
            .log(LoggingLevel.INFO, "LOG3 - SPLIT ROUTE - AFTER INSERT")
        .end();

from("direct:change-datasource")
        .log(LoggingLevel.INFO, "LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT")
        .bean("changeDataSourceBean","changeDataSource");


*How the route (should) work:*
1 - the route triggers with a timer every 5 seconds
2 - reads a list of values from a tableA, that are not in other tableB
3 - iterate over the list in parallel
   3.1 insert the iterated item in tableB.
4 - change the data source using AbstractRoutingDataSource
<https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jdbc/datasource/lookup/AbstractRoutingDataSource.html>
.

*Current buggy behavior:*
Step 4 is executed before step 3.1. Even though the logs show in the
correct order. The change of data source(step 4) happens before the value
is inserted in tableB (step 3.1).
So even when in tableA is only one value from dataSource1, that value is
inserted in tableB but in the dataSource2.
*This behavior not happen when I remove '.parallelProcessing()'. When
removed the application behaves as expected.*

*Expected behavior:*
Step 4 should be executed ONLY after step 3 fully finished. Reference:
documentation
<https://camel.apache.org/manual/latest/split-eip.html>of the splitter
states:

parallelProcessing

If enabled then processing each splitted messages occurs concurrently.*
Note the caller thread will still wait until all messages has been fully
processed, before it continues. Its only processing the sub messages from
the splitter which happens concurrently.*
Below are the logs from a two tests I made. One test using
'.parallelProcessing()' and other without '.parallelProcessing()'. Both
test are executed with a "pool " of 3 dataSources (dataSource1,
dataSource2, dataSource3). Only tableA from dataSource2 has one row. All
the other tables from all dataSources are empty.

*Logs using '.parallelProcessing()':*
Incorrect behavior the value from table from dataSource2 should be
processed only once but it loops with no end:
//dataSource1
2019-10-01 20:50:33.990  INFO 1408 --- [er://some-timer] route1
                      : LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 20:50:34.000  INFO 1408 --- [er://some-timer] route3
                      : LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource2

*2019-10-01 20:50:38.653  INFO 1408 --- [er://some-timer] route1
                        : LOG1 - MAIN ROUTE - BEFORE SPLIT2019-10-01
20:50:38.666  INFO 1408 --- [read #2 - Split] route2
            : LOG2 - SPLIT ROUTE - BEFORE INSERT*

*2019-10-01 20:50:40.644  INFO 1408 --- [read #2 - Split] route2
                        : LOG3 - SPLIT ROUTE - AFTER INSERT2019-10-01
20:50:40.648  INFO 1408 --- [er://some-timer] route3
            : LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT*
//dataSource3
2019-10-01 20:50:44.430  INFO 1408 --- [er://some-timer] route1
                      : LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 20:50:44.431  INFO 1408 --- [er://some-timer] route3
                      : LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource1
2019-10-01 20:50:49.246  INFO 1408 --- [er://some-timer] route1
                      : LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 20:50:49.246  INFO 1408 --- [er://some-timer] route3
                      : LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource2


*2019-10-01 20:50:54.054  INFO 1408 --- [er://some-timer] route1
                        : LOG1 - MAIN ROUTE - BEFORE SPLIT2019-10-01
20:50:54.059  INFO 1408 --- [read #4 - Split] route2
            : LOG2 - SPLIT ROUTE - BEFORE INSERT2019-10-01 20:50:56.306
 INFO 1408 --- [read #4 - Split] route2                                   :
LOG3 - SPLIT ROUTE - AFTER INSERT*
*2019-10-01 20:50:56.307  INFO 1408 --- [er://some-timer] route3
                        : LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT*
//dataSource3
2019-10-01 20:50:59.080  INFO 1408 --- [er://some-timer] route1
                      : LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 20:50:59.081  INFO 1408 --- [er://some-timer] route3
                      : LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource1
2019-10-01 20:51:03.966  INFO 1408 --- [er://some-timer] route1
                      : LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 20:51:03.967  INFO 1408 --- [er://some-timer] route3
                      : LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource2



*2019-10-01 20:51:09.004  INFO 1408 --- [er://some-timer] route1
                        : LOG1 - MAIN ROUTE - BEFORE SPLIT2019-10-01
20:51:09.006  INFO 1408 --- [read #5 - Split] route2
            : LOG2 - SPLIT ROUTE - BEFORE INSERT2019-10-01 20:51:11.062
 INFO 1408 --- [read #5 - Split] route2                                   :
LOG3 - SPLIT ROUTE - AFTER INSERT2019-10-01 20:51:11.063  INFO 1408 ---
[er://some-timer] route3                                   : LOG4 - CHANGE
DATASOURCE ROUTE - AFTER SPLIT*
//dataSource3
2019-10-01 20:51:13.816  INFO 1408 --- [er://some-timer] route1
                      : LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 20:51:13.816  INFO 1408 --- [er://some-timer] route3
                      : LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource1
2019-10-01 20:51:19.041  INFO 1408 --- [er://some-timer] route1
                      : LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 20:51:19.042  INFO 1408 --- [er://some-timer] route3
                      : LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
...


*Logs not using '.parallelProcessing()':*
//dataSource1
2019-10-01 21:00:10.294  INFO 24400 --- [er://some-timer] route1
                        : LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 21:00:10.298  INFO 24400 --- [er://some-timer] route3
                        : LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource2

*2019-10-01 21:00:15.413  INFO 24400 --- [er://some-timer] route1
                        : LOG1 - MAIN ROUTE - BEFORE SPLIT2019-10-01
21:00:15.419  INFO 24400 --- [er://some-timer] route2
            : LOG2 - SPLIT ROUTE - BEFORE INSERT*

*2019-10-01 21:00:17.559  INFO 24400 --- [er://some-timer] route2
                        : LOG3 - SPLIT ROUTE - AFTER INSERT2019-10-01
21:00:17.561  INFO 24400 --- [er://some-timer] route3
            : LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT*
//dataSource3
2019-10-01 21:00:20.122  INFO 24400 --- [er://some-timer] route1
                        : LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 21:00:20.123  INFO 24400 --- [er://some-timer] route3
                        : LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource1
2019-10-01 21:00:25.239  INFO 24400 --- [er://some-timer] route1
                        : LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 21:00:25.240  INFO 24400 --- [er://some-timer] route3
                        : LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource2

*2019-10-01 21:00:30.052  INFO 24400 --- [er://some-timer] route1
                        : LOG1 - MAIN ROUTE - BEFORE SPLIT2019-10-01
21:00:30.053  INFO 24400 --- [er://some-timer] route3
            : LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT*
//dataSource3
2019-10-01 21:00:35.215  INFO 24400 --- [er://some-timer] route1
                        : LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 21:00:35.216  INFO 24400 --- [er://some-timer] route3
                        : LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
//dataSource1
2019-10-01 21:00:40.497  INFO 24400 --- [er://some-timer] route1
                        : LOG1 - MAIN ROUTE - BEFORE SPLIT
2019-10-01 21:00:40.498  INFO 24400 --- [er://some-timer] route3
                        : LOG4 - CHANGE DATASOURCE ROUTE - AFTER SPLIT
...

*P.S I am not an native english speak, sorry for possible errors*

Regards,
Santiago.