Guaranteed file processing - JMS and file producer

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Guaranteed file processing - JMS and file producer

glenn
This post was updated on .
Hi

I’ve been looking at how to use camel-jms to consume xml from a persistent queue, do some minor transformation/validation of the xml and either output a single file or route a message to an error queue.

The ultimate goal is to ensure that ALL messages are consumed and either a file is created ONCE or an error is routed ONCE to an error queue.

As not wishing to use XA transactions by default, I opted to try out an approach using local JMS transactions, the Camel file component producing the files and use of an idempotentConsumer to guard against the re-processing of redelivered, duplicate messages.

I’d appreciate views on whether it is the case that the approach of using the Camel file producer alongside native JMS transactions and an idempotentConsumer can achieve the desired goal or that to achieve the goal I must opt to go down the XA route (removing need for the idempotentConsumer)?

My preference was a) not to have to add the additional complexity/overhead of global transactions unless absolutely necessary and b) to use the Camel file component as the file producer.

Whilst I can see the original, native JMS transaction based approach working in part, when considering just the sunny day scenario of successful file production only the approach does not guarantee that files will only be produced ONCE. A JVM crash at an inopportune moment between the JmsTransactionManager transaction commit and file repository record insertion can lead to skipping file production (the risk of breaking this guarantee is niche but it does, nonetheless, seem to exist).

My current  thinking is the way to achieve the goal is to use XA transactions alongside an XA compliant file producer and JTA  transaction manager. However, that does seem to preclude the use of the Camel file component (being non fully transactional – “best efforts” based).

For record, I’m using the Camel File component as the file producer with local JMS transactions, an externally defined Spring transaction manager, use of the Camel <transacted/> element within route and use of an idempotentConsumer (with a FileIdempotentRepository). The idempotentConsumer filters out message duplicates based on the JmsMessageId (it is configured with eager=true).
The log output below,  highlights the point where a JVM crash will lead to failure to produce a file but still consume the message.

<code>
05 Apr 2016 09:06:27   JmsTransactionManager          DEBUG Created JMS transaction on Session [Cached JMS Session: ActiveMQSession {id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1,started=false} java.lang.Object@44eb7aa] from Connection [Shared JMS Connection: ActiveMQConnection {id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1,clientId=ID:ANAME-ABCDEFGH-63012-1459843265976-0:1,started=false}]
05 Apr 2016 09:06:27   TransactionContext             DEBUG Begin:TX:ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:4
05 Apr 2016 09:06:27   ultJmsMessageListenerContainer DEBUG Received message of type [class org.apache.activemq.command.ActiveMQBytesMessage] from consumer [ActiveMQMessageConsumer { value=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1:244, started=true }] of transactional session [Cached JMS Session: ActiveMQSession {id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1,started=true} java.lang.Object@44eb7aa]
05 Apr 2016 09:06:27   EndpointMessageListener        DEBUG Endpoint[inputQueue://ANAME.INV.RESP.Q] consumer received JMS message: ActiveMQBytesMessage ...
05 Apr 2016 09:06:27   TransactionErrorHandler        DEBUG Transaction begin (0x2e900519) redelivered(false) for (MessageId: ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30 on ExchangeId: ID-ANAME-ABCDEFGH-62996-1459843263591-0-22))
05 Apr 2016 09:06:27   JmsTransactionManager          DEBUG Participating in existing transaction
05 Apr 2016 09:06:27   TransactionErrorHandler        TRACE isRunAllowed() -> true (Run allowed if we are not stopped/stopping)
...
05 Apr 2016 09:06:27   routeUnderTest           INFO  ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30: Queue message: <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
05 Apr 2016 09:06:27   SendProcessor                  DEBUG >>>> Endpoint[AcknowledgementProcessor] Exchange[ID-ANAME-ABCDEFGH-62996-1459843263591-0-22][JmsMessage[JmsMessageID: ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30]]
05 Apr 2016 09:06:27   AcknowledgementProcessor       DEBUG Payload is ...
05 Apr 2016 09:06:27   AcknowledgementProcessor       DEBUG Body is ...
05 Apr 2016 09:06:27   routeUnderTest           INFO  queue_ANAME.INV.RESP.Q_ID_ANAME-ABCDEFGH-64219-1459497606992-9_1_1_1_30: Processed message: <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
05 Apr 2016 09:06:27   DefaultStreamCachingStrategy   DEBUG Should spool cache 1023 -> false
...
05 Apr 2016 09:06:27   routeUnderTest           INFO  queue_ANAME.INV.RESP.Q_ID_ANAME-ABCDEFGH-64219-1459497606992-9_1_1_1_30: Validated message: <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
05 Apr 2016 09:06:27   FileIdempotentRepository       DEBUG Appending ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30 to idempotent filestore: C:\Files\filestore\filesOut\.filestore.dat

*** If JVM CRASH occurs here BEFORE JmsTransactionManager commit then FileIdempotentRepository entry will remain BUT no file has been created and no FileIdempotentRepository after processing operations will occur. Upon restoration, broker will redeliver and message will be consumed but FileIdempotentRepository will filter out duplicate message and hence NO file will be produced.  ***

05 Apr 2016 09:06:27   FilterProcessor                DEBUG Filter matches: false for exchange: Exchange[ID-ANAME-ABCDEFGH-62996-1459843263591-0-22][JmsMessage[JmsMessageID: ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30]]
05 Apr 2016 09:06:27   SendProcessor                  DEBUG >>>> Endpoint[file://name/someResponses] Exchange[ID-ANAME-ABCDEFGH-62996-1459843263591-0-22][JmsMessage[JmsMessageID: ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30]]
05 Apr 2016 09:06:27   FileOperations                 DEBUG Using InputStream to write file: name\someResponses\Some.xml
05 Apr 2016 09:06:27   GenericFileProducer            DEBUG Wrote [name\someResponses\Some.xml] to [Endpoint[file://name/someResponses]]
05 Apr 2016 09:06:27   routeUnderTest           INFO  queue_ANAME.INV.RESP.Q_ID_ANAME-ABCDEFGH-64219-1459497606992-9_1_1_1_30:Some.xml: Completed Processing
05 Apr 2016 09:06:27   TransactionErrorHandler        TRACE Is exchangeId: ID-ANAME-ABCDEFGH-62996-1459843263591-0-22 interrupted? false
...
05 Apr 2016 09:06:27   TransactionErrorHandler        DEBUG Transaction commit (0x2e900519) redelivered(false) for (MessageId: ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30 on ExchangeId: ID-ANAME-ABCDEFGH-62996-1459843263591-0-22))
05 Apr 2016 09:06:27   JmsTransactionManager          DEBUG Initiating transaction commit
05 Apr 2016 09:06:27   JmsTransactionManager          DEBUG Committing JMS transaction on Session [Cached JMS Session: ActiveMQSession {id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1,started=true} java.lang.Object@44eb7aa]
05 Apr 2016 09:06:27   ActiveMQSession                DEBUG ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1 Transaction Commit :TX:ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:4
05 Apr 2016 09:06:27   TransactionContext             DEBUG Commit: TX:ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:4 syncCount: 2

</code>

Would appreciate views.

Thanks

Glen


Reply | Threaded
Open this post in threaded view
|

Re: Guaranteed file processing - JMS and file producer

Quinn Stevenson
Is there a reason you’re not using a persistent aggregation repository?

If you could share a simplified version of your route, that would help as well.


> On Apr 11, 2016, at 4:21 AM, glenn <[hidden email]> wrote:
>
> Hi
>
> I’ve been looking at how to use camel-jms to consume xml from a persistent
> queue, do some minor transformation/validation of the xml and either output
> a single file or route a message to an error queue.
>
> The ultimate goal is to ensure that ALL messages are consumed and either a
> file is created ONCE or an error is routed ONCE to an error queue.
>
> As not wishing to use XA transactions by default, I opted to try out an
> approach using local JMS transactions, the Camel file component producing
> the files and use of an idempotent repository to guard against the
> re-processing of redelivered, duplicate messages.
>
> I’d appreciate views on whether it is the case that the approach of using
> the Camel file producer alongside native JMS transactions and an
> idempotentRepository cannot achieve the desired goal and that to achieve the
> goal must I opt to go down the XA route?
>
> My preference was a) not to have to add the additional complexity/overhead
> of global transactions unless absolutely necessary and b) to use the Camel
> file component as the file producer.
>
> Whilst I can see the original, native JMS transaction based approach working
> in part, when considering just the sunny day scenario of successful file
> production only the approach does not guarantee that files will only be
> produced ONCE. A JVM crash at an inopportune moment between the
> JmsTransactionManager transaction commit and file repository record
> insertion can lead to skipping file production (the risk of breaking this
> guarantee is niche but it does, nonetheless, seem to exist).
>
> My current  thinking is the way to achieve the goal is to use XA
> transactions alongside an XA compliant file producer and JTA  transaction
> manager. However, that does seem to preclude the use of the Camel file
> component (being non fully transactional – “best efforts” based).
>
> For record, I’m using the Camel File component as the file producer with
> local JMS transactions, an externally defined Spring transaction manager,
> use of the Camel <transacted/> element within route and use of the
> FileIdempotentRepository. The FileIdempotentRepository filters out message
> duplicates based on the JmsMessageId (it is configured with eager=true,
> using false just shifts problem).
> The log output below,  highlights the point where a JVM crash will lead to
> failure to produce a file but still consume the message.
>
> <code>
> 05 Apr 2016 09:06:27   JmsTransactionManager          DEBUG Created JMS
> transaction on Session [Cached JMS Session: ActiveMQSession
> {id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1,started=false}
> java.lang.Object@44eb7aa] from Connection [Shared JMS Connection:
> ActiveMQConnection
> {id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1,clientId=ID:ANAME-ABCDEFGH-63012-1459843265976-0:1,started=false}]
> 05 Apr 2016 09:06:27   TransactionContext             DEBUG
> Begin:TX:ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:4
> 05 Apr 2016 09:06:27   ultJmsMessageListenerContainer DEBUG Received message
> of type [class org.apache.activemq.command.ActiveMQBytesMessage] from
> consumer [ActiveMQMessageConsumer {
> value=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1:244, started=true }] of
> transactional session [Cached JMS Session: ActiveMQSession
> {id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1,started=true}
> java.lang.Object@44eb7aa]
> 05 Apr 2016 09:06:27   EndpointMessageListener        DEBUG
> Endpoint[inputQueue://ANAME.INV.RESP.Q] consumer received JMS message:
> ActiveMQBytesMessage ...
> 05 Apr 2016 09:06:27   TransactionErrorHandler        DEBUG Transaction
> begin (0x2e900519) redelivered(false) for (MessageId:
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30 on ExchangeId:
> ID-ANAME-ABCDEFGH-62996-1459843263591-0-22))
> 05 Apr 2016 09:06:27   JmsTransactionManager          DEBUG Participating in
> existing transaction
> 05 Apr 2016 09:06:27   TransactionErrorHandler        TRACE isRunAllowed()
> -> true (Run allowed if we are not stopped/stopping)
> ...
> 05 Apr 2016 09:06:27   routeUnderTest           INFO
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30: Queue message: <?xml
> version="1.0" encoding="UTF-8" standalone="yes"?>
> 05 Apr 2016 09:06:27   SendProcessor                  DEBUG >>>>
> Endpoint[AcknowledgementProcessor]
> Exchange[ID-ANAME-ABCDEFGH-62996-1459843263591-0-22][JmsMessage[JmsMessageID:
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30]]
> 05 Apr 2016 09:06:27   AcknowledgementProcessor       DEBUG Payload is ...
> 05 Apr 2016 09:06:27   AcknowledgementProcessor       DEBUG Body is ...
> 05 Apr 2016 09:06:27   routeUnderTest           INFO
> queue_ANAME.INV.RESP.Q_ID_ANAME-ABCDEFGH-64219-1459497606992-9_1_1_1_30:
> Processed message: <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
> 05 Apr 2016 09:06:27   DefaultStreamCachingStrategy   DEBUG Should spool
> cache 1023 -> false
> ...
> 05 Apr 2016 09:06:27   routeUnderTest           INFO
> queue_ANAME.INV.RESP.Q_ID_ANAME-ABCDEFGH-64219-1459497606992-9_1_1_1_30:
> Validated message: <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
> 05 Apr 2016 09:06:27   FileIdempotentRepository       DEBUG Appending
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30 to idempotent filestore:
> C:\Files\filestore\filesOut\.filestore.dat
>
> *** If JVM CRASH occurs here BEFORE JmsTransactionManager commit then
> FileIdempotentRepository entry will remain BUT no file has been created and
> no FileIdempotentRepository after processing operations will occur. Upon
> restoration, broker will redeliver and message will be consumed but
> FileIdempotentRepository will filter out duplicate message and hence NO file
> will be produced.  ***
>
> 05 Apr 2016 09:06:27   FilterProcessor                DEBUG Filter matches:
> false for exchange:
> Exchange[ID-ANAME-ABCDEFGH-62996-1459843263591-0-22][JmsMessage[JmsMessageID:
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30]]
> 05 Apr 2016 09:06:27   SendProcessor                  DEBUG >>>>
> Endpoint[file://name/someResponses]
> Exchange[ID-ANAME-ABCDEFGH-62996-1459843263591-0-22][JmsMessage[JmsMessageID:
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30]]
> 05 Apr 2016 09:06:27   FileOperations                 DEBUG Using
> InputStream to write file: name\someResponses\BuyDeal.xml
> 05 Apr 2016 09:06:27   GenericFileProducer            DEBUG Wrote
> [name\someResponses\BuyDeal.xml] to [Endpoint[file://name/someResponses]]
> 05 Apr 2016 09:06:27   routeUnderTest           INFO
> queue_ANAME.INV.RESP.Q_ID_ANAME-ABCDEFGH-64219-1459497606992-9_1_1_1_30:BuyDeal.xml:
> Completed Processing
> 05 Apr 2016 09:06:27   TransactionErrorHandler        TRACE Is exchangeId:
> ID-ANAME-ABCDEFGH-62996-1459843263591-0-22 interrupted? false
> ...
> 05 Apr 2016 09:06:27   TransactionErrorHandler        DEBUG Transaction
> commit (0x2e900519) redelivered(false) for (MessageId:
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30 on ExchangeId:
> ID-ANAME-ABCDEFGH-62996-1459843263591-0-22))
> 05 Apr 2016 09:06:27   JmsTransactionManager          DEBUG Initiating
> transaction commit
> 05 Apr 2016 09:06:27   JmsTransactionManager          DEBUG Committing JMS
> transaction on Session [Cached JMS Session: ActiveMQSession
> {id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1,started=true}
> java.lang.Object@44eb7aa]
> 05 Apr 2016 09:06:27   ActiveMQSession                DEBUG
> ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1 Transaction Commit
> :TX:ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:4
> 05 Apr 2016 09:06:27   TransactionContext             DEBUG Commit:
> TX:ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:4 syncCount: 2
>
> </code>
>
> Would appreciate views.
>
> Thanks
>
> Glen
>
>
>
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Guaranteed-file-processing-JMS-and-file-producer-tp5780899.html
> Sent from the Camel - Users mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: Guaranteed file processing - JMS and file producer

valdar
In reply to this post by glenn
Hi Glen,

do you have a way to make the writing of the file itself idempotent?
i.e. you might use a unique id in the file name tush writing it a second
time would alter only the timestamp.

Is that doable? In this way you can put aside the idempotent repository and
overcome your issue.

Regards,
Andrea.

On Mon, Apr 11, 2016 at 11:21 AM, glenn <[hidden email]> wrote:

> Hi
>
> I’ve been looking at how to use camel-jms to consume xml from a persistent
> queue, do some minor transformation/validation of the xml and either output
> a single file or route a message to an error queue.
>
> The ultimate goal is to ensure that ALL messages are consumed and either a
> file is created ONCE or an error is routed ONCE to an error queue.
>
> As not wishing to use XA transactions by default, I opted to try out an
> approach using local JMS transactions, the Camel file component producing
> the files and use of an idempotent repository to guard against the
> re-processing of redelivered, duplicate messages.
>
> I’d appreciate views on whether it is the case that the approach of using
> the Camel file producer alongside native JMS transactions and an
> idempotentRepository cannot achieve the desired goal and that to achieve
> the
> goal must I opt to go down the XA route?
>
> My preference was a) not to have to add the additional complexity/overhead
> of global transactions unless absolutely necessary and b) to use the Camel
> file component as the file producer.
>
> Whilst I can see the original, native JMS transaction based approach
> working
> in part, when considering just the sunny day scenario of successful file
> production only the approach does not guarantee that files will only be
> produced ONCE. A JVM crash at an inopportune moment between the
> JmsTransactionManager transaction commit and file repository record
> insertion can lead to skipping file production (the risk of breaking this
> guarantee is niche but it does, nonetheless, seem to exist).
>
> My current  thinking is the way to achieve the goal is to use XA
> transactions alongside an XA compliant file producer and JTA  transaction
> manager. However, that does seem to preclude the use of the Camel file
> component (being non fully transactional – “best efforts” based).
>
> For record, I’m using the Camel File component as the file producer with
> local JMS transactions, an externally defined Spring transaction manager,
> use of the Camel <transacted/> element within route and use of the
> FileIdempotentRepository. The FileIdempotentRepository filters out message
> duplicates based on the JmsMessageId (it is configured with eager=true,
> using false just shifts problem).
> The log output below,  highlights the point where a JVM crash will lead to
> failure to produce a file but still consume the message.
>
> <code>
> 05 Apr 2016 09:06:27   JmsTransactionManager          DEBUG Created JMS
> transaction on Session [Cached JMS Session: ActiveMQSession
> {id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1,started=false}
> java.lang.Object@44eb7aa] from Connection [Shared JMS Connection:
> ActiveMQConnection
>
> {id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1,clientId=ID:ANAME-ABCDEFGH-63012-1459843265976-0:1,started=false}]
> 05 Apr 2016 09:06:27   TransactionContext             DEBUG
> Begin:TX:ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:4
> 05 Apr 2016 09:06:27   ultJmsMessageListenerContainer DEBUG Received
> message
> of type [class org.apache.activemq.command.ActiveMQBytesMessage] from
> consumer [ActiveMQMessageConsumer {
> value=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1:244, started=true }] of
> transactional session [Cached JMS Session: ActiveMQSession
> {id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1,started=true}
> java.lang.Object@44eb7aa]
> 05 Apr 2016 09:06:27   EndpointMessageListener        DEBUG
> Endpoint[inputQueue://ANAME.INV.RESP.Q] consumer received JMS message:
> ActiveMQBytesMessage ...
> 05 Apr 2016 09:06:27   TransactionErrorHandler        DEBUG Transaction
> begin (0x2e900519) redelivered(false) for (MessageId:
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30 on ExchangeId:
> ID-ANAME-ABCDEFGH-62996-1459843263591-0-22))
> 05 Apr 2016 09:06:27   JmsTransactionManager          DEBUG Participating
> in
> existing transaction
> 05 Apr 2016 09:06:27   TransactionErrorHandler        TRACE isRunAllowed()
> -> true (Run allowed if we are not stopped/stopping)
> ...
> 05 Apr 2016 09:06:27   routeUnderTest           INFO
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30: Queue message: <?xml
> version="1.0" encoding="UTF-8" standalone="yes"?>
> 05 Apr 2016 09:06:27   SendProcessor                  DEBUG >>>>
> Endpoint[AcknowledgementProcessor]
>
> Exchange[ID-ANAME-ABCDEFGH-62996-1459843263591-0-22][JmsMessage[JmsMessageID:
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30]]
> 05 Apr 2016 09:06:27   AcknowledgementProcessor       DEBUG Payload is ...
> 05 Apr 2016 09:06:27   AcknowledgementProcessor       DEBUG Body is ...
> 05 Apr 2016 09:06:27   routeUnderTest           INFO
> queue_ANAME.INV.RESP.Q_ID_ANAME-ABCDEFGH-64219-1459497606992-9_1_1_1_30:
> Processed message: <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
> 05 Apr 2016 09:06:27   DefaultStreamCachingStrategy   DEBUG Should spool
> cache 1023 -> false
> ...
> 05 Apr 2016 09:06:27   routeUnderTest           INFO
> queue_ANAME.INV.RESP.Q_ID_ANAME-ABCDEFGH-64219-1459497606992-9_1_1_1_30:
> Validated message: <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
> 05 Apr 2016 09:06:27   FileIdempotentRepository       DEBUG Appending
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30 to idempotent filestore:
> C:\Files\filestore\filesOut\.filestore.dat
>
> *** If JVM CRASH occurs here BEFORE JmsTransactionManager commit then
> FileIdempotentRepository entry will remain BUT no file has been created and
> no FileIdempotentRepository after processing operations will occur. Upon
> restoration, broker will redeliver and message will be consumed but
> FileIdempotentRepository will filter out duplicate message and hence NO
> file
> will be produced.  ***
>
> 05 Apr 2016 09:06:27   FilterProcessor                DEBUG Filter matches:
> false for exchange:
>
> Exchange[ID-ANAME-ABCDEFGH-62996-1459843263591-0-22][JmsMessage[JmsMessageID:
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30]]
> 05 Apr 2016 09:06:27   SendProcessor                  DEBUG >>>>
> Endpoint[file://name/someResponses]
>
> Exchange[ID-ANAME-ABCDEFGH-62996-1459843263591-0-22][JmsMessage[JmsMessageID:
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30]]
> 05 Apr 2016 09:06:27   FileOperations                 DEBUG Using
> InputStream to write file: name\someResponses\BuyDeal.xml
> 05 Apr 2016 09:06:27   GenericFileProducer            DEBUG Wrote
> [name\someResponses\BuyDeal.xml] to [Endpoint[file://name/someResponses]]
> 05 Apr 2016 09:06:27   routeUnderTest           INFO
>
> queue_ANAME.INV.RESP.Q_ID_ANAME-ABCDEFGH-64219-1459497606992-9_1_1_1_30:BuyDeal.xml:
> Completed Processing
> 05 Apr 2016 09:06:27   TransactionErrorHandler        TRACE Is exchangeId:
> ID-ANAME-ABCDEFGH-62996-1459843263591-0-22 interrupted? false
> ...
> 05 Apr 2016 09:06:27   TransactionErrorHandler        DEBUG Transaction
> commit (0x2e900519) redelivered(false) for (MessageId:
> ID:ANAME-ABCDEFGH-64219-1459497606992-9:1:1:1:30 on ExchangeId:
> ID-ANAME-ABCDEFGH-62996-1459843263591-0-22))
> 05 Apr 2016 09:06:27   JmsTransactionManager          DEBUG Initiating
> transaction commit
> 05 Apr 2016 09:06:27   JmsTransactionManager          DEBUG Committing JMS
> transaction on Session [Cached JMS Session: ActiveMQSession
> {id=ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1,started=true}
> java.lang.Object@44eb7aa]
> 05 Apr 2016 09:06:27   ActiveMQSession                DEBUG
> ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:1 Transaction Commit
> :TX:ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:4
> 05 Apr 2016 09:06:27   TransactionContext             DEBUG Commit:
> TX:ID:ANAME-ABCDEFGH-63012-1459843265976-1:1:4 syncCount: 2
>
> </code>
>
> Would appreciate views.
>
> Thanks
>
> Glen
>
>
>
>
>
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Guaranteed-file-processing-JMS-and-file-producer-tp5780899.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>



--
"In a world without walls and fences who needs Windows and Gates?"
Reply | Threaded
Open this post in threaded view
|

Re: Guaranteed file processing - JMS and file producer

glenn
This post was updated on .
In reply to this post by glenn
Hi

I'm supplying a simplified route to assist. I didn't consider using the persistent aggregation strategy here in that a single message is being transformed into a single file - I hope that's clearer now with the example route. The FileIdempotentRepository provides sufficient persistence needed per my understanding.

The file component uri being used is actually to be determined but once the files are placed in output folders there will be an external process to move them.
Without the idempotent repository and assuming the file name itself could be idempotent I did consider allowing for the existing file to be updated but I opted against this on basis that I cannot control when the subsequent write/update will happen - it's possible the original file has already been moved before the update.
So have opted to prefer never to write the file twice (effectively removing risk of duplicate response to an external system) and trade this off against current risk that may not write any response.
In either scenario, as is, there would have to be mitigation to address and it seems easier to detect and handle the latter, missed response scenario.

I totally appreciate the probability of this is remote but I'm conscious that I may be missing a simpler solution that would guarantee one file output for every one message consumed that is based on use of the Camel file component with local JMS transactions. That would avoiding any need to implement mitigation measures or need to switch to use an XA file producer that supports global transactions.

<route id="someRoute">
        ** Read from the input queue **
        <from uri="jms:ANAME.INV.RESP.Q" />

        ** Enable use of local native JMS transaction using external Spring transaction manager **
        <transacted />

        **Perform some general validation and processing of message **
        ...
        ** Completed performing some general validation and processing of message **

        ** Given use of local transactions and a JVM crash then is possible for JMS broker to re-deliver duplicate of message that's already been successfully output an identically named a file with same name. Guarding against duplicate file creation via an idempotent consumer. **

        <idempotentConsumer messageIdRepositoryRef="consumerIdempotentRepository" eager="true" completionEager="false" skipDuplicate="false">
                ** uses the messageId header as key for identifying duplicate messages **
                <header>JmsMessageID</header>
                <filter>
                        <exchangeProperty>CamelDuplicateMessage</exchangeProperty>
                        <to uri="direct:duplicateMessageDetected" />
                        <stop />
                </filter>
                ** if not a duplicate send it to file endpoint/upload folder **
                <to uri="file://name/someResponses?fileName=${date:now:yyyyMMddHHmmssSSS}${file:name}" />
        </idempotentConsumer>
</route>

** Do detection specific stuff - just logging **
<route id="duplicateMessageDetectedRoute">
        <from uri="direct:duplicateMessageDetected" />
        <log message="${id}:${file:name}: Duplicate message detected, ignoring message \n" loggingLevel="WARN" />
</route>

<bean id="consumerIdempotentRepository" class="org.apache.camel.processor.idempotent.FileIdempotentRepository">
        ** Plus config for FileIdempotentRepository **
</bean>               


Regards