[jira] Created: (CAMEL-125) Support for a stream-processing resequencer.

JIRA jira@apache.org
Support for a stream-processing resequencer.
--------------------------------------------

                 Key: CAMEL-125
                 URL: https://issues.apache.org/activemq/browse/CAMEL-125
             Project: Apache Camel
          Issue Type: Improvement
          Components: camel-core, camel-spring
    Affects Versions: 1.1.0
         Environment: Java 6, Windows XP
            Reporter: Martin Krasser
         Attachments: camel-core-patch.txt, camel-spring-patch.txt

Attached is a patch that adds a stream-processing resequencer to Camel. The resequencing algorithm is based on the detection of gaps in a message stream rather than on a fixed batch size. Combining gap detection with timeouts removes the need to know the number of messages in a sequence in advance (a capacity parameter still prevents the resequencer from running out of memory).

Route builder examples for the stream-processing resequencer:

{{from("direct:start").resequencer(header("seqnum")).stream().to("mock:result")}}

is equivalent to:

{{from("direct:start").resequencer(header("seqnum")).stream(StreamResequencerConfig.getDefault()).to("mock:result")}}

Custom values for the resequencer's capacity and timeout can be set as in this example:

{{from("direct:start").resequencer(header("seqnum")).stream(new StreamResequencerConfig(300, 4000L)).to("mock:result")}}

The XML configuration looks like this:

{code:xml}
<camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="direct:start"/>
    <resequencer>
      <simple>in.header.seqnum</simple>
      <to uri="mock:result" />
      <stream-config capacity="300" timeout="4000"/>
    </resequencer>
  </route>
</camelContext>
{code}

The existing batch-processing resequencer can be defined as usual:

{{from("direct:start").resequencer(header("seqnum")).to("mock:result")}}

which is now equivalent to:

{{from("direct:start").resequencer(header("seqnum")).batch().to("mock:result")}}

It is now also possible to define a custom configuration for the existing batch-processing resequencer:  

{{from("direct:start").resequencer(header("seqnum")).batch(new BatchResequencerConfig(300, 4000L)).to("mock:result")}}

This sets the batchSize to 300 and the batchTimeout to 4000 ms.
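To make the batchSize/batchTimeout semantics concrete, here is a minimal sketch of batch resequencing in plain Java. It assumes the batch is flushed when either batchSize messages have been collected or batchTimeout has elapsed since the first message of the batch arrived, and that a flushed batch is sorted by sequence number before it is emitted. The class and method names (BatchResequencerSketch, receive, onTimer) are hypothetical, time is passed in explicitly so the sketch is testable, and this is not the Camel implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of batch resequencing; not the Camel source.
class BatchResequencerSketch {
    static final class Msg {
        final long seq;
        final String body;
        Msg(long seq, String body) { this.seq = seq; this.body = body; }
    }

    private final int batchSize;            // flush as soon as this many messages are collected
    private final long batchTimeoutMillis;  // ...or once the batch has been open this long
    private final List<Msg> batch = new ArrayList<>();
    private final List<String> output = new ArrayList<>();
    private long batchStart;                // arrival time of the first message in the current batch

    BatchResequencerSketch(int batchSize, long batchTimeoutMillis) {
        this.batchSize = batchSize;
        this.batchTimeoutMillis = batchTimeoutMillis;
    }

    // Adds a message; 'now' is the current time in milliseconds.
    void receive(long seq, String body, long now) {
        if (batch.isEmpty()) {
            batchStart = now;
        }
        batch.add(new Msg(seq, body));
        if (batch.size() >= batchSize) {
            flush();
        }
    }

    // Called periodically; emits a partial batch once batchTimeout has elapsed.
    void onTimer(long now) {
        if (!batch.isEmpty() && now - batchStart >= batchTimeoutMillis) {
            flush();
        }
    }

    // Sorts the current batch by sequence number and emits it.
    private void flush() {
        batch.sort(Comparator.comparingLong(m -> m.seq));
        for (Msg m : batch) {
            output.add(m.body);
        }
        batch.clear();
    }

    List<String> output() {
        return new ArrayList<>(output);
    }
}
```

Because each batch is sorted in isolation in this sketch, a message that arrives after its batch has been flushed cannot be moved ahead of messages already emitted; that fixed boundary is what the gap-detection approach avoids.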

For the stream-processing resequencer to work, messages must contain a sequence number for which a predecessor and a successor are known. For example, a message with the sequence number 3 has a predecessor message with the sequence number 2 and a successor message with the sequence number 4. The message sequence 2, 3, 5 has a gap because the successor of 3 is missing. The resequencer therefore has to retain message 5 until message 4 arrives (or a timeout occurs).
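The gap-detection rule above can be sketched in plain Java. This is a minimal sketch, not the Camel or ServiceMix engine: it assumes long sequence numbers where the successor of n is n + 1, assumes the first expected sequence number is known up front, and passes the current time in explicitly instead of running a timer thread. The class and method names (GapResequencer, insert, onTimer) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of gap-detection resequencing; not the Camel/ServiceMix engine.
class GapResequencer {
    private final int capacity;         // maximum number of messages held back
    private final long timeoutMillis;   // how long the first held-back message may wait for a gap to close
    private final TreeMap<Long, String> buffer = new TreeMap<>();  // held-back messages, ordered by sequence number
    private final Map<Long, Long> arrivedAt = new HashMap<>();
    private final List<String> delivered = new ArrayList<>();
    private long nextExpected;          // successor of the last delivered sequence number

    GapResequencer(int capacity, long timeoutMillis, long firstExpected) {
        this.capacity = capacity;
        this.timeoutMillis = timeoutMillis;
        this.nextExpected = firstExpected;
    }

    // Accepts a message and delivers every buffered message that is no longer blocked by a gap.
    void insert(long seq, String body, long now) {
        if (seq < nextExpected) {
            return;  // its slot was already delivered or given up on: dropped in this sketch
        }
        // Only messages that must be held back count against the capacity.
        if (seq != nextExpected && buffer.size() >= capacity) {
            throw new IllegalStateException("resequencer capacity exceeded");
        }
        buffer.put(seq, body);
        arrivedAt.put(seq, now);
        deliverInOrder();
    }

    // Called periodically; gives up on a gap once the first held-back message has waited timeoutMillis.
    void onTimer(long now) {
        while (!buffer.isEmpty() && now - arrivedAt.get(buffer.firstKey()) >= timeoutMillis) {
            nextExpected = buffer.firstKey();  // skip the missing sequence numbers
            deliverInOrder();
        }
    }

    // Delivers the contiguous run of messages starting at nextExpected.
    private void deliverInOrder() {
        while (!buffer.isEmpty() && buffer.firstKey() == nextExpected) {
            long seq = buffer.firstKey();
            delivered.add(buffer.remove(seq));
            arrivedAt.remove(seq);
            nextExpected = seq + 1;
        }
    }

    List<String> delivered() {
        return new ArrayList<>(delivered);
    }
}
```

With the sequence 2, 3, 5, messages 2 and 3 are delivered immediately and 5 is held back; it is released either when 4 arrives or when onTimer() finds that 5 has waited for the full timeout. In this sketch a message arriving after its gap was given up on is simply dropped; how the actual engine treats such late messages is not covered here.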

Gap detection is done with strategies that implement the SequenceNumberComparator<E> interface. In addition to the java.util.Comparator<E>.compare(E, E) operation, the SequenceNumberComparator<E> interface defines the predecessor(E, E) and successor(E, E) operations. The stream resequencer can be configured with custom SequenceNumberComparator<E> strategies.
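A minimal sketch of such a strategy follows. The interface shape mirrors the description above (compare plus predecessor/successor), but the exact Camel signatures may differ. The semantics assumed here are that predecessor(a, b) is true when a immediately precedes b, and successor(a, b) is true when a immediately follows b. LongSequenceComparator and GapCheck are hypothetical names used only for illustration.

```java
import java.util.Comparator;
import java.util.List;

// Sketch of the strategy interface described above; exact Camel signatures may differ.
interface SequenceNumberComparator<E> extends Comparator<E> {
    // Assumed semantics: true if o1 immediately precedes o2.
    boolean predecessor(E o1, E o2);

    // Assumed semantics: true if o1 immediately follows o2.
    boolean successor(E o1, E o2);
}

// Hypothetical strategy for plain long sequence numbers, where n + 1 follows n.
class LongSequenceComparator implements SequenceNumberComparator<Long> {
    @Override
    public int compare(Long o1, Long o2) {
        return Long.compare(o1, o2);
    }

    @Override
    public boolean predecessor(Long o1, Long o2) {
        return o1 + 1 == o2;
    }

    @Override
    public boolean successor(Long o1, Long o2) {
        return predecessor(o2, o1);
    }
}

// Hypothetical helper: finds the first gap in an already sorted sequence.
class GapCheck {
    // Returns the index of the first element whose immediate predecessor is missing, or -1 if there is no gap.
    static <E> int firstGap(List<E> sorted, SequenceNumberComparator<E> cmp) {
        for (int i = 1; i < sorted.size(); i++) {
            if (!cmp.successor(sorted.get(i), sorted.get(i - 1))) {
                return i;
            }
        }
        return -1;
    }
}
```

For the sequence 2, 3, 5, firstGap returns the index of 5, because 5 is not the successor of 3. A custom strategy would only need to redefine what "immediately precedes" means for its own sequence-number type.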

The stream-processing resequencer uses the same algorithm as the one in ServiceMix-3.2-SNAPSHOT (servicemix-eip). To avoid compile-time dependencies on ServiceMix, I've copied the ServiceMix-independent resequencing engine over to Camel. This redundancy should be removed once Camel and servicemix-eip are combined (is that planned?). I can contribute to this task, if needed.


--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

[jira] Commented: (CAMEL-125) Support for a stream-processing resequencer.

JIRA jira@apache.org

    [ https://issues.apache.org/activemq/browse/CAMEL-125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_40008 ]

James Strachan commented on CAMEL-125:
--------------------------------------

Awesome patch and comments! I'm on vacation today but will hopefully get a chance to review it fully and commit it on Tuesday.

Incidentally, yes, I'd love to combine Camel and servicemix-eip; contributions most welcome. In ServiceMix 4 it'd be great to fold them into Camel and maybe have ServiceMix natively understand Camel components (FWIW the Camel API is extremely close to the new ServiceMix 4 APIs). Keep up the great work, Martin!

[jira] Resolved: (CAMEL-125) Support for a stream-processing resequencer.

JIRA jira@apache.org

     [ https://issues.apache.org/activemq/browse/CAMEL-125?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

James Strachan resolved CAMEL-125.
----------------------------------

       Resolution: Fixed
    Fix Version/s: 1.2.0

Patch applied with huge thanks - great work Martin!

One thought I had: should we make the ResequenceEngine take a Processor-type thing to deliver messages, rather than using the Queue and SequenceSender? I guess we could reuse a SedaProcessor or something; I did wonder whether we might want to support synchronous dispatch of the messages for easier transaction handling.

e.g. in one JMS session, say, consume a bunch of messages and send them on in the right order, all in a single thread and JMS transaction.
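The synchronous-dispatch idea can be illustrated with a small sketch in which the engine hands each in-order message straight to a callback on the inserting thread, instead of putting it on a queue for a separate sender thread. Here java.util.function.Consumer stands in for a Camel Processor; CallbackResequencer is a hypothetical name, and the sketch deliberately omits timeouts and capacity limits.

```java
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical sketch: in-order messages are delivered directly, with no queue in between.
class CallbackResequencer {
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private final Consumer<String> next;  // stands in for the next Processor in the route
    private long nextExpected;

    CallbackResequencer(Consumer<String> next, long firstExpected) {
        this.next = next;
        this.nextExpected = firstExpected;
    }

    // Runs entirely on the caller's thread, so delivery can share the caller's session/transaction.
    void insert(long seq, String body) {
        if (seq < nextExpected) {
            return;  // already delivered: ignored in this sketch
        }
        pending.put(seq, body);
        while (pending.containsKey(nextExpected)) {
            next.accept(pending.get(nextExpected));  // if this throws, the message stays pending
            pending.remove(nextExpected);
            nextExpected++;
        }
    }
}
```

Since no second thread is involved in this sketch, an exception from the callback propagates straight back to whoever called insert(), which is the property that makes single-transaction handling easier to reason about.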

[jira] Commented: (CAMEL-125) Support for a stream-processing resequencer.

JIRA jira@apache.org

    [ https://issues.apache.org/activemq/browse/CAMEL-125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_40026 ]

Martin Krasser commented on CAMEL-125:
--------------------------------------

... Processor instead of Queue and SequenceSender:
Good idea. I'll provide a patch (within the next two weeks, I hope).

... Synchronous dispatch:
Here I can imagine using a polling consumer, as is done in org.apache.camel.processor.Resequencer. With a small (configurable) receive-timeout (e.g. 200 ms), the polling consumer repeatedly receives messages and adds them to the resequencer engine. On every receive-timeout, the polling thread also attempts to deliver() re-ordered exchanges (to the next processor). In this case we can turn off the deliver() call from the internal timer thread and are left with a single thread that performs the delivery/dispatch. Does this address your point? If so, I'll start working on that and provide a patch.

Cheers,
Martin
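The single-thread proposal above can be sketched as one polling-loop iteration: wait up to a short receive-timeout for the next message, insert it if one arrived, and then, on the same thread, deliver everything that is in order or whose gap has timed out. This is a hypothetical sketch, not org.apache.camel.processor.Resequencer: a BlockingQueue stands in for the polling consumer, a list stands in for the next processor, and the clock is injected so gap timeouts can be tested without real waiting. PollingResequencer and pollOnce are illustrative names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch of single-threaded resequencing driven by a polling consumer.
class PollingResequencer {
    static final class Msg {
        final long seq;
        final String body;
        Msg(long seq, String body) { this.seq = seq; this.body = body; }
    }

    private final BlockingQueue<Msg> inbox;    // stands in for the endpoint's polling consumer
    private final long receiveTimeoutMillis;   // short poll timeout, e.g. 200 ms
    private final long gapTimeoutMillis;       // how long the head message may wait for a missing predecessor
    private final LongSupplier clock;          // injected so gap timeouts can be tested deterministically
    private final TreeMap<Long, Msg> pending = new TreeMap<>();
    private final Map<Long, Long> arrivedAt = new HashMap<>();
    private final List<String> delivered = new ArrayList<>();  // stands in for the next processor
    private long nextExpected;

    PollingResequencer(BlockingQueue<Msg> inbox, long receiveTimeoutMillis, long gapTimeoutMillis,
                       LongSupplier clock, long firstExpected) {
        this.inbox = inbox;
        this.receiveTimeoutMillis = receiveTimeoutMillis;
        this.gapTimeoutMillis = gapTimeoutMillis;
        this.clock = clock;
        this.nextExpected = firstExpected;
    }

    // One iteration of the polling thread: receive (or time out), then deliver on this same thread.
    void pollOnce() {
        Msg m;
        try {
            m = inbox.poll(receiveTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt so the caller can stop the loop
            return;
        }
        long now = clock.getAsLong();
        if (m != null && m.seq >= nextExpected) {
            pending.put(m.seq, m);
            arrivedAt.put(m.seq, now);
        }
        deliver(now);
    }

    // No timer thread calls this; delivery happens only from pollOnce().
    private void deliver(long now) {
        while (!pending.isEmpty()) {
            long head = pending.firstKey();
            boolean gapTimedOut = now - arrivedAt.get(head) >= gapTimeoutMillis;
            if (head != nextExpected && !gapTimedOut) {
                break;  // a predecessor is still missing: keep waiting
            }
            delivered.add(pending.remove(head).body);
            arrivedAt.remove(head);
            nextExpected = head + 1;
        }
    }

    List<String> delivered() {
        return new ArrayList<>(delivered);
    }
}
```

A caller would run pollOnce() in a loop until shutdown. Because insertion and delivery share one thread in this sketch, the pending map needs no locking.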

[jira] Commented: (CAMEL-125) Support for a stream-processing resequencer.

JIRA jira@apache.org

    [ https://issues.apache.org/activemq/browse/CAMEL-125?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_40027 ]

Martin Krasser commented on CAMEL-125:
--------------------------------------

... and thanks for applying the patches :)
