|
Author: davsclaus
Date: Tue May 29 12:52:20 2012 New Revision: 1343704 URL: http://svn.apache.org/viewvc?rev=1343704&view=rev Log: CAMEL-5316: Failover EIP now detects shutdown in progress and breaks out from failover loop. Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverLoadBalancerBreakoutDuringShutdownTest.java - copied, changed from r1343673, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBreakoutDuringShutdownTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java?rev=1343704&r1=1343703&r2=1343704&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java Tue May 29 12:52:20 2012 @@ -17,10 +17,13 @@ package org.apache.camel.processor.loadbalancer; import java.util.List; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Traceable; @@ -36,9 +39,10 @@ import org.apache.camel.util.ObjectHelpe * as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the * pipeline to ensure it works the same and the async routing engine is flawless. */ -public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceable { +public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceable, CamelContextAware { private final List<Class<?>> exceptions; + private CamelContext camelContext; private boolean roundRobin; private int maximumFailoverAttempts = -1; @@ -60,6 +64,16 @@ public class FailOverLoadBalancer extend } } + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + public List<Class<?>> getExceptions() { return exceptions; } @@ -113,6 +127,16 @@ public class FailOverLoadBalancer extend return answer; } + @Override + public boolean isRunAllowed() { + // determine if we can still run, or the camel context is forcing a shutdown + boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this); + if (forceShutdown) { + log.trace("Run not allowed as ShutdownStrategy is forcing shutting down"); + } + return !forceShutdown && super.isRunAllowed(); + } + public boolean process(final Exchange exchange, final AsyncCallback callback) { final List<Processor> processors = getProcessors(); @@ -133,6 +157,18 @@ public class FailOverLoadBalancer extend log.trace("Failover starting with endpoint index {}", index); while (first || shouldFailOver(copy)) { + + // can we still run + if (!isRunAllowed()) { + log.trace("Run not allowed, will reject executing exchange: {}", exchange); + if (exchange.getException() == null) { + exchange.setException(new RejectedExecutionException()); + } + // we cannot process so invoke callback + callback.done(true); + return true; + } + if (!first) { attempts.incrementAndGet(); // are we exhausted by attempts? @@ -240,6 +276,17 @@ public class FailOverLoadBalancer extend } while (shouldFailOver(copy)) { + + // can we still run + if (!isRunAllowed()) { + log.trace("Run not allowed, will reject executing exchange: {}", exchange); + if (exchange.getException() == null) { + exchange.setException(new RejectedExecutionException()); + } + // we cannot process so invoke callback + callback.done(false); + } + attempts.incrementAndGet(); // are we exhausted by attempts? if (maximumFailoverAttempts > -1 && attempts.get() > maximumFailoverAttempts) { Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverLoadBalancerBreakoutDuringShutdownTest.java (from r1343673, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBreakoutDuringShutdownTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverLoadBalancerBreakoutDuringShutdownTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverLoadBalancerBreakoutDuringShutdownTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBreakoutDuringShutdownTest.java&r1=1343673&r2=1343704&rev=1343704&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBreakoutDuringShutdownTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverLoadBalancerBreakoutDuringShutdownTest.java Tue May 29 12:52:20 2012 @@ -23,11 +23,11 @@ import org.apache.camel.builder.RouteBui import org.apache.camel.util.StopWatch; /** - * Tests that the redelivery error handler will break out if CamelContext is shutting down. + * Tests that the failover load balancer will break out if CamelContext is shutting down. */ -public class RedeliveryErrorHandlerBreakoutDuringShutdownTest extends ContextTestSupport { +public class FailoverLoadBalancerBreakoutDuringShutdownTest extends ContextTestSupport { - public void testRedelivery() throws Exception { + public void testFailover() throws Exception { getMockEndpoint("mock:before").expectedMessageCount(1); getMockEndpoint("mock:after").expectedMessageCount(0); @@ -44,7 +44,7 @@ public class RedeliveryErrorHandlerBreak context.stop(); // should take less than 5 seconds - assertTrue("Should take less than 5 seconds, was {}", watch.stop() < 5000); + assertTrue("Should take less than 5 seconds, was " + watch.taken(), watch.stop() < 5000); } @Override @@ -52,17 +52,29 @@ public class RedeliveryErrorHandlerBreak return new RouteBuilder() { @Override public void configure() throws Exception { - // just keep on redelivering - errorHandler(defaultErrorHandler().maximumRedeliveries(-1).redeliveryDelay(1000)); from("seda:start") .to("mock:before") + // just keep on failover + .loadBalance().failover(-1, false, true) + .to("direct:a") + .to("direct:b") + .end() + .to("mock:after"); + + from("direct:a") .process(new Processor() { public void process(Exchange exchange) throws Exception { throw new IllegalArgumentException("Forced"); } - }) - .to("mock:after"); + }); + + from("direct:b") + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + throw new IllegalArgumentException("Forced"); + } + }); } }; } |
| Powered by Nabble | Edit this page |
