Understanding Pooled Connection Factory on ActiveMQ

Michele
Hi everyone,

I use the ActiveMQ component for a use case that involves:

1. Reading a file from a folder; the file may contain a large number of lines.
2. Splitting it in streaming mode, processing each line, and storing the result in ActiveMQ.
3. Consumers receiving the messages from the queue and then invoking a REST service.

I defined a pooled connection factory for ActiveMQ like this:

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
        <property name="userName" value="admin" />
        <property name="password" value="admin" />
</bean>

<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
        init-method="start" destroy-method="stop">
        <property name="maxConnections" value="8" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>

<bean id="jmsConfigNoTx" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
        <property name="concurrentConsumers" value="10" />
        <property name="transferExchange" value="true" />
</bean>


<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfigNoTx" />
</bean>

and the routes like this:

<route id="FileReader_Route">
    <from uri="file:incoming?....." />
     <split streaming="true" parallelProcessing="true">
           <tokenize token="\n" />
           <unmarshal ref="IncomingCSVFileDataFormat" />
           <process ref="DataProcessor" />
           <marshal ref="Gson" />
           <to uri="activemq:queue:incomingTickets" />
     </split>
</route>

<route id="ProcessTicket_Route">
     <from uri="activemq:queue:incomingTickets?destination.consumer.prefetchSize=0" />
     <throttle timePeriodMillis="10000" asyncDelayed="true">
          <constant>5</constant>           
           <to uri="jetty:http://host/rs/v1.0/ticket?jettyHttpBindingRef=CustomJettyHttpBinding" />
     </throttle>     
</route>

To resolve an OutOfMemory problem on ActiveMQ, I increased the heap and changed activemq.xml like this:

<policyEntry queue=">" producerFlowControl="true" memoryLimit="250mb" optimizedDispatch="true" />
                                         
During testing, I observed that only one consumer on the broker dequeues messages, as shown in the attached image pooled-connection.png. Why is that?

Does the pooled connection factory work only on the producer side (pushing messages to the queue)?
How do I get concurrent consumers on the consumer side (popping messages from the queue)?


Environment: JBoss Fuse 6.2 based on Apache Camel and ActiveMQ.

Thanks in advance

Best Regards

Michele






Re: Understanding Pooled Connection Factory on ActiveMQ

Quinn Stevenson
Your consuming route only defines one consumer (i.e. concurrentConsumers is not set, so it defaults to 1).  Try increasing that.



> On Apr 7, 2016, at 7:10 AM, Michele <[hidden email]> wrote:
>
> [...]
> During test, I observed that only one Consumer works on the broker to
> dequeue like image attached  pooled-connection.png
> <http://camel.465427.n5.nabble.com/file/n5780689/pooled-connection.png>  .
> Why this?
> [...]

Re: Understanding Pooled Connection Factory on ActiveMQ

Michele
Hi Stevenson,

I configured the activemq component with a pooled connection factory.
So does the pooled connection factory apply only to the producer?

I will try adding concurrentConsumers as an option...

Thank you so much

Regards

Michele
Re: Understanding Pooled Connection Factory on ActiveMQ

Quinn Stevenson
The pooled connection factory is a good idea, and it can be used by both the producer and consumer - but the connection factory doesn’t determine the number of concurrent consumers.
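
To make the distinction concrete, here is an annotated sketch that reuses the bean definitions from the original post. Nothing new is introduced apart from the comments, and the comments reflect my understanding rather than anything verified against your broker: the pool size and the consumer count are two separate settings.

```xml
<!-- Pool of JMS connections that the component can use for producing and consuming.
     maxConnections caps the number of pooled connections, not the number of consumers. -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
      init-method="start" destroy-method="stop">
    <property name="maxConnections" value="8" />
    <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>

<!-- The number of concurrent consumers is requested here, independently of the pool size. -->
<bean id="jmsConfigNoTx" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="pooledConnectionFactory" />
    <property name="concurrentConsumers" value="10" />
</bean>
```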

> On Apr 7, 2016, at 10:04 AM, Michele <[hidden email]> wrote:
>
> [...]

Re: Understanding Pooled Connection Factory on ActiveMQ

Michele
Hi,

Sorry, but according to the Concurrent Consuming section of http://camel.apache.org/jms.html:

You can configure this option in one of the following ways:
- On the JmsComponent (which I believe I did when configuring it with the PooledConnectionFactory),
- On the endpoint URI or,
- By invoking setConcurrentConsumers() directly on the JmsEndpoint.
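
For reference, a minimal sketch of the second option (the endpoint URI), assuming the queue name and prefetch setting from my route above. The value 10 is only an example, and mock:result is a placeholder target rather than my real endpoint:

```xml
<route id="ProcessTicket_Route">
    <!-- concurrentConsumers is set on this endpoint's URI;
         '&' has to be written as '&amp;' inside an XML attribute -->
    <from uri="activemq:queue:incomingTickets?concurrentConsumers=10&amp;destination.consumer.prefetchSize=0" />
    <to uri="mock:result" />
</route>
```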

However, I tried adding the concurrentConsumers option to the endpoint URI, and still only one consumer reads from the queue.

In the last test, only 1 of the 10 consumers worked, with a dequeue counter of ~35000 lines.

I don't understand this behaviour.

Re: Understanding Pooled Connection Factory on ActiveMQ

Quinn Stevenson
Sorry Michele - I missed the configuration in the jmsConfig bean. Also, I should've looked at the screenshot you posted - it shows the 10 consumers, with only one consuming.

I ran your JMS config with a simple consuming route, but I'm seeing all of the consumers working - I don't get any inactive consumers. I'm using the ActiveMQ management console to look at this.

Here's the route I ran; I'll post a screenshot of my ActiveMQ console.

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:camel="http://camel.apache.org/schema/blueprint"
           xsi:schemaLocation="
         http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd
         http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd">

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
        <property name="userName" value="admin" />
        <property name="password" value="admin" />
    </bean>

    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          init-method="start" destroy-method="stop">
        <property name="maxConnections" value="8"/>
        <property name="connectionFactory" ref="jmsConnectionFactory"/>
    </bean>

    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory"/>
        <property name="concurrentConsumers" value="10"/>
    </bean>

    <bean id="my-amq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig"/>
    </bean>

    <camel:camelContext id="michele-context" xmlns="http://camel.apache.org/schema/blueprint">
        <route id="ProcessTicket_Route">
            <from uri="my-amq://queue:incomingTickets?destination.consumer.prefetchSize=0"/>
            <throttle timePeriodMillis="1000" asyncDelayed="true">
                <constant>1</constant>
                <to uri="mock://result" />
            </throttle>
        </route>
    </camel:camelContext>


</blueprint>


Re: Understanding Pooled Connection Factory on ActiveMQ

Quinn Stevenson
 Here's the screenshot - missed it the first time