Data Corruption in SFTP in Parallel Multicast branches

lakshmi.prashant
Hi,

 • If SFTP receivers sit at the end of the branches of a parallel multicast, the payload gets corrupted in one or more branches.
     o  One or more SFTP receivers may not receive the full data in their respective SFTP files.

 
I validated this with a simple example (SFTP -> parallel multicast -> 2 SFTP endpoints in 2 different branches). A test with a large payload showed that both receiver files were created, but as soon as one was finished the other one stopped too. This already indicates that the branches do not process two different streams; a single stream is handed to both SFTP endpoints.


The trace log also showed that if the body of the main route (before the multicast) is a StreamCache, the same instance is used by the SFTP endpoints in both branches:

Debug Trace:
2015 01 13 08:45:21#+00#DEBUG#org.apache.camel.component.file.remote.SftpOperations##anonymous#Camel (Test_Multicast) thread #7 - Multicast##avtcl#itjkmerct1#ifl##About to store file: rec2 using stream: org.apache.camel.converter.stream.FileInputStreamCache@11ef331a|
2015 01 13 08:45:21#+00#DEBUG#org.apache.camel.component.file.remote.SftpOperations##anonymous#Camel (Test_Multicast) thread #6 - Multicast##avtcl#itjkmerct1#ifl##About to store file: rec1 using stream: org.apache.camel.converter.stream.FileInputStreamCache@11ef331a|


Both use the same stream instance.
MulticastProcessor.createProcessorExchangePairs seems to create the copies. The copying is based on Exchange.copy and Message.copy. Message.copy does not copy the streams, which would be required for the multicast to work properly with streams.
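
To make the effect of a shared stream instance concrete, here is a minimal plain-JDK sketch. It does not use any Camel classes; SharedStreamDemo and its method names are made up for this illustration, and the two "branches" read one after the other so the outcome is deterministic. With real threads the split between the branches would simply be unpredictable.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Plain-JDK illustration; SharedStreamDemo and its methods are hypothetical names.
final class SharedStreamDemo {

    // Reads up to max bytes from the stream and returns them as a String.
    static String readUpTo(InputStream in, int max) {
        try {
            byte[] buf = new byte[max];
            int total = 0;
            while (total < max) {
                int n = in.read(buf, total, max - total);
                if (n < 0) {
                    break; // end of stream
                }
                total += n;
            }
            return new String(buf, 0, total, StandardCharsets.US_ASCII);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Two "branches" read from the SAME stream instance: each sees only part of the data.
    static String[] readShared(byte[] payload) {
        InputStream shared = new ByteArrayInputStream(payload);
        String first = readUpTo(shared, 4);               // branch 1 takes some bytes
        String second = readUpTo(shared, payload.length); // branch 2 gets only the rest
        return new String[] {first, second};
    }

    // Each branch gets its own stream over the same bytes: both see the full payload.
    static String[] readSeparate(byte[] payload) {
        String first = readUpTo(new ByteArrayInputStream(payload), payload.length);
        String second = readUpTo(new ByteArrayInputStream(payload), payload.length);
        return new String[] {first, second};
    }
}
```

For the payload "0123456789", readShared returns "0123" and "456789", while readSeparate returns the full payload twice.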

We could work around the issue by converting the body from the stream cache to byte[] before the multicast, or just before the SFTP endpoints in the multicast branches (i.e. using ${in.bodyAs(byte[])} within camel:simple).
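
The idea behind this workaround can be sketched without Camel: read the stream once into a byte[] before fanning out, then give every parallel branch its own stream over those bytes. MaterializeBeforeFanOut, drain and fanOut are hypothetical names, and the thread pool only stands in for the parallel multicast; this is not how Camel implements it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-JDK analogy of "convert to byte[] before the multicast"; names are hypothetical.
final class MaterializeBeforeFanOut {

    // Drains the stream fully into a byte array.
    static byte[] drain(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the source once, then lets every branch consume its own stream in parallel.
    static List<byte[]> fanOut(InputStream source, int branches) {
        byte[] body = drain(source); // single read, before any branch starts
        ExecutorService pool = Executors.newFixedThreadPool(branches);
        try {
            List<Future<byte[]>> futures = new ArrayList<>();
            for (int i = 0; i < branches; i++) {
                // each branch wraps the shared bytes in a stream of its own
                futures.add(pool.submit(() -> drain(new ByteArrayInputStream(body))));
            }
            List<byte[]> results = new ArrayList<>();
            for (Future<byte[]> f : futures) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

The trade-off is memory: the whole payload is held in the heap for as long as any branch still needs it.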

MessageSupport.java:

public void copyFrom(Message that) {
        if (that == this) {
            // the same instance so do not need to copy
            return;
        }

        setMessageId(that.getMessageId());
        setBody(that.getBody());   // Here the body reference is shared, not cloned, for the Message copy
        setFault(that.isFault());
        // ....
}
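
As a rough illustration of the difference between that shallow body copy and a stream-aware copy, here is a self-contained sketch. CopyableBody, CachedBytes and SimpleMessage are hypothetical types invented for this example; they are not Camel classes and do not claim to show how Camel addresses the problem.

```java
import java.io.ByteArrayInputStream;

// Hypothetical types for illustration only; they are not Camel classes.
interface CopyableBody {
    Object copyBody();
}

// A body that carries a read position, similar in spirit to a cached stream.
final class CachedBytes implements CopyableBody {
    private final byte[] data;
    private final ByteArrayInputStream in;

    CachedBytes(byte[] data) {
        this.data = data;
        this.in = new ByteArrayInputStream(data);
    }

    int read() {
        return in.read();
    }

    // New instance with a fresh read position over the same bytes.
    @Override
    public Object copyBody() {
        return new CachedBytes(data);
    }
}

final class SimpleMessage {
    private Object body;

    Object getBody() {
        return body;
    }

    void setBody(Object body) {
        this.body = body;
    }

    // Shallow copy, like setBody(that.getBody()): both messages share one body instance.
    void copyFromShallow(SimpleMessage that) {
        setBody(that.getBody());
    }

    // Stream-aware copy: bodies that carry a read position get their own instance.
    void copyFromStreamAware(SimpleMessage that) {
        Object b = that.getBody();
        setBody(b instanceof CopyableBody ? ((CopyableBody) b).copyBody() : b);
    }
}
```

With the stream-aware variant, reading from the original body no longer moves the read position seen by the copy.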

This problem does not occur with sequential multicast.

Can you please let us know whether this is a known issue, or whether it is already fixed?

Thanks,
Lakshmi



RE: Data Corruption in SFTP in Parallel Multicast branches

siano
Hi,

I have had a look into the source code:

If you look into the StreamCacheConverter, an InputStream is turned into a StreamCache object by copying the data to a CachedOutputStream and then calling the newStreamCache() method. That generates a FileInputStreamCache or a ByteArrayStreamCache. Both classes are StreamCache and InputStream at the same time, so if the StreamCache is converted to an InputStream, the instance itself is returned.

In the parallel multicast case (and probably also in the splitter), having a StreamCache before the split will result in all branches getting the same InputStream instance (which will lead to a pretty garbled output if two threads are reading from it at the same time).

I see two potential ways how to resolve this issue:
One way would be to change the StreamCache implementations so that they no longer implement InputStream, and to provide a type converter to InputStream that generates a new InputStream for each consumer.

The other way would be to copy the StreamCache instance in case of a multicast.
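
A minimal sketch of the first option, assuming an in-memory cache: the cache object is deliberately not an InputStream and instead hands out a fresh stream per consumer. ReplayableCache and newInputStream are hypothetical names, not existing Camel API.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

// Hypothetical class; it deliberately does NOT extend InputStream.
final class ReplayableCache {
    private final byte[] data;

    ReplayableCache(byte[] data) {
        this.data = data.clone(); // defensive copy so callers cannot mutate the cache
    }

    // Every consumer gets an independent stream with its own read position,
    // while all streams share the same underlying bytes.
    InputStream newInputStream() {
        return new ByteArrayInputStream(data);
    }
}
```

The underlying data exists only once, so this variant avoids the extra memory or disk usage that a full copy per branch would cause.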

What do you think would be the better solution?

Best regards
Stephan


RE: Data Corruption in SFTP in Parallel Multicast branches

lakshmi.prashant
Hi Stephan,

   The body of the main exchange should be copied to the branch exchanges, as intended (option 2 in your list).
   However, I am not sure whether that will lead to performance or memory issues if there are many branches and the body in the main route is very large.

 Thanks,
 Lakshmi

RE: Data Corruption in SFTP in Parallel Multicast branches

siano
Hi Lakshmi,

Sure, the exchange should be copied (and it actually is copied). The question is whether to also do a deep copy of the StreamCache instance (including the underlying file), or to modify the StreamCache implementations so that they no longer implement InputStream but return a new InputStream instance for each consumer (all of which then work on the same underlying data). Maybe a third way is also possible, such as a partial clone of the StreamCache that does not copy the underlying data, but in that case it might be rather tricky to determine when the underlying file can be deleted.
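
One conceivable way to handle the "when can the file be deleted" question for such a partial clone is reference counting. The sketch below is only an assumption about how that could look; SharedFileCache and all of its methods are hypothetical, and it ignores edge cases such as calling share() after the last handle was closed.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: lightweight handles share one temp file; the last close deletes it.
final class SharedFileCache implements AutoCloseable {
    private final Path file;
    private final AtomicInteger refs; // shared by all handles of the same file
    private boolean closed;

    private SharedFileCache(Path file, AtomicInteger refs) {
        this.file = file;
        this.refs = refs;
    }

    // Writes the data to a temp file and returns the first handle (reference count 1).
    static SharedFileCache create(byte[] data) {
        try {
            Path tmp = Files.createTempFile("streamcache", ".tmp");
            Files.write(tmp, data);
            return new SharedFileCache(tmp, new AtomicInteger(1));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Cheap "partial clone": no data is copied, only the reference count grows.
    SharedFileCache share() {
        refs.incrementAndGet();
        return new SharedFileCache(file, refs);
    }

    // Each call opens an independent stream on the shared file.
    InputStream newInputStream() {
        try {
            return Files.newInputStream(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    Path path() {
        return file;
    }

    // Releases this handle; the file is deleted when the last handle is closed.
    @Override
    public synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        if (refs.decrementAndGet() == 0) {
            try {
                Files.deleteIfExists(file);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

Each branch would call share() when it receives its copy of the exchange and close() when it is done, so the file survives until the slowest branch has finished.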

All this is somewhat intrusive into the core architecture of Camel, so I wonder what the Camel architects think about this.

Best regards
Stephan


Re: Data Corruption in SFTP in Parallel Multicast branches

Franz Paul Forsthofer
In reply to this post by lakshmi.prashant
Hello Lakshmi and Stephan,

I created the bug report https://issues.apache.org/jira/browse/CAMEL-8284

It contains a proposed solution. Please have a look.

Regards Franz


Re: Data Corruption in SFTP in Parallel Multicast branches

lakshmi.prashant
In reply to this post by lakshmi.prashant
Hi,

  We have another example of data corruption with parallel multicast. The issue is present even in camel-core 2.14.2.

  If we change to serial multicast, the issue disappears.

  I have a route with a Camel producer (to uri) that writes a very large payload of type StreamCache into the exchange body.

  Immediately afterward I have a parallel multicast, and each multicast branch contains an XPath-based content filter. With this setup I get one of the exceptions below:

  Error               = org.apache.camel.builder.xml.InvalidXPathExpression: Invalid xpath: /queryCompoundEmployeeResponse/CompoundEmployee. Reason: javax.xml.xpath.XPathExpressionException: Failure converting a node of class javax.xml.transform.sax.SAXSource: I/O error reported by XML parser processing null: Invalid byte 2 of 2-byte UTF-8 sequence., cause: javax.xml.xpath.XPathExpressionException: Failure converting a node of class javax.xml.transform.sax.SAXSource: I/O error reported by XML parser processing null: Invalid byte 2 of 2-byte UTF-8 sequence.


(or)


org.apache.camel.CamelExchangeException: Parallel processing failed for number 0. Exchange[Message: [Body is not logged]]. Caused by: [org.apache.camel.TypeConversionException - Error during type conversion from type: java.lang.String to the required type: org.w3c.dom.Document with value [Body is not logged] due org.xml.sax.SAXParseException; lineNumber: 1; columnNumber: 1; Content is not allowed in prolog.]]|



I have even tried to set the Exchange.CHARSET_NAME property in the exchange, but that does not help.

My beans.xml looks like this:


<camel:route>
<camel:from uri="quartz2://test.parallel.multicast.errorTimerEventDefinition11?fireNow=true&amp;trigger.repeatCount=0&amp;trigger.repeatInterval=0"/>

<camel:setProperty propertyName="Exchange.CHARSET_NAME">
        <camel:constant>UTF-8</camel:constant>
</camel:setProperty>

<camel:to  uri="myProducer"/> 
       
       
<camel:multicast id="ParallelGateway_1" parallelProcessing="true" stopOnException="true">
        <camel:to uri="direct://SequenceFlow_5"/>
        <camel:to uri="direct://SequenceFlow_7"/>
</camel:multicast>
</camel:route>

<camel:route>
        <camel:from uri="direct://SequenceFlow_5"/>
       
        <camel:setBody id="CallActivity_4_1429516652727">
                <camel:xpath resultType="org.w3c.dom.NodeList">//CompoundEmployee</camel:xpath>
        </camel:setBody>
        <camel:setBody id="CallActivity_5_1429516652731">
                <camel:simple><CompoundResponse>${in.body}</CompoundResponse></camel:simple>
        </camel:setBody>
        <camel:to id="MessageFlow_2_1429516652733" uri="sap-util:preventPathTraversal"/>
        <camel:to ref="Receiver1_"/>
       
</camel:route>


<camel:route>
        <camel:from uri="direct://SequenceFlow_7"/>
        <camel:setBody id="CallActivity_7_1429516652738">
                <camel:xpath  resultType="org.w3c.dom.NodeList">/queryCompoundEmployeeResponse/CompoundEmployee</camel:xpath>
        </camel:setBody>
        <camel:setBody id="CallActivity_1_1429516652742">
                <camel:simple><CompoundResponse>${in.body}</CompoundResponse></camel:simple>
        </camel:setBody>
        <camel:to id="MessageFlow_1_1429516652744" uri="sap-util:preventPathTraversal"/>
        <camel:to ref="Receiver2_"/>       
       
</camel:route>


It looks like both branches are still reading from the same stream, which causes a race condition.
The error indicates that the XML parser used for the XPath extraction in one branch finds unexpected characters at the start of the data in the body of the (branch) exchange, because the other branch has already consumed part of the stream.
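
The parser failure can be reproduced in isolation with the JDK alone: if some bytes of an XML document have already been consumed by someone else, the remainder no longer parses. In this sketch PartialStreamParse and its methods are hypothetical names, and skipping bytes merely simulates the other branch having read from the shared stream first.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

// Plain-JDK illustration; PartialStreamParse is a hypothetical name.
final class PartialStreamParse {

    // Returns true if the remaining bytes of the stream form a well-formed XML document.
    static boolean parses(InputStream in) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            builder.setErrorHandler(new DefaultHandler()); // rethrows fatal errors without logging to stderr
            builder.parse(in);
            return true;
        } catch (SAXException e) {
            return false; // not well-formed from this read position
        } catch (IOException | ParserConfigurationException e) {
            throw new IllegalStateException(e);
        }
    }

    // Simulates another branch having already consumed the first `skipped` bytes.
    static boolean parsesAfterSkipping(String xml, int skipped) {
        ByteArrayInputStream in = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8));
        in.skip(skipped);
        return parses(in);
    }
}
```

Parsing `<a><b>x</b></a>` from the start succeeds, while parsing the same bytes after the first byte has been consumed fails, since the document then begins with plain text instead of markup.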

Debug Trace:

at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:105)
        at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:736)
        at org.apache.camel.processor.MulticastProcessor.access$200(MulticastProcessor.java:83)
        at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:304)
        at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:289)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:812)
Caused by: org.apache.camel.RuntimeCamelException: com.sun.org.apache.xerces.internal.impl.io.MalformedByteSequenceException: Invalid byte 1 of 1-byte UTF-8 sequence.
        at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1364)
        at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:1006)
        at org.apache.camel.impl.converter.InstanceMethodTypeConverter.convertTo(InstanceMethodTypeConverter.java:78)
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.doConvertTo(BaseTypeConverterRegistry.java:276)
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:114)
        ... 35 common frames omitted
Caused by: com.sun.org.apache.xerces.internal.impl.io.MalformedByteSequenceException: Invalid byte 1 of 1-byte UTF-8 sequence.
        at com.sun.org.apache.xerces.internal.impl.io.UTF8Reader.invalidByte(UTF8Reader.java:687)
        at com.sun.org.apache.xerces.internal.impl.io.UTF8Reader.read(UTF8Reader.java:557)
        at com.sun.org.apache.xerces.internal.impl.XMLEntityScanner.load(XMLEntityScanner.java:1802)
        at com.sun.org.apache.xerces.internal.impl.XMLEntityScanner.arrangeCapacity(XMLEntityScanner.java:1670)
        at com.sun.org.apache.xerces.internal.impl.XMLEntityScanner.skipString(XMLEntityScanner.java:1708)
        at com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanEndElement(XMLDocumentFragmentScannerImpl.java:1748)
        at com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl$FragmentContentDriver.next(XMLDocumentFragmentScannerImpl.java:2973)
        at com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl.next(XMLDocumentScannerImpl.java:606)
        at com.sun.org.apache.xerces.internal.impl.XMLNSDocumentScannerImpl.next(XMLNSDocumentScannerImpl.java:117)
        at com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanDocument(XMLDocumentFragmentScannerImpl.java:510)
        at com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:857)
        at com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:777)
        at com.sun.org.apache.xerces.internal.parsers.XMLParser.parse(XMLParser.java:141)
        at com.sun.org.apache.xerces.internal.parsers.DOMParser.parse(DOMParser.java:243)
        at com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderImpl.parse(DocumentBuilderImpl.java:347)
        at javax.xml.parsers.DocumentBuilder.parse(DocumentBuilder.java:128)
        at org.apache.camel.converter.jaxp.XmlConverter.toDOMDocument(XmlConverter.java:860)


Can you kindly suggest a possible solution, or is this a bug in Camel's parallel multicast?

Thanks,
Lakshmi

Re: Data Corruption in SFTP in Parallel Multicast branches

alexey-s
In reply to this post by lakshmi.prashant
Try replacing the Multicast with a Recipient List that has parallelProcessing=true.

Multicast:
The Multicast allows you to route the same message to a number of endpoints and process them in a different way.

Recipient List:
The recipients will receive a copy of the same Exchange.

Re: Data Corruption in SFTP in Parallel Multicast branches

contactreji
In reply to this post by lakshmi.prashant
Hi

Have you tried replacing
<camel:multicast id="ParallelGateway_1" parallelProcessing="true" stopOnException="true">
        <camel:to uri="direct://SequenceFlow_5"/>
        <camel:to uri="direct://SequenceFlow_7"/>
</camel:multicast>


With

<camel:multicast id="ParallelGateway_1" parallelProcessing="true" stopOnException="true">
        <camel:inOnly uri="seda:SequenceFlow_5"/>
        <camel:inOnly uri="seda:SequenceFlow_7"/>
</camel:multicast>

Reji
Reji Mathews
Sr. Developer - Middleware Integration / SOA ( Open Source - Apache Camel & Jboss Fuse ESB | Mule ESB )
LinkedIn - http://in.linkedin.com/pub/reji-mathews/31/9a2/40a
Twitter - reji_mathews

Re: Data Corruption in SFTP in Parallel Multicast branches

lakshmi.prashant
Hi,

 If direct is changed to SEDA, the parallel branches still fail.

Error processing exchange. Exchange[Message: [Body is instance of org.apache.camel.StreamCache]]. Caused by: [org.quartz.JobExecutionException - org.apache.camel.CamelExchangeException: Parallel processing failed for number 0. Exchange[Message: [Body is instance of org.apache.camel.StreamCache]].

 Caused by: [org.apache.camel.builder.xml.InvalidXPathExpression - Invalid xpath: /queryCompoundEmployeeResponse/CompoundEmployee.
Reason: javax.xml.xpath.XPathExpressionException: Failure converting a node of class javax.xml.transform.sax.SAXSource: I/O error reported by XML parser processing null: Invalid byte 2 of 4-byte UTF-8 sequence.]]|



 If I convert the StreamCache to String / byte[] after the producer that creates it and before the multicast, the issue does not arise.


<camel:to  uri="myProducer"/>         
<camel:convertBodyTo type="java.lang.String" charset="UTF-8" /> 
   
<camel:multicast id="ParallelGateway_1" parallelProcessing="true" stopOnException="true">
        <camel:to uri="direct://SequenceFlow_5"/>
        <camel:to uri="direct://SequenceFlow_7"/>
</camel:multicast>
</camel:route>

Thanks,
Lakshmi

Re: Data Corruption in SFTP in Parallel Multicast branches

Franz Paul Forsthofer
Hello Lakshmi,

it could be that this issue is related to
https://issues.apache.org/jira/browse/CAMEL-8688

We have made a patch for that in the current snapshot (version 2.16).

Can you try to reproduce your problem on the current snapshot?

Regards Franz


Re: Data Corruption in SFTP in Parallel Multicast branches

deepaktaker
In reply to this post by Franz Paul Forsthofer
Hi,
    I would like to know whether the fix for the JIRA issue CAMEL-8284, created for the bug "Data Corruption in SFTP in Parallel Multicast branches", has been released as a patch. I was wondering whether the patch is available in Camel version 2.14.

Regards,

Deepak

RE: Data Corruption in SFTP in Parallel Multicast branches

siano
Hi,

You can look that up yourself at
https://issues.apache.org/jira/browse/CAMEL-8284

There you will see that it is fixed in Camel 2.15.0 (not in 2.14).

If you need the fix in 2.14, you might be able to build your own patched version of 2.14.x and cherry-pick the changes into it (there are four related changes).

Best regards
Stephan
