Async processing and number of inflight exchanges issue

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Async processing and number of inflight exchanges issue

szhemzhitsky
Hi gurus,

I have the following issue. I need to send requests to a web service from Camel on timer events.
As the timer consumer fires events synchronously, only one request can be sent at a time. What I’d like is to use the threads DSL to process multiple timer events at the same time.

Here is an example:

from("timer:start?period=100&delay=100&repeatCount=0")
    .threads(1, 5).maxQueueSize(5).rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest)
        .to("bean:wscall");

However, using the threads DSL with the original timer consumer does not make sense because the timer consumer is synchronous, so I tried to develop an asynchronous timer component that uses the asynchronous API to fire events.

Everything works fine, except that there are a lot of inflight exchanges when stopping the route.

Could you please suggest how to avoid such an issue?

=================================================================================================================================================

Below is a unit test that reproduces this unexpected behavior.


package foo.bar;

import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.Service;
import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.util.AsyncProcessorHelper;
import org.junit.Test;

import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CamelRoutingTest extends CamelTestSupport {

    @Test
    public void route() throws Exception {
        Endpoint endpoint = context().getRoute("timer").getConsumer().getEndpoint();

        // make timer fire for several times
        TimeUnit.SECONDS.sleep(5);

        context().stopRoute("timer", 10, TimeUnit.SECONDS);

        int inflightExchanges = context().getInflightRepository().size(endpoint);
        assertTrue("inflightExchanges: " + inflightExchanges, inflightExchanges == 0);
    }

    @Override
    protected JndiRegistry createRegistry() throws Exception {
        JndiRegistry registry = super.createRegistry();
        registry.bind("atimer", new AsyncTimerComponent());
        return registry;
    }

    @Override
    protected int getShutdownTimeout() {
        return 1;
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("atimer:start?period=100&delay=100&repeatCount=0").id("timer")
                        .threads(1, 5).maxQueueSize(5).rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest)
                            .log(LoggingLevel.INFO, "org.apache.camel.LOGGER",
                                 "START: ${header." + Exchange.BREADCRUMB_ID + "}")
                            .delay(1000)
                            .log(LoggingLevel.INFO, "org.apache.camel.LOGGER",
                                 "COMPLETE: ${header." + Exchange.BREADCRUMB_ID + "}")
                            .to("mock:result");
            }
        };
    }

    public static class AsyncTimerComponent extends DefaultComponent {
        private final Map<String, Timer> timers = new HashMap<String, Timer>();
        public Timer getTimer(AsyncTimerEndpoint endpoint) {
            String key = endpoint.getTimerName();
            if (!endpoint.isDaemon()) {
                key = "nonDaemon:" + key;
            }

            Timer answer;
            synchronized (timers) {
                answer = timers.get(key);
                if (answer == null) {
                    answer = new Timer(endpoint.getTimerName(), endpoint.isDaemon());
                    timers.put(key, answer);
                }
            }
            return answer;
        }

        @Override
        protected AsyncTimerEndpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
            AsyncTimerEndpoint answer = new AsyncTimerEndpoint(uri, this, remaining);
            setProperties(answer, parameters);
            return answer;
        }

        @Override
        protected void doStop() throws Exception {
            Collection<Timer> collection = timers.values();
            for (Timer timer : collection) {
                timer.cancel();
            }
            timers.clear();
        }
    }

    public static class AsyncTimerEndpoint extends DefaultEndpoint implements Service {
        private String timerName;
        private long period = 1000;
        private long delay;
        private boolean fixedRate;
        private boolean daemon = true;
        private Timer timer;
        private long repeatCount;

        public AsyncTimerEndpoint(String fullURI, AsyncTimerComponent component, String timerName) {
            super(fullURI, component);
            this.timerName = timerName;
        }
        public Producer createProducer() throws Exception {
            throw new RuntimeCamelException("There is no producer");
        }

        public AsyncTimerConsumer createConsumer(Processor processor) throws Exception {
            return new AsyncTimerConsumer(this, processor);
        }

        public void start() throws Exception {
            // noop
        }

        public void stop() throws Exception {
            setTimer(null);
        }

        public String getTimerName() {
            if (timerName == null) {
                timerName = getEndpointUri();
            }
            return timerName;
        }

        public void setTimerName(String timerName) {
            this.timerName = timerName;
        }

        public boolean isDaemon() {
            return daemon;
        }

        public void setDaemon(boolean daemon) {
            this.daemon = daemon;
        }

        public long getDelay() {
            return delay;
        }

        public void setDelay(long delay) {
            this.delay = delay;
        }

        public boolean isFixedRate() {
            return fixedRate;
        }

        public void setFixedRate(boolean fixedRate) {
            this.fixedRate = fixedRate;
        }

        public long getPeriod() {
            return period;
        }

        public void setPeriod(long period) {
            this.period = period;
        }

        public long getRepeatCount() {
            return repeatCount;
        }

        public void setRepeatCount(long repeatCount) {
            this.repeatCount = repeatCount;
        }

        public boolean isSingleton() {
            return true;
        }

        public synchronized Timer getTimer() {
            if (timer == null) {
                AsyncTimerComponent tc = (AsyncTimerComponent) getComponent();
                timer = tc.getTimer(this);
            }
            return timer;
        }

        public synchronized void setTimer(Timer timer) {
            this.timer = timer;
        }

        public String getEndpointUri() {
            return super.getEndpointUri();
        }
    }

    public static class AsyncTimerConsumer extends DefaultConsumer {

        private final AsyncTimerEndpoint endpoint;
        private volatile TimerTask task;

        public AsyncTimerConsumer(AsyncTimerEndpoint endpoint, Processor processor) {
            super(endpoint, processor);
            this.endpoint = endpoint;
        }

        @Override
        protected void doStart() throws Exception {
            task = new TimerTask() {
                // counter
                private final AtomicLong counter = new AtomicLong();

                @Override
                public void run() {
                    try {
                        long count = counter.incrementAndGet();

                        boolean fire = endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount();
                        if (fire) {
                            sendTimerExchange(count);
                        } else {
                            // no need to fire anymore as we exceeded repeat count
                            log.debug("Cancelling {} timer as repeat count limit reached after {} counts.", endpoint.getTimerName(), endpoint.getRepeatCount());
                            cancel();
                        }
                    } catch (Throwable e) {
                        // catch all to avoid the JVM closing the thread and not firing again
                        log.warn("Error processing exchange. This exception will be ignored, to let the timer be able to trigger again.", e);
                    }
                }
            };

            Timer timer = endpoint.getTimer();
            configureTask(task, timer);
        }

        @Override
        protected void doStop() throws Exception {
            if (task != null) {
                task.cancel();
            }
            task = null;
        }

        protected void configureTask(TimerTask task, Timer timer) {
            if (endpoint.isFixedRate()) {
                timer.scheduleAtFixedRate(task, endpoint.getDelay(), endpoint.getPeriod());
            } else {
                if (endpoint.getPeriod() > 0) {
                    timer.schedule(task, endpoint.getDelay(), endpoint.getPeriod());
                } else {
                    timer.schedule(task, endpoint.getDelay());
                }
            }
        }

        protected void sendTimerExchange(long counter) {
            Exchange exchange = endpoint.createExchange();
            exchange.setProperty(Exchange.TIMER_COUNTER, counter);
            exchange.setProperty(Exchange.TIMER_NAME, endpoint.getTimerName());
            exchange.setProperty(Exchange.TIMER_PERIOD, endpoint.getPeriod());
            exchange.setProperty(Exchange.TIMER_FIRED_TIME, new Date());

            log.trace("Timer {} is firing #{} count", endpoint.getTimerName(), counter);
            try {
                AsyncProcessorHelper.process(getAsyncProcessor(), exchange, new AsyncCallback() {
                    public void done(boolean doneSync) {
                        // noop
                    }
                });

                // log exception if an exception occurred and was not handled
                if (exchange.getException() != null) {
                    getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
                }
            } catch (Exception e) {
                getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
            }
        }

    }

Best Regards,
Sergey Zhemzhitsky

_______________________________________________________

The information contained in this message may be privileged and confidential and protected from disclosure. If you are not the original intended recipient, you are hereby notified that any review, retransmission, dissemination, or other use of, or taking of any action in reliance upon, this information is prohibited. If you have received this communication in error, please notify the sender immediately by replying to this message and delete it from your computer. Thank you for your cooperation. Troika Dialog, Russia.
If you need assistance please contact our Contact Center  (+7495) 258 0500 or go to www.troika.ru/eng/Contacts/system.wbp  

Reply | Threaded
Open this post in threaded view
|

Re: Async processing and number of inflight exchanges issue

Claus Ibsen-2
Hi

Well spotted. A ticket has been created:
https://issues.apache.org/jira/browse/CAMEL-4925

2012/1/20 Zhemzhitsky Sergey <[hidden email]>:

> Hi gurus,
>
> I have the  following issue. I need to send requests to webservice from camel on timer events.
> As timer consumer uses synchronous event firing , only one request can be sent at the same time. What I’d like is use to threads DSL to process multiple timer events at the same time.
>
> Here is an example:
>
> from("timer:start?period=100&delay=100&repeatCount=0")
>    .threads(1, 5).maxQueueSize(5).rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest)
>        .to("bean:wscall");
>
> However, usage of threads DSL with original timer consumer does not make sense because of synchronicity of timer consumer, so I tried to develop asynchronous timer component that uses asynchronous API to fire events.
>
> Everything works fine except that there is a lot of inflight exchanges when stopping the route.
>
> Could you please suggest how to avoid such an issue?
>
> =================================================================================================================================================
>
> Below is the unit test that allows to reproduce this unexpected behavior.
>
>
> package foo.bar;
>
> import org.apache.camel.AsyncCallback;
> import org.apache.camel.Endpoint;
> import org.apache.camel.Exchange;
> import org.apache.camel.LoggingLevel;
> import org.apache.camel.Processor;
> import org.apache.camel.Producer;
> import org.apache.camel.RuntimeCamelException;
> import org.apache.camel.Service;
> import org.apache.camel.ThreadPoolRejectedPolicy;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultComponent;
> import org.apache.camel.impl.DefaultConsumer;
> import org.apache.camel.impl.DefaultEndpoint;
> import org.apache.camel.impl.JndiRegistry;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.apache.camel.util.AsyncProcessorHelper;
> import org.junit.Test;
>
> import java.util.Collection;
> import java.util.Date;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Timer;
> import java.util.TimerTask;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicLong;
>
> public class CamelRoutingTest extends CamelTestSupport {
>
>    @Test
>    public void route() throws Exception {
>        Endpoint endpoint = context().getRoute("timer").getConsumer().getEndpoint();
>
>        // make timer fire for several times
>        TimeUnit.SECONDS.sleep(5);
>
>        context().stopRoute("timer", 10, TimeUnit.SECONDS);
>
>        int inflightExchanges = context().getInflightRepository().size(endpoint);
>        assertTrue("inflightExchanges: " + inflightExchanges, inflightExchanges == 0);
>    }
>
>    @Override
>    protected JndiRegistry createRegistry() throws Exception {
>        JndiRegistry registry = super.createRegistry();
>        registry.bind("atimer", new AsyncTimerComponent());
>        return registry;
>    }
>
>    @Override
>    protected int getShutdownTimeout() {
>        return 1;
>    }
>
>    @Override
>    protected RouteBuilder createRouteBuilder() throws Exception {
>        return new RouteBuilder() {
>            @Override
>            public void configure() throws Exception {
>                from("atimer:start?period=100&delay=100&repeatCount=0").id("timer")
>                        .threads(1, 5).maxQueueSize(5).rejectedPolicy(ThreadPoolRejectedPolicy.DiscardOldest)
>                            .log(LoggingLevel.INFO, "org.apache.camel.LOGGER",
>                                 "START: ${header." + Exchange.BREADCRUMB_ID + "}")
>                            .delay(1000)
>                            .log(LoggingLevel.INFO, "org.apache.camel.LOGGER",
>                                 "COMPLETE: ${header." + Exchange.BREADCRUMB_ID + "}")
>                            .to("mock:result");
>            }
>        };
>    }
>
>    public static class AsyncTimerComponent extends DefaultComponent {
>        private final Map<String, Timer> timers = new HashMap<String, Timer>();
>        public Timer getTimer(AsyncTimerEndpoint endpoint) {
>            String key = endpoint.getTimerName();
>            if (!endpoint.isDaemon()) {
>                key = "nonDaemon:" + key;
>            }
>
>            Timer answer;
>            synchronized (timers) {
>                answer = timers.get(key);
>                if (answer == null) {
>                    answer = new Timer(endpoint.getTimerName(), endpoint.isDaemon());
>                    timers.put(key, answer);
>                }
>            }
>            return answer;
>        }
>
>        @Override
>        protected AsyncTimerEndpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
>            AsyncTimerEndpoint answer = new AsyncTimerEndpoint(uri, this, remaining);
>            setProperties(answer, parameters);
>            return answer;
>        }
>
>        @Override
>        protected void doStop() throws Exception {
>            Collection<Timer> collection = timers.values();
>            for (Timer timer : collection) {
>                timer.cancel();
>            }
>            timers.clear();
>        }
>    }
>
>    public static class AsyncTimerEndpoint extends DefaultEndpoint implements Service {
>        private String timerName;
>        private long period = 1000;
>        private long delay;
>        private boolean fixedRate;
>        private boolean daemon = true;
>        private Timer timer;
>        private long repeatCount;
>
>        public AsyncTimerEndpoint(String fullURI, AsyncTimerComponent component, String timerName) {
>            super(fullURI, component);
>            this.timerName = timerName;
>        }
>        public Producer createProducer() throws Exception {
>            throw new RuntimeCamelException("There is no producer");
>        }
>
>        public AsyncTimerConsumer createConsumer(Processor processor) throws Exception {
>            return new AsyncTimerConsumer(this, processor);
>        }
>
>        public void start() throws Exception {
>            // noop
>        }
>
>        public void stop() throws Exception {
>            setTimer(null);
>        }
>
>        public String getTimerName() {
>            if (timerName == null) {
>                timerName = getEndpointUri();
>            }
>            return timerName;
>        }
>
>        public void setTimerName(String timerName) {
>            this.timerName = timerName;
>        }
>
>        public boolean isDaemon() {
>            return daemon;
>        }
>
>        public void setDaemon(boolean daemon) {
>            this.daemon = daemon;
>        }
>
>        public long getDelay() {
>            return delay;
>        }
>
>        public void setDelay(long delay) {
>            this.delay = delay;
>        }
>
>        public boolean isFixedRate() {
>            return fixedRate;
>        }
>
>        public void setFixedRate(boolean fixedRate) {
>            this.fixedRate = fixedRate;
>        }
>
>        public long getPeriod() {
>            return period;
>        }
>
>        public void setPeriod(long period) {
>            this.period = period;
>        }
>
>        public long getRepeatCount() {
>            return repeatCount;
>        }
>
>        public void setRepeatCount(long repeatCount) {
>            this.repeatCount = repeatCount;
>        }
>
>        public boolean isSingleton() {
>            return true;
>        }
>
>        public synchronized Timer getTimer() {
>            if (timer == null) {
>                AsyncTimerComponent tc = (AsyncTimerComponent) getComponent();
>                timer = tc.getTimer(this);
>            }
>            return timer;
>        }
>
>        public synchronized void setTimer(Timer timer) {
>            this.timer = timer;
>        }
>
>        public String getEndpointUri() {
>            return super.getEndpointUri();
>        }
>    }
>
>    public static class AsyncTimerConsumer extends DefaultConsumer {
>
>        private final AsyncTimerEndpoint endpoint;
>        private volatile TimerTask task;
>
>        public AsyncTimerConsumer(AsyncTimerEndpoint endpoint, Processor processor) {
>            super(endpoint, processor);
>            this.endpoint = endpoint;
>        }
>
>        @Override
>        protected void doStart() throws Exception {
>            task = new TimerTask() {
>                // counter
>                private final AtomicLong counter = new AtomicLong();
>
>                @Override
>                public void run() {
>                    try {
>                        long count = counter.incrementAndGet();
>
>                        boolean fire = endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount();
>                        if (fire) {
>                            sendTimerExchange(count);
>                        } else {
>                            // no need to fire anymore as we exceeded repeat count
>                            log.debug("Cancelling {} timer as repeat count limit reached after {} counts.", endpoint.getTimerName(), endpoint.getRepeatCount());
>                            cancel();
>                        }
>                    } catch (Throwable e) {
>                        // catch all to avoid the JVM closing the thread and not firing again
>                        log.warn("Error processing exchange. This exception will be ignored, to let the timer be able to trigger again.", e);
>                    }
>                }
>            };
>
>            Timer timer = endpoint.getTimer();
>            configureTask(task, timer);
>        }
>
>        @Override
>        protected void doStop() throws Exception {
>            if (task != null) {
>                task.cancel();
>            }
>            task = null;
>        }
>
>        protected void configureTask(TimerTask task, Timer timer) {
>            if (endpoint.isFixedRate()) {
>                timer.scheduleAtFixedRate(task, endpoint.getDelay(), endpoint.getPeriod());
>            } else {
>                if (endpoint.getPeriod() > 0) {
>                    timer.schedule(task, endpoint.getDelay(), endpoint.getPeriod());
>                } else {
>                    timer.schedule(task, endpoint.getDelay());
>                }
>            }
>        }
>
>        protected void sendTimerExchange(long counter) {
>            Exchange exchange = endpoint.createExchange();
>            exchange.setProperty(Exchange.TIMER_COUNTER, counter);
>            exchange.setProperty(Exchange.TIMER_NAME, endpoint.getTimerName());
>            exchange.setProperty(Exchange.TIMER_PERIOD, endpoint.getPeriod());
>            exchange.setProperty(Exchange.TIMER_FIRED_TIME, new Date());
>
>            log.trace("Timer {} is firing #{} count", endpoint.getTimerName(), counter);
>            try {
>                AsyncProcessorHelper.process(getAsyncProcessor(), exchange, new AsyncCallback() {
>                    public void done(boolean doneSync) {
>                        // noop
>                    }
>                });
>
>                // log exception if an exception occurred and was not handled
>                if (exchange.getException() != null) {
>                    getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
>                }
>            } catch (Exception e) {
>                getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
>            }
>        }
>
>    }
>
> Best Regards,
> Sergey Zhemzhitsky
>
> _______________________________________________________
>
> The information contained in this message may be privileged and conf idential and protected from disclosure. If you are not the original intended recipient, you are hereby notified that any review, retransmission, dissemination, or other use of, or taking of any action in reliance upon, this information is prohibited. If you have received this communication in error, please notify the sender immediately by replying to this message and delete it from your computer. Thank you for your cooperation. Troika Dialog, Russia.
> If you need assistance please contact our Contact Center  (+7495) 258 0500 or go to www.troika.ru/eng/Contacts/system.wbp
>



--
Claus Ibsen
-----------------
FuseSource
Email: [hidden email]
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/