Help with Splitter/Aggregator-like behavior

Ron Cecchini

So, I have a situation where I need something like a Splitter and an Aggregator.
But as far as I can tell from reading and googling, maybe my situation is nonstandard?
From what I can tell, a Splitter and Aggregator are used together within a single route.
In my case, I need the Splitter and Aggregator separated into a sender and receiver, resp.


I'm just looking for someone to tell me if the following fits squarely within the Splitter
and Aggregator patterns - if so, I'll dig in and figure it out - or if there's another pattern
or something else to try.


Thank you in advance for your guidance, and sorry for being so verbose again (just trying to be clear).


-----


On the Splitter side, per usual, I need to split a big message into individual messages.
However, I can't just split and let each individual message continue on the route.
Instead, I need to "wrap" each individual message and stick some header information on it.


The situation is very much like the following, which is very UDP-like:


Big messages come in, and they get split into "packages" of a preset size.
All the individual "packages" can be said to belong to a "frame" of data.
The header of each individual message contains the Frame #, the Package #, and the Total #
of packages in the frame, so the receiver knows when it has received a full frame of data.

Message: 1
     Frame: 1 - Package: 1 - Total: 3
     Frame: 1 - Package: 2 - Total: 3
     Frame: 1 - Package: 3 - Total: 3
Message: 2
     Frame: 2 - Package: 1 - Total: 2
     Frame: 2 - Package: 2 - Total: 2


Etc.
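
For illustration, here is a minimal plain-Java sketch of the framing scheme described above, with no Camel involved. The names FrameChunker, Pkg, and split are hypothetical and not from any library; the sketch assumes the payload is a byte array and the "preset size" is a byte count.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FrameChunker {
    /** One "package": a slice of the payload plus the header fields (hypothetical type). */
    static final class Pkg {
        public final int frame;      // frame this package belongs to
        public final int number;     // 1-based package number within the frame
        public final int total;      // total number of packages in the frame
        public final byte[] payload; // this package's slice of the big message

        Pkg(int frame, int number, int total, byte[] payload) {
            this.frame = frame;
            this.number = number;
            this.total = total;
            this.payload = payload;
        }
    }

    /** Splits data into packages of at most maxSize bytes, all tagged with the same frame number. */
    static List<Pkg> split(int frame, byte[] data, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        // Ceiling division; an empty payload still yields one (empty) package.
        int total = Math.max(1, (data.length + maxSize - 1) / maxSize);
        List<Pkg> out = new ArrayList<>(total);
        for (int i = 0; i < total; i++) {
            int from = i * maxSize;
            int to = Math.min(data.length, from + maxSize);
            out.add(new Pkg(frame, i + 1, total, Arrays.copyOfRange(data, from, to)));
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] message = "abcdefgh".getBytes(StandardCharsets.UTF_8); // 8 bytes
        for (Pkg p : split(1, message, 3)) {
            System.out.println("Frame: " + p.frame + " - Package: " + p.number + " - Total: " + p.total);
        }
    }
}
```

With an 8-byte message and a package size of 3, this produces the three packages listed for "Message: 1" above. Carrying the total in every package is what lets the receiver detect a complete frame without any extra end-of-frame marker.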


If I can't accomplish this with a split() of some kind, how could I do it with a regular Processor?
Having a Processor manually split and bundle the data into "packages" is trivial.
But how does the Processor then write the individual messages back to a "direct:processPackage" route point?
Can a Processor invoke (write data to) a route, at some point in the middle of that route?


-----


The Aggregator, as you would expect, needs to do the opposite of the above:


It needs to aggregate "packages" of data until it determines it has a full "frame".
Then it bundles all the package payloads into a single, big message.
When a frame is not full, data does not flow to the rest of the route.
When the frame is full, the data is written to some route mid-point; e.g. "direct:translateMessage".
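
As a rough sketch of the receiving side described above, again in plain Java with no Camel: a stateful assembler that buffers packages per frame and hands back the joined payload only once the frame is full. FrameAssembler and accept are hypothetical names, and the sketch assumes every package carries the frame number, its own 1-based number, and the frame total.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

final class FrameAssembler {
    // frame number -> (package number -> payload), kept sorted by package number
    private final Map<Integer, TreeMap<Integer, byte[]>> pending = new HashMap<>();

    /**
     * Stores one package. Returns the reassembled frame once all `total` packages
     * have arrived; otherwise returns Optional.empty() so the caller forwards nothing.
     */
    synchronized Optional<byte[]> accept(int frame, int number, int total, byte[] payload) {
        if (number < 1 || number > total) {
            throw new IllegalArgumentException("package number out of range");
        }
        TreeMap<Integer, byte[]> parts = pending.computeIfAbsent(frame, f -> new TreeMap<>());
        parts.put(number, payload);            // a duplicate package just overwrites itself
        if (parts.size() < total) {
            return Optional.empty();           // frame not full yet
        }
        pending.remove(frame);                 // frame complete: release its buffer
        ByteArrayOutputStream joined = new ByteArrayOutputStream();
        for (byte[] part : parts.values()) {   // TreeMap iterates in package order
            joined.write(part, 0, part.length);
        }
        return Optional.of(joined.toByteArray());
    }

    public static void main(String[] args) {
        FrameAssembler assembler = new FrameAssembler();
        // Package 2 arrives first; nothing is emitted until package 1 shows up.
        assembler.accept(1, 2, 2, "world".getBytes(StandardCharsets.UTF_8));
        assembler.accept(1, 1, 2, "hello ".getBytes(StandardCharsets.UTF_8))
                 .ifPresent(b -> System.out.println(new String(b, StandardCharsets.UTF_8)));
    }
}
```

Keying the buffers by frame number means packages can arrive out of order or interleaved across frames. One caveat of this sketch: over a lossy transport such as UDP, a frame with a lost package never completes, so a real implementation would probably also need a timeout or eviction policy for stale frames.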


So, can this sort of "asynchronous" aggregating be done?
Can an aggregating Processor basically maintain state, and decide to write or not write to a route?


Thank you again for any pointers.

Re: Help with Splitter/Aggregator-like behavior

Claus Ibsen-2
Hi

You can use the splitter and aggregator to do something like that.

The aggregator can group the split messages together based on that
frame, and it can do this even when they arrive out of order.
And the output of the aggregator is routed separately from the input (async).

Maybe try to build a simple use-case / sample / unit test with just
the splitter and aggregator and see if you can build something.
And if not then maybe post a bit here again with what you have done,
so we can better help / understand your use-case.



--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2

Re: Help with Splitter/Aggregator-like behavior

Ron Cecchini
> On September 20, 2018 at 7:30 AM Claus Ibsen <[hidden email]> wrote:
>
>
> Hi
>
> You can use the splitter and aggregator to do something like that.
>
> The aggregator can group the split messages together based on that
> frame, and it can do this even when they arrive out of order.
> And the output of the aggregator is routed separately from the input (async).
>
> Maybe try to build a simple use-case / sample / unit test with just
> the splitter and aggregator and see if you can build something.
> And if not then maybe post a bit here again with what you have done,
> so we can better help / understand your use-case.


Hi, Claus.

Thank you for your response.

As it turned out, I didn't need a Splitter or Aggregator at all to implement my "distributed splitter/aggregator".

All I needed was a ProducerTemplate and some simple Processor classes.

It really ended up being a thing of beauty...

I'll include the sanitized code below for anyone who is interested.

[ First I list the "before" SENDER / RECEIVER, which doesn't have data chunking,
  and then the "after" SENDER / RECEIVER which includes a "DataChunker" to do the Splitting
  and a "DataChunkProcessor" to do the Aggregating. ]

Thanks again.

Ron

p.s. And if anyone wants to help me figure out how to get @Autowired to work for ProducerTemplate, that would be great.
     Otherwise, creating and holding onto the first ProducerTemplate I need is working just fine.
     From googling, the best I can determine is that somehow my Processor @Component classes are being processed
     before the CamelContext is fully up (and so there's no default ProducerTemplate to inject).  Yes...?

--------------------------------------------------------------------------------
--------------------------------------------------------------------------------

BEFORE, without data chunking (splitting/aggregating).

SENDER:

@Component
public class Route extends RouteBuilder
{
    @Override
    public void configure () throws Exception
    {
        restConfiguration()
                .host("localhost").port(8080)
                .bindingMode(RestBindingMode.json);

        getContext().setStreamCaching(true);

        from("timer:autos?period={{timer.period}}")
                .streamCaching()
                .to("rest:get:auto/list")
                .to("direct:udp");

        from("direct:udp")
                .convertBodyTo(String.class, "UTF-8")
                .to("netty4:udp://localhost:{{receiver.port}}?udpConnectionlessSending=true&sync=false");
    }
}

--------------------------------------------------------------------------------

RECEIVER:

@Component
public class AdapterRouteBuilder extends RouteBuilder
{
    @Override
    public void configure () throws Exception
    {
        JacksonDataFormat autoListJson = new ListJacksonDataFormat(Auto.class);

        getContext().setStreamCaching(true);

        from("netty4:udp://localhost:{{receiver.port}}?sync=false")
                .to("direct:autolist");

        from("direct:autolist")
                .unmarshal(autoListJson)
                .process(new AutoListProcessor())  // creates a protobuf, sets it in the Exchange
                .to("direct:protobuf");

        from("direct:protobuf")
                .marshal().protobuf()
                .to("rabbitmq:{{rabbitmq.exch}}?connectionFactory=#rabbitmq&routingKey={{rabbitmq.rkey}}&exchangeType=topic&durable=false&autoDelete=true");
    }
}

--------------------------------------------------------------------------------
--------------------------------------------------------------------------------

AFTER, with data chunking (splitting/aggregating).

SENDER:

@Component
public class Route extends RouteBuilder
{
    @Override
    public void configure () throws Exception
    {
        JacksonDataFormat autoListJson  = new ListJacksonDataFormat(Auto.class);
        JacksonDataFormat dataChunkJson = new JacksonDataFormat(DataChunk.class);

        getContext().setStreamCaching(true);

        restConfiguration()
                .host("localhost").port(8080)
                .bindingMode(RestBindingMode.json);

        from("timer:autos?period={{timer.period}}")
                .to("rest:get:auto/list")
                .to("direct:datachunker");

        from("direct:datachunker")
                .unmarshal(autoListJson)
                .process(new DataChunker());  // creates DataChunk; sends directly to "direct:udp"

        from("direct:udp")
                .marshal(dataChunkJson)
                .convertBodyTo(String.class, "UTF-8")
                .to("netty4:udp://localhost:{{receiver.port}}?udpConnectionlessSending=true&sync=false");
    }
}

--------------------------------------------------------------------------------

RECEIVER:

@Component
public class AdapterRouteBuilder extends RouteBuilder
{
    @Override
    public void configure () throws Exception
    {
        JacksonDataFormat dataChunkJson = new JacksonDataFormat(DataChunk.class);

        getContext().setStreamCaching(true);

        from("netty4:udp://localhost:{{receiver.port}}?sync=false")
                .to("direct:datachunk");

        from("direct:datachunk")
                .unmarshal(dataChunkJson)
                .process(new DataChunkProcessor());  // sends directly to "direct:protobuf"

        from("direct:protobuf")
                .marshal().protobuf()
                .to("rabbitmq:{{rabbitmq.exch}}?connectionFactory=#rabbitmq&routingKey={{rabbitmq.rkey}}&exchangeType=topic&durable=false&autoDelete=true");
    }
}

--------------------------------------------------------------------------------
--------------------------------------------------------------------------------

public class DataChunker implements Processor
{
//    @Autowired
//    private ProducerTemplate producerTemplate;  // XXX: can't get this to work

    private ProducerTemplate producerTemplate = null;

    private static int frame = 0;

    public void process (Exchange exchange) throws Exception
    {
        if ( producerTemplate == null ) producerTemplate = exchange.getContext().createProducerTemplate();

        List<Auto> autos = null;
        try {
            autos = exchange.getIn().getBody(List.class);
        } catch (Exception e) {
            System.out.println("*** ERROR: DataChunker: process: " + e.getMessage());
        }
        if ( autos == null ) return;  // nothing to chunk; avoids a NullPointerException in the loop below

        ++frame;

        int packet = 0;
        for ( Auto auto : autos )
        {
            DataChunk chunk = new DataChunk();
            chunk.setFrame(frame);
            chunk.setPacket(++packet);
            chunk.setTotalPackets(autos.size());
            chunk.getList().add(auto);

            producerTemplate.sendBody("direct:udp", chunk);
        }

        // XXX: That's it.  Don't need to do: exchange.getIn().setBody(...);
    }
}

--------------------------------------------------------------------------------

public class DataChunkProcessor implements Processor
{
    private static class DataFrame
    {
        public int frame = 0;
        public int size  = 0;  // expected # packets, from a DataChunk header

        public boolean written = false;

        public Set<Integer> packets = new HashSet<Integer>();
        public List<Auto>   autos   = new ArrayList<>();

        // XXX: ... stuff left out
    }

//    @Autowired
//    private ProducerTemplate producerTemplate;  // XXX: can't get this to work

    private ProducerTemplate producerTemplate = null;

    private static DataFrame dataFrame = new DataFrame();

    public void process (Exchange exchange) throws Exception
    {
        DataChunk chunk = null;
        try {
            chunk = exchange.getIn().getBody(DataChunk.class);
        } catch (Exception e) {
            System.out.println("*** ERROR: DataChunkProcessor: process: " + e.getMessage());
        }
        if ( chunk == null ) return;  // body was not a DataChunk; nothing to aggregate

        // XXX: process DataChunk, store stuff in 'dataFrame', determine if it is time to write, etc.

        if ( it_is_time_to_write ) {
            buildAndWrite(exchange);
        }

        // XXX: don't have to do anything to the Exchange; the route has effectively ended in RouteBuilder
    }

    public void buildAndWrite (Exchange exchange) throws ParseException
    {
        if ( producerTemplate == null ) producerTemplate = exchange.getContext().createProducerTemplate();

        MyProtobuf proto = buildProtobuf();  // XXX: loops on dataFrame.autos and builds custom protobuf object
        producerTemplate.sendBody("direct:protobuf", proto);
        dataFrame.written = true;
    }
}

