Processing N files at a time - Threads

pkx
I have seen the various posts about processing files concurrently, and the various types of errors that result from it. As others have done, I managed to get around the issue either by limiting the poll from the file consumer or by using an idempotentConsumer before the threads; in my case an idempotentConsumer seems like the better option.
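To make the idea behind the idempotent workaround concrete, here is a minimal plain-Java sketch. It does not use Camel's API; the class `IdempotentFilterSketch` and its methods are hypothetical names, and it assumes the file name alone is a good enough key. Each delivery is handed to a pool of N threads, and only the first delivery for a given file name gets past the filter.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; this is not how Camel implements idempotentConsumer.
public class IdempotentFilterSketch {

    // File names that have already been accepted; add() on this set is atomic.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /** Returns true only for the first caller that presents a given file name. */
    public boolean firstTime(String fileName) {
        return seen.add(fileName);
    }

    /** Hands every delivery to a pool of worker threads and counts how many pass the filter. */
    public static int countProcessed(List<String> deliveries, int threads) {
        IdempotentFilterSketch filter = new IdempotentFilterSketch();
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (String name : deliveries) {
            pool.submit(() -> {
                if (filter.firstTime(name)) {
                    processed.incrementAndGet(); // stand-in for the real processor
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return processed.get();
    }

    public static void main(String[] args) {
        // "a.txt" is delivered twice, mimicking a duplicate exchange for the same file.
        List<String> deliveries = Arrays.asList("a.txt", "b.txt", "a.txt", "c.txt");
        System.out.println(countProcessed(deliveries, 4)); // prints 3
    }
}
```

The filter hides the duplicate rather than preventing it, which is why I still consider it a workaround.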

In my opinion this is a workaround. I think there is an issue in how the file component creates and tracks exchanges once .threads(N) is added.
It is certainly creating duplicate exchanges for the same file, regardless of the various locking methods I tried or the post-processing option (move | delete).

The problem starts with the file component as soon as you add .threads(N) to the route, even if it is .threads(1):

from("file://spool?delete=true" + otherOptions).routeId("ConsumeByThreads").threads(N).process(someAsyncProcessor);

The logged error appears when you attempt to call getBody, which triggers reading the file...

If I track the exchange details in my "someAsyncProcessor" using the "CamelFileName" header, I can confirm that the exchange is for a file which was already processed, hence the FileNotFoundException by the time the file content needs to be read. There is NO data loss, as the files have already been processed, but the error messages produce a lot of noise.
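The failure mode can be reproduced outside Camel with a small plain-Java sketch. The class `LateReadSketch` and its methods are hypothetical, and it assumes two exchanges end up pointing at the same spool file: the first reads and deletes the file, and the second only tries to read the content afterwards. Note that `java.nio` reports the missing file as `NoSuchFileException`, whereas the `FileInputStream` used in the stack trace below reports `FileNotFoundException`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

// Hypothetical illustration only; it mimics the symptom, not Camel's internals.
public class LateReadSketch {

    /** Reads the body only when asked, then deletes the file (like delete=true). */
    public static String process(Path file) {
        try {
            byte[] body = Files.readAllBytes(file); // the lazy read happens here
            Files.delete(file);                     // post-processing removes the file
            return "processed " + body.length + " bytes";
        } catch (NoSuchFileException e) {
            // A second exchange for the same file arrives after the delete.
            return "file not found";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Simulates two exchanges that both point at the same spool file. */
    public static String[] runDuplicate() {
        try {
            Path file = Files.createTempFile("spool-", ".txt");
            Files.write(file, "hello".getBytes(StandardCharsets.UTF_8));
            return new String[] { process(file), process(file) };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        for (String result : runDuplicate()) {
            System.out.println(result);
        }
    }
}
```

The first call succeeds and the second one fails without any data being lost, which matches what I see in the logs.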

I have written a test case that confirms the issue; I will see how to get it submitted.

This is on Camel 2.15.3, and I have seen it on 2.15.2 as well, on both Windows and OS X.

I would love to hear if anyone has a solution I have not considered, or if a bug needs to be tracked for the file component.

Edited error log:
---------------------------------------------------------------------------------------------------------
Exchange[
        Id                  ID-XXXX
        ExchangePattern     InOnly
        Headers             {breadcrumbId=ID-XXXX, CamelFileAbsolute=false, CamelFileAbsolutePath=C:\poc\spool\ID-XXXX, CamelFileContentType=null, CamelFileLength=0, CamelFileName=ID-XXXX, CamelFileNameConsumed=ID-XXXX, CamelFileNameOnly=ID-XXXX, CamelFileParent=spool, CamelFilePath=spool\ID-XXXX, CamelFileRelativePath=ID-XXXX, CamelRedelivered=false, CamelRedeliveryCounter=0}
        BodyType            org.apache.camel.component.file.GenericFile
        Body                [Body is file based: GenericFile[ID-XXXX]]
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------

org.apache.camel.TypeConversionException: Error during type conversion from type: java.lang.String to the required type: byte[] with value [Body is file based: GenericFile[ID-XXXX]] due java.io.FileNotFoundException: C:\...\poc\spool\ID-XXXX (The system cannot find the file specified)
...
Caused by: java.io.FileNotFoundException: ...\spool\ID-XXXXX (The system cannot find the file specified)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:131)
        at org.apache.camel.converter.IOConverter.toInputStream(IOConverter.java:78)
        at org.apache.camel.converter.IOConverter.toByteArray(IOConverter.java:266)
        at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:1220)
        ... 25 common frames omitted