Processor thread pool

Paulo Ramos
I want to use Camel to build a test procedure for an application. The test program must support a configurable number of simultaneous threads.

I have found this example:
from("activemq:queue:SOMETHING").pool(5 [min], 15 [max]).process(new ExpensiveMessageEnricher()).to("activemq:queue:SOMEWHERE_ELSE");

How can I implement this solution using Spring?
Is there another way?

Paulo Ramos

Re: Processor thread pool

jstrachan
On 15/10/2007, Paulo Ramos <[hidden email]> wrote:

>
> I want to use Camel to build a test procedure for an application. The test
> program must support a configurable number of simultaneous threads.
>
> I have found this example:
> from("activemq:queue:SOMETHING").pool(5 [min], 15 [max]).process(new
> ExpensiveMessageEnricher()).to("activemq:queue:SOMEWHERE_ELSE");
>
> How can I implement this solution using Spring?
> Is there another way?

You can use the <thread> element:

<route>
<from uri="activemq:queue:SOMETHING"/>
<thread coreSize="5" maxSize="15"/>
<bean ref="myEnricher"/>
<to uri="activemq:queue:SOMEWHERE_ELSE"/>
</route>

--
James
-------
http://macstrac.blogspot.com/

Open Source SOA
http://open.iona.com

Re: Processor thread pool

Paulo Ramos
I am using Camel 1.1-SNAPSHOT.
I have tried your example, but only one message is processed by the "Processor" at a time.

camel-context.xml :

...
    <endpoint id="queueA" uri="activemq:queue:a"/>
    <endpoint id="queueB" uri="activemq:queue:b"/>

    <route>
        <from uri="queueA"/>
        <thread coreSize="5" maxSize="15" daemon="false" keepAliveTime="1" priority="0" stackSize="5"/>
        <bean ref="processar"/>
        <!-- <process ref="processar" /> -->
        <to uri="queueB"/>
    </route>
...
    <bean id="processar" class="org.apache.camel.example.spring.ProcessarPrc"/>


ProcessarPrc.java :

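package org.apache.camel.example.spring;

import java.util.Random;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;

public class ProcessarPrc implements Processor {

    public void process(Exchange e) {
        Message in = e.getIn();
        Mensagem mesg = in.getBody(Mensagem.class);

        // Marks the start of processing for this message
        System.out.println(this.getClass().getSimpleName() + "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv" + mesg.getId());

        mesg.setProcessado(new Random().nextBoolean());

        // Busy-wait for 5 seconds to simulate expensive processing
        long t = System.currentTimeMillis();
        while (System.currentTimeMillis() - t < 5000) {}

        in.setBody(mesg);
        System.out.println(this.getClass().getSimpleName() + " - " + mesg.toString());
        // Marks the end of processing for this message
        System.out.println(this.getClass().getSimpleName() + "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^" + mesg.getId());
    }
}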

Re: Processor thread pool

Paulo Ramos
In reply to this post by Paulo Ramos
Any ideas?
I'm stuck on this problem.

Paulo Ramos

Re: Processor thread pool

Hiram Chirino
In reply to this post by Paulo Ramos
Hi Paulo, only consumer components that support async processing will be
able to leverage the async processing that the thread() call does.

Since the whole async processing model is new and optional, not all of
the existing components have been 'upgraded' to take advantage of it.
The JMS component is one of those that still does not take
advantage of the async processing model.

So what happens is that the JMS consumer delivers a message to the
processing route, but it does so synchronously.  It will wait for the
exchange to be fully processed, even if it's being processed by a
thread from the thread pool.  Since it's waiting, it will not deliver
any more messages until that thread from the thread pool is done
processing the exchange.  This is why you only see 1 message getting
processed at a time.
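
The blocking behaviour described above can be reproduced without Camel. The following is only a rough sketch using plain java.util.concurrent: the class name SyncHandoffDemo and the method runSynchronousConsumer are made up for illustration and are not part of Camel or the JMS component. It assumes a consumer that hands each message to a thread pool and then waits for the result before taking the next one.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a synchronous consumer in front of a thread pool.
public class SyncHandoffDemo {

    /** Returns the highest number of messages that were in progress at once. */
    public static int runSynchronousConsumer(int poolSize, int messages) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < messages; i++) {
                Future<?> done = pool.submit(() -> {
                    peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(20); // stand-in for expensive processing
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                    }
                });
                // The consumer blocks here until the message is fully processed,
                // so it cannot deliver the next message in the meantime.
                done.get();
            }
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) throws Exception {
        // Prints "peak concurrency: 1" even though the pool has 5 threads.
        System.out.println("peak concurrency: " + runSynchronousConsumer(5, 10));
    }
}
```

The pool size makes no difference here: because the caller waits on every message, at most one pool thread is ever busy.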

One way to get around this would be to set up routes that look like:
from("activemq:queue:a").to("seda:a");
from("seda:a").thread(5).to("activemq:queue:b");

The seda component does implement an async consumer.  The one downside
to note is that the message will be removed from the JMS queue as soon
as it is delivered to the seda:a endpoint.  It does not wait for
the message to be delivered to queue b before acknowledging that it was
received.
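
To show why an in-memory hand-off helps, here is a rough sketch in plain Java, again without Camel. The names QueueHandoffDemo and runQueuedConsumer are invented for this example, and the LinkedBlockingQueue only plays the role of the seda:a endpoint; it is not how the seda component is actually implemented.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a consumer that hands off to an in-memory queue.
public class QueueHandoffDemo {

    /** Returns the highest number of messages that were in progress at once. */
    public static int runQueuedConsumer(int poolSize, int messages) throws InterruptedException {
        BlockingQueue<Integer> buffer = new LinkedBlockingQueue<>(); // plays the role of seda:a
        ExecutorService workers = Executors.newFixedThreadPool(poolSize);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch allDone = new CountDownLatch(messages);

        Runnable worker = () -> {
            try {
                while (true) {
                    buffer.take(); // wait for the next buffered message
                    peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    Thread.sleep(50); // stand-in for expensive processing
                    inFlight.decrementAndGet();
                    allDone.countDown();
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // shutdownNow() stops the worker
            }
        };
        for (int w = 0; w < poolSize; w++) {
            workers.execute(worker);
        }

        // The consumer only enqueues and returns immediately. From its point
        // of view the message is already delivered, which is the trade-off
        // mentioned above: it is acknowledged before processing has finished.
        for (int i = 0; i < messages; i++) {
            buffer.put(i);
        }

        allDone.await();
        workers.shutdownNow();
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak concurrency: " + runQueuedConsumer(5, 10));
    }
}
```

With 5 workers and 10 messages the reported peak is normally the pool size, since the consumer never waits for a message to finish before handing over the next one.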

Make sense?

On 10/16/07, Paulo Ramos <[hidden email]> wrote:

>
> I am using Camel 1.1-SNAPSHOT.
> I have tried your example, but only one message is processed by the
> "Processor" at a time.
>
> camel-context.xml :
>
> ...
>         <endpoint id="queueA"   uri="activemq:queue:a"/>
>         <endpoint id="queueB"   uri="activemq:queue:b"/>
>
>         <route>
>                 <from uri="queueA" />
>                 <thread coreSize="5" maxSize="15" daemon="false" keepAliveTime="1" priority="0" stackSize="5"/>
>                 <bean ref="processar"/>
>                 <!-- <process ref="processar" /> -->
>                 <to uri="queueB" ></to>
>         </route>
> ...
>         <bean id="processar" class="org.apache.camel.example.spring.ProcessarPrc"/>
>
>
> ProcessarPrc.java :
>
> public class ProcessarPrc implements Processor{
>
>
> public void process(Exchange e) {
>
>         Message in = e.getIn();
>         Mensagem  mesg = in.getBody(Mensagem.class);
>
>
>         System.out.println(this.getClass().getSimpleName()+"vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv"+mesg.getId());
>
>
>         mesg.setProcessado(new Random().nextBoolean());
>         long t;
>         t = System.currentTimeMillis();
>
>         while(System.currentTimeMillis()-t<5000){}
>
>         in.setBody(mesg);
>         System.out.println(this.getClass().getSimpleName()+" - " + mesg.toString());
>         System.out.println(this.getClass().getSimpleName()+"^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"+mesg.getId());
>     }
> }
>


--
Regards,
Hiram

Blog: http://hiramchirino.com

Re: Processor thread pool

Paulo Ramos
Thanks Hiram.

I think there is a bug in the Spring implementation.

I have tried your solution in the Java DSL and it worked, but in Spring the problem remains the same.
For now I will use the Java DSL, but this is not a real solution because I want to change some configuration without having to recompile the code.

If anyone finds a solution, please post it.

Paulo Ramos

Re: Processor thread pool

Guillaume Nodet
Administrator
Note that you can define the routes in Java and still use Spring to
inject variables into the route builder, so that you can actually
customize the routes without recompiling.

2007/10/19, Paulo Ramos <[hidden email]>:

>
> Thanks Hiram.
>
> I think there is a bug in the Spring implementation.
>
> I have tried your solution in the Java DSL and it worked, but in Spring the
> problem remains the same.
> For now I will use the Java DSL, but this is not a real solution because I want to
> change some configuration without having to recompile the code.
>
> If anyone finds a solution, please post it.
>
> Paulo Ramos


--
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/