Quantcast

Re: camel git commit: CAMEL-10724: Improve Java DSL support for Java 8

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: camel git commit: CAMEL-10724: Improve Java DSL support for Java 8

Claus Ibsen-2
Hi Luca

This happens also for me. There is a little secret that if you change
the Message API then the Scala DSL needs to be changed accordingly.

The camel-scala can not be compiled now.

On Thu, Feb 16, 2017 at 6:10 PM,  <[hidden email]> wrote:

> Repository: camel
> Updated Branches:
>   refs/heads/master 8e0e3083e -> 8bc8484b1
>
>
> CAMEL-10724: Improve Java DSL support for Java 8
>
>
> Project: http://git-wip-us.apache.org/repos/asf/camel/repo
> Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8bc8484b
> Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8bc8484b
> Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8bc8484b
>
> Branch: refs/heads/master
> Commit: 8bc8484b1914f5cb29191e7b91fe48e02ca1f636
> Parents: 8e0e308
> Author: lburgazzoli <[hidden email]>
> Authored: Wed Jan 18 18:09:08 2017 +0100
> Committer: lburgazzoli <[hidden email]>
> Committed: Thu Feb 16 18:10:22 2017 +0100
>
> ----------------------------------------------------------------------
>  .../src/main/java/org/apache/camel/Message.java | 16 ++++-
>  .../org/apache/camel/impl/DefaultMessage.java   | 31 +++++++++
>  .../apache/camel/model/AggregateDefinition.java | 20 ++++++
>  .../model/IdempotentConsumerDefinition.java     |  9 +--
>  .../apache/camel/model/MulticastDefinition.java | 26 +++++++
>  .../apache/camel/model/ProcessorDefinition.java | 66 +++++++++++++++++-
>  .../org/apache/camel/util/ExchangeHelper.java   | 18 +++++
>  .../apache/camel/util/function/Suppliers.java   | 43 ++++++++++++
>  .../apache/camel/impl/DefaultExchangeTest.java  |  5 ++
>  .../camel/processor/DynamicRouter4Test.java     | 58 ++++++++++++++++
>  .../processor/IdempotentConsumerDslTest.java    | 53 ++++++++++++++
>  .../apache/camel/processor/LoopDoWhileTest.java | 23 ++++++-
>  .../camel/processor/MulticastDslTest.java       | 69 +++++++++++++++++++
>  .../camel/processor/RoutingSlipDslTest.java     | 49 +++++++++++++
>  .../camel/processor/ThrottlerDslTest.java       | 72 ++++++++++++++++++++
>  .../processor/aggregator/AggregateDslTest.java  | 41 +++++++++--
>  16 files changed, 582 insertions(+), 17 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/Message.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/Message.java b/camel-core/src/main/java/org/apache/camel/Message.java
> index 8cfeae0..a0e9c4d 100644
> --- a/camel-core/src/main/java/org/apache/camel/Message.java
> +++ b/camel-core/src/main/java/org/apache/camel/Message.java
> @@ -18,7 +18,7 @@ package org.apache.camel;
>
>  import java.util.Map;
>  import java.util.Set;
> -
> +import java.util.function.Supplier;
>  import javax.activation.DataHandler;
>
>  /**
> @@ -88,6 +88,13 @@ public interface Message {
>      Object getHeader(String name, Object defaultValue);
>
>      /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     */
> +    Object getHeader(String name, Supplier<Object> defaultValueSupplier);
> +
> +    /**
>       * Returns a header associated with this message by name and specifying the
>       * type required
>       *
> @@ -112,6 +119,13 @@ public interface Message {
>      <T> T getHeader(String name, Object defaultValue, Class<T> type);
>
>      /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     */
> +    <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, Class<T> type);
> +
> +    /**
>       * Sets a header on the message
>       *
>       * @param name of the header
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
> index 848586a..1e49766 100644
> --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
> +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
> @@ -20,6 +20,7 @@ import java.util.HashSet;
>  import java.util.LinkedHashMap;
>  import java.util.Map;
>  import java.util.Set;
> +import java.util.function.Supplier;
>  import javax.activation.DataHandler;
>
>  import org.apache.camel.Attachment;
> @@ -65,6 +66,11 @@ public class DefaultMessage extends MessageSupport {
>          return answer != null ? answer : defaultValue;
>      }
>
> +    public Object getHeader(String name, Supplier<Object> defaultValueSupplier) {
> +        Object answer = getHeaders().get(name);
> +        return answer != null ? answer : defaultValueSupplier.get();
> +    }
> +
>      @SuppressWarnings("unchecked")
>      public <T> T getHeader(String name, Class<T> type) {
>          Object value = getHeader(name);
> @@ -115,6 +121,31 @@ public class DefaultMessage extends MessageSupport {
>          }
>      }
>
> +    @SuppressWarnings("unchecked")
> +    public <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, Class<T> type) {
> +        Object value = getHeader(name, defaultValueSupplier);
> +        if (value == null) {
> +            // lets avoid NullPointerException when converting to boolean for null values
> +            if (boolean.class.isAssignableFrom(type)) {
> +                return (T) Boolean.FALSE;
> +            }
> +            return null;
> +        }
> +
> +        // eager same instance type test to avoid the overhead of invoking the type converter
> +        // if already same type
> +        if (type.isInstance(value)) {
> +            return type.cast(value);
> +        }
> +
> +        Exchange e = getExchange();
> +        if (e != null) {
> +            return e.getContext().getTypeConverter().convertTo(type, e, value);
> +        } else {
> +            return type.cast(value);
> +        }
> +    }
> +
>      public void setHeader(String name, Object value) {
>          if (headers == null) {
>              headers = createHeaders();
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
> index ec7d396..12f0a13 100644
> --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
> +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
> @@ -814,6 +814,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
>      }
>
>      /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public AggregateDefinition strategy(AggregationStrategy aggregationStrategy) {
> +        return aggregationStrategy(aggregationStrategy);
> +    }
> +
> +    /**
>       * Sets the aggregate strategy to use
>       *
>       * @param aggregationStrategy  the aggregate strategy to use
> @@ -930,6 +940,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
>      }
>
>      /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public AggregateDefinition completion(@AsPredicate Predicate predicate) {
> +        return completionPredicate(predicate);
> +    }
> +
> +    /**
>       * Indicates to complete all current aggregated exchanges when the context is stopped
>       */
>      public AggregateDefinition forceCompletionOnStop() {
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
> index 256394d..9a58704 100644
> --- a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
> +++ b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
> @@ -221,8 +221,7 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
>      public Processor createProcessor(RouteContext routeContext) throws Exception {
>          Processor childProcessor = this.createChildProcessor(routeContext, true);
>
> -        IdempotentRepository<String> idempotentRepository =
> -                (IdempotentRepository<String>) resolveMessageIdRepository(routeContext);
> +        IdempotentRepository<String> idempotentRepository = resolveMessageIdRepository(routeContext);
>          ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
>
>          Expression expression = getExpression().createExpression(routeContext);
> @@ -231,6 +230,7 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
>          boolean eager = getEager() == null || getEager();
>          boolean duplicate = getSkipDuplicate() == null || getSkipDuplicate();
>          boolean remove = getRemoveOnFailure() == null || getRemoveOnFailure();
> +
>          // these boolean should be false by default
>          boolean completionEager = getCompletionEager() != null && getCompletionEager();
>
> @@ -243,10 +243,11 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
>       * @param routeContext route context
>       * @return the repository
>       */
> -    protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext routeContext) {
> +    @SuppressWarnings("unchecked")
> +    protected <T> IdempotentRepository<T> resolveMessageIdRepository(RouteContext routeContext) {
>          if (messageIdRepositoryRef != null) {
>              idempotentRepository = routeContext.mandatoryLookup(messageIdRepositoryRef, IdempotentRepository.class);
>          }
> -        return idempotentRepository;
> +        return (IdempotentRepository<T>)idempotentRepository;
>      }
>  }
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
> index 7bff217..37efcc6 100644
> --- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
> +++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
> @@ -27,6 +27,8 @@ import javax.xml.bind.annotation.XmlTransient;
>
>  import org.apache.camel.CamelContextAware;
>  import org.apache.camel.Processor;
> +import org.apache.camel.builder.AggregationStrategyClause;
> +import org.apache.camel.builder.ProcessClause;
>  import org.apache.camel.processor.MulticastProcessor;
>  import org.apache.camel.processor.aggregate.AggregationStrategy;
>  import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
> @@ -106,6 +108,18 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
>      // -------------------------------------------------------------------------
>
>      /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public AggregationStrategyClause<MulticastDefinition> aggregationStrategy() {
> +        AggregationStrategyClause<MulticastDefinition> clause = new AggregationStrategyClause<>(this);
> +        setAggregationStrategy(clause);
> +        return clause;
> +    }
> +
> +    /**
>       * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
>       * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy.
>       * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
> @@ -248,6 +262,18 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
>      }
>
>      /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public ProcessClause<MulticastDefinition> onPrepare() {
> +        ProcessClause<MulticastDefinition> clause = new ProcessClause<>(this);
> +        setOnPrepare(clause);
> +        return clause;
> +    }
> +
> +    /**
>       * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
>       * This can be used to deep-clone messages that should be send, or any custom logic needed before
>       * the exchange is send.
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
> index 21fbe2e..c40a0bd 100644
> --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
> +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
> @@ -28,6 +28,7 @@ import java.util.Map;
>  import java.util.concurrent.ExecutorService;
>  import java.util.concurrent.TimeUnit;
>  import java.util.concurrent.atomic.AtomicInteger;
> +import java.util.function.Supplier;
>  import javax.xml.bind.annotation.XmlAccessType;
>  import javax.xml.bind.annotation.XmlAccessorType;
>  import javax.xml.bind.annotation.XmlAnyAttribute;
> @@ -73,6 +74,7 @@ import org.apache.camel.spi.InterceptStrategy;
>  import org.apache.camel.spi.LifecycleStrategy;
>  import org.apache.camel.spi.Policy;
>  import org.apache.camel.spi.RouteContext;
> +import org.apache.camel.support.ExpressionAdapter;
>  import org.slf4j.Logger;
>  import org.slf4j.LoggerFactory;
>
> @@ -1408,6 +1410,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>      }
>
>      /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public ExpressionClause<IdempotentConsumerDefinition> idempotentConsumer() {
> +        IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition();
> +        addOutput(answer);
> +
> +        return ExpressionClause.createAndSetExpression(answer);
> +    }
> +
> +    /**
>       * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
>       * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
>       * to avoid duplicate messages
> @@ -2096,7 +2111,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>       */
>      public ExpressionClause<AggregateDefinition> aggregate(AggregationStrategy aggregationStrategy) {
>          AggregateDefinition answer = new AggregateDefinition();
> -        ExpressionClause<AggregateDefinition> clause = new ExpressionClause<AggregateDefinition>(answer);
> +        ExpressionClause<AggregateDefinition> clause = new ExpressionClause<>(answer);
>          answer.setExpression(clause);
>          answer.setAggregationStrategy(aggregationStrategy);
>          addOutput(answer);
> @@ -2173,6 +2188,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>      }
>
>      /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public ExpressionClause<ThrottleDefinition> throttle() {
> +        ThrottleDefinition answer = new ThrottleDefinition();
> +        addOutput(answer);
> +
> +        return ExpressionClause.createAndSetExpression(answer);
> +    }
> +
> +    /**
>       * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
>       * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
>       * or that we don't exceed an agreed SLA with some external service.
> @@ -2246,6 +2274,21 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>      }
>
>      /**
> +     * TODO: document
> +     * Note: this is experimental and subject to changes in future releases.
> +     *
> +     * @return the builder
> +     */
> +    public ExpressionClause<LoopDefinition> loopDoWhile() {
> +        LoopDefinition loop = new LoopDefinition();
> +        loop.setDoWhile(true);
> +
> +        addOutput(loop);
> +
> +        return ExpressionClause.createAndSetExpression(loop);
> +    }
> +
> +    /**
>       * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
>       * Creates a loop allowing to process the a message a number of times and possibly process them
>       * in a different way.
> @@ -3094,6 +3137,26 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>      }
>
>      /**
> +     * Adds a processor which sets the header on the IN message
> +     *
> +     * @param name  the header name
> +     * @param supplier the supplier used to set the header
> +     * @return the builder
> +     */
> +    @SuppressWarnings("unchecked")
> +    public Type setHeader(String name, final Supplier<Object> supplier) {
> +        SetHeaderDefinition answer = new SetHeaderDefinition(name, new ExpressionAdapter() {
> +            @Override
> +            public Object evaluate(Exchange exchange) {
> +                return supplier.get();
> +            }
> +        });
> +
> +        addOutput(answer);
> +        return (Type) this;
> +    }
> +
> +    /**
>       * Adds a processor which sets the header on the OUT message
>       *
>       * @param name  the header name
> @@ -4021,5 +4084,4 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>      public String getLabel() {
>          return "";
>      }
> -
>  }
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
> index ce3fdca..6e5967e 100644
> --- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
> +++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
> @@ -130,6 +130,24 @@ public final class ExchangeHelper {
>      }
>
>      /**
> +     * Gets an header or property of the correct type
> +     *
> +     * @param exchange      the exchange
> +     * @param name          the name of the header or the property
> +     * @param type          the type
> +     * @return the header or property value
> +     * @throws TypeConversionException is thrown if error during type conversion
> +     * @throws NoSuchHeaderException is thrown if no headers exists
> +     */
> +    public static <T> T getHeaderOrProperty(Exchange exchange, String name, Class<T> type) throws TypeConversionException {
> +        T answer = exchange.getIn().getHeader(name, type);
> +        if (answer == null) {
> +            answer = exchange.getProperty(name, type);
> +        }
> +        return answer;
> +    }
> +
> +    /**
>       * Returns the mandatory inbound message body of the correct type or throws
>       * an exception if it is not present
>       *
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
> new file mode 100644
> index 0000000..4f8f845
> --- /dev/null
> +++ b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
> @@ -0,0 +1,43 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.util.function;
> +
> +import java.util.Objects;
> +import java.util.concurrent.atomic.AtomicReference;
> +import java.util.function.Supplier;
> +
> +public final class Suppliers {
> +    private Suppliers() {
> +    }
> +
> +    public static <T> Supplier<T> memorize(Supplier<T> supplier) {
> +        final AtomicReference<T> valueHolder = new AtomicReference<>();
> +        return () -> {
> +            T supplied = valueHolder.get();
> +            if (supplied == null) {
> +                synchronized (valueHolder) {
> +                    supplied = valueHolder.get();
> +                    if (supplied == null) {
> +                        supplied = Objects.requireNonNull(supplier.get(), "Supplier should not return null");
> +                        valueHolder.lazySet(supplied);
> +                    }
> +                }
> +            }
> +            return supplied;
> +        };
> +    }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
> index 43bd8ff..ee14ca0 100644
> --- a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
> +++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
> @@ -90,13 +90,18 @@ public class DefaultExchangeTest extends ExchangeTestSupport {
>          assertEquals(new Integer(123), exchange.getIn().getHeader("bar", Integer.class));
>          assertEquals("123", exchange.getIn().getHeader("bar", String.class));
>          assertEquals(123, exchange.getIn().getHeader("bar", 234));
> +        assertEquals(123, exchange.getIn().getHeader("bar", () -> 456));
> +        assertEquals(456, exchange.getIn().getHeader("baz", () -> 456));
>
>          assertEquals(123, exchange.getIn().getHeader("bar", 234));
>          assertEquals(new Integer(123), exchange.getIn().getHeader("bar", 234, Integer.class));
>          assertEquals("123", exchange.getIn().getHeader("bar", "234", String.class));
> +        assertEquals("123", exchange.getIn().getHeader("bar", () -> "456", String.class));
> +        assertEquals("456", exchange.getIn().getHeader("baz", () -> "456", String.class));
>
>          assertEquals(234, exchange.getIn().getHeader("cheese", 234));
>          assertEquals("234", exchange.getIn().getHeader("cheese", 234, String.class));
> +        assertEquals("456", exchange.getIn().getHeader("cheese", () -> 456, String.class));
>      }
>
>      public void testProperty() throws Exception {
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
> new file mode 100644
> index 0000000..4f68bc0
> --- /dev/null
> +++ b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
> @@ -0,0 +1,58 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.Exchange;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.util.ExchangeHelper;
> +
> +public class DynamicRouter4Test extends ContextTestSupport {
> +    public void testDynamicRouter() throws Exception {
> +        getMockEndpoint("mock:a").expectedMessageCount(1);
> +        getMockEndpoint("mock:b").expectedMessageCount(1);
> +        getMockEndpoint("mock:c").expectedMessageCount(1);
> +
> +        template.sendBody("direct:start-1", "Hello World");
> +
> +        assertMockEndpointsSatisfied();
> +    }
> +
> +    @Override
> +    protected RouteBuilder createRouteBuilder() throws Exception {
> +        return new RouteBuilder() {
> +            @Override
> +            public void configure() throws Exception {
> +                from("direct:start-1")
> +                    .dynamicRouter()
> +                        .exchange(DynamicRouter4Test::slip);
> +            }
> +        };
> +    }
> +
> +    public static String slip(Exchange exchange) {
> +        String previous = ExchangeHelper.getHeaderOrProperty(exchange, Exchange.SLIP_ENDPOINT, String.class);
> +        if (previous == null) {
> +            return "mock:a,mock:b";
> +        } else if ("mock://b".equals(previous)) {
> +            return "mock:c";
> +        }
> +
> +        // no more so return null
> +        return null;
> +    }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
> new file mode 100644
> index 0000000..9afd3f9
> --- /dev/null
> +++ b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
> @@ -0,0 +1,53 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.component.mock.MockEndpoint;
> +import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
> +
> +public class IdempotentConsumerDslTest extends ContextTestSupport {
> +
> +    public void testDuplicateMessages() throws Exception {
> +        MockEndpoint mock = getMockEndpoint("mock:result");
> +        mock.expectedBodiesReceived("one", "two", "three");
> +
> +        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
> +        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
> +        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
> +        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
> +        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
> +        template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
> +
> +        mock.assertIsSatisfied();
> +    }
> +
> +    @Override
> +    protected RouteBuilder createRouteBuilder() throws Exception {
> +        return new RouteBuilder() {
> +            @Override
> +            public void configure() {
> +                from("direct:start")
> +                    .idempotentConsumer()
> +                        .message(m -> m.getHeader("messageId"))
> +                        .messageIdRepository(MemoryIdempotentRepository.memoryIdempotentRepository(200))
> +                    .to("mock:result");
> +            }
> +        };
> +    }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
> index 2ef927a..54b5f6a 100644
> --- a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
> +++ b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
> @@ -21,11 +21,20 @@ import org.apache.camel.builder.RouteBuilder;
>
>  public class LoopDoWhileTest extends ContextTestSupport {
>
> -    public void testLoopDoWhile() throws Exception {
> +    public void testLoopDoWhileSimple() throws Exception {
>          getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA");
>          getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", "AAAA", "AAAAA");
>
> -        template.sendBody("direct:start", "A");
> +        template.sendBody("direct:simple", "A");
> +
> +        assertMockEndpointsSatisfied();
> +    }
> +
> +    public void testLoopDoWhileFunctional() throws Exception {
> +        getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA");
> +        getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", "AAAA", "AAAAA");
> +
> +        template.sendBody("direct:functional", "A");
>
>          assertMockEndpointsSatisfied();
>      }
> @@ -35,12 +44,20 @@ public class LoopDoWhileTest extends ContextTestSupport {
>          return new RouteBuilder() {
>              @Override
>              public void configure() throws Exception {
> -                from("direct:start")
> +                from("direct:simple")
>                      .loopDoWhile(simple("${body.length} <= 5"))
>                          .to("mock:loop")
>                          .transform(body().append("A"))
>                      .end()
>                      .to("mock:result");
> +                from("direct:functional")
> +                    .loopDoWhile()
> +                        .body(String.class, b -> b.length() <= 5)
> +                        .to("mock:loop")
> +                        .transform()
> +                            .body(String.class, b -> b += "A")
> +                    .end()
> +                    .to("mock:result");
>              }
>          };
>      }
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
> new file mode 100644
> index 0000000..a800e36
> --- /dev/null
> +++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
> @@ -0,0 +1,69 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.component.mock.MockEndpoint;
> +
> +public class MulticastDslTest extends ContextTestSupport {
> +    public void testMulticastDsl() throws Exception {
> +        MockEndpoint mock = getMockEndpoint("mock:result");
> +        mock.expectedMessageCount(1);
> +        mock.expectedHeaderReceived("onPrepare", true);
> +        mock.expectedBodiesReceived(5);
> +
> +        template.sendBody("direct:start", 1);
> +
> +        mock.assertIsSatisfied();
> +    }
> +
> +    @Override
> +    protected RouteBuilder createRouteBuilder() throws Exception {
> +        return new RouteBuilder() {
> +            @Override
> +            public void configure() throws Exception {
> +                from("direct:start")
> +                    .multicast()
> +                        .onPrepare()
> +                            .message(m -> m.setHeader("onPrepare", true))
> +                        .aggregationStrategy()
> +                            .body(Integer.class, (o, n) -> o != null ? o + n : n)
> +                        .to("direct:increase-by-1")
> +                        .to("direct:increase-by-2")
> +                        .end()
> +                    .to("mock:result");
> +
> +                from("direct:increase-by-1")
> +                    .bean(new Increase(1));
> +                from("direct:increase-by-2")
> +                    .bean(new Increase(2));
> +            }
> +        };
> +    }
> +
> +    public static class Increase {
> +        private final int amount;
> +        public Increase(int amount) {
> +            this.amount = amount;
> +        }
> +
> +        public int add(int num) {
> +            return num + amount;
> +        }
> +    }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
> new file mode 100644
> index 0000000..9d296b8
> --- /dev/null
> +++ b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
> @@ -0,0 +1,49 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.component.mock.MockEndpoint;
> +
> +public class RoutingSlipDslTest extends ContextTestSupport {
> +
> +    public void testRoutingSlipDsl() throws Exception {
> +        MockEndpoint x = getMockEndpoint("mock:x");
> +        MockEndpoint y = getMockEndpoint("mock:y");
> +        MockEndpoint z = getMockEndpoint("mock:z");
> +
> +        x.expectedBodiesReceived("foo", "bar");
> +        y.expectedBodiesReceived("foo", "bar");
> +        z.expectedBodiesReceived("foo", "bar");
> +
> +        template.sendBodyAndHeader("direct:a", "foo", "recipientListHeader", "mock:x,mock:y,mock:z");
> +        template.sendBodyAndHeader("direct:a", "bar", "recipientListHeader", "mock:x,mock:y,mock:z");
> +
> +        assertMockEndpointsSatisfied();
> +    }
> +
> +    protected RouteBuilder createRouteBuilder() {
> +        return new RouteBuilder() {
> +            public void configure() {
> +                from("direct:a").routingSlip()
> +                    .message(m -> m.getHeader("recipientListHeader", String.class).split(","))
> +                    .end();
> +            }
> +        };
> +    }
> +}
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
> new file mode 100644
> index 0000000..a971332
> --- /dev/null
> +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
> @@ -0,0 +1,72 @@
> +/**
> + * Licensed to the Apache Software Foundation (ASF) under one or more
> + * contributor license agreements.  See the NOTICE file distributed with
> + * this work for additional information regarding copyright ownership.
> + * The ASF licenses this file to You under the Apache License, Version 2.0
> + * (the "License"); you may not use this file except in compliance with
> + * the License.  You may obtain a copy of the License at
> + *
> + *      http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> + * distributed under the License is distributed on an "AS IS" BASIS,
> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> + * See the License for the specific language governing permissions and
> + * limitations under the License.
> + */
> +package org.apache.camel.processor;
> +
> +import java.util.concurrent.ExecutorService;
> +import java.util.concurrent.Executors;
> +
> +import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.builder.RouteBuilder;
> +import org.apache.camel.component.mock.MockEndpoint;
> +
> +public class ThrottlerDslTest extends ContextTestSupport {
> +    private static final int INTERVAL = 500;
> +    protected int messageCount = 9;
> +
> +    protected boolean canTest() {
> +        // skip test on windows as it does not run well there
> +        return !isPlatform("windows");
> +    }
> +
> +    public void testDsl() throws Exception {
> +        if (!canTest()) {
> +            return;
> +        }
> +
> +        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
> +        resultEndpoint.expectedMessageCount(messageCount);
> +
> +        ExecutorService executor = Executors.newFixedThreadPool(messageCount);
> +
> +        long start = System.currentTimeMillis();
> +        for (int i = 0; i < messageCount; i++) {
> +            executor.execute(() -> template.sendBodyAndHeader("direct:start", "payload", "ThrottleCount", 1));
> +        }
> +
> +        // let's wait for the exchanges to arrive
> +        resultEndpoint.assertIsSatisfied();
> +
> +        // now assert that they have actually been throttled
> +        long minimumTime = (messageCount - 1) * INTERVAL;
> +        // add a little slack
> +        long delta = System.currentTimeMillis() - start + 200;
> +        assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
> +        executor.shutdownNow();
> +    }
> +
> +    protected RouteBuilder createRouteBuilder() {
> +        return new RouteBuilder() {
> +            public void configure() {
> +                from("direct:start")
> +                    .throttle()
> +                        .message(m -> m.getHeader("ThrottleCount", Integer.class))
> +                        .timePeriodMillis(INTERVAL)
> +                    .to("log:result", "mock:result");
> +            }
> +        };
> +    }
> +}
> \ No newline at end of file
>
> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
> ----------------------------------------------------------------------
> diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
> index 55fd14e..f8d1db4 100644
> --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
> +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
> @@ -21,20 +21,21 @@ import java.util.stream.Collectors;
>  import java.util.stream.Stream;
>
>  import org.apache.camel.ContextTestSupport;
> +import org.apache.camel.Exchange;
>  import org.apache.camel.builder.RouteBuilder;
> -import org.apache.camel.component.mock.MockEndpoint;
>
>  public class AggregateDslTest extends ContextTestSupport {
>
>      public void testAggregate() throws Exception {
> -        MockEndpoint mock = getMockEndpoint("mock:aggregated");
> -        mock.expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
> +        getMockEndpoint("mock:aggregated").expectedBodiesReceived("0,3", "1,4", "2,5");
> +        getMockEndpoint("mock:aggregated-supplier").expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
>
>          for (int i = 0; i < 9; i++) {
>              template.sendBodyAndHeader("direct:start", i, "type", i % 3);
> +            template.sendBodyAndHeader("direct:start-supplier", i, "type", i % 3);
>          }
>
> -        mock.assertIsSatisfied();
> +        assertMockEndpointsSatisfied();
>      }
>
>      @Override
> @@ -46,12 +47,38 @@ public class AggregateDslTest extends ContextTestSupport {
>                      .aggregate()
>                          .message(m -> m.getHeader("type"))
>                          .strategy()
> -                            .body(String.class, (o, n) ->  Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(",")))
> +                            .body(String.class, AggregateDslTest::joinString)
>                          .completion()
> -                            .body(String.class, s -> s.length() == 5)
> -                                    .to("mock:aggregated");
> +                            .body(String.class, s -> s.split(",").length == 2)
> +                    .to("mock:aggregated");
> +
> +                from("direct:start-supplier")
> +                    .aggregate()
> +                        .header("type")
> +                        .strategy(AggregateDslTest::joinStringStrategy)
> +                        .completion()
> +                            .body(String.class, s -> s.split(",").length == 3)
> +                    .to("mock:aggregated-supplier");
>              }
>          };
>      }
> +
> +    // *************************************************************************
> +    // Strategies
> +    // *************************************************************************
> +
> +    private static String joinString(String o, String n) {
> +        return Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(","));
> +    }
> +
> +    private static Exchange joinStringStrategy(Exchange oldExchange, Exchange newExchange) {
> +        newExchange.getIn().setBody(
> +            joinString(
> +                oldExchange != null ? oldExchange.getIn().getBody(String.class) : null,
> +                newExchange.getIn().getBody(String.class))
> +        );
> +
> +        return newExchange;
> +    }
>  }
>
>



--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

Re: camel git commit: CAMEL-10724: Improve Java DSL support for Java 8

lburgazzoli
Should be fixed now

---
Luca Burgazzoli


On Thu, Feb 16, 2017 at 8:01 PM, Claus Ibsen <[hidden email]> wrote:

> Hi Luca
>
> This happens also for me. There is a little secret that if you change
> the Message API then the Scala DSL needs to be changed accordingly.
>
> The camel-scala can not be compiled now.
>
> On Thu, Feb 16, 2017 at 6:10 PM,  <[hidden email]> wrote:
>> Repository: camel
>> Updated Branches:
>>   refs/heads/master 8e0e3083e -> 8bc8484b1
>>
>>
>> CAMEL-10724: Improve Java DSL support for Java 8
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/camel/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8bc8484b
>> Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8bc8484b
>> Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8bc8484b
>>
>> Branch: refs/heads/master
>> Commit: 8bc8484b1914f5cb29191e7b91fe48e02ca1f636
>> Parents: 8e0e308
>> Author: lburgazzoli <[hidden email]>
>> Authored: Wed Jan 18 18:09:08 2017 +0100
>> Committer: lburgazzoli <[hidden email]>
>> Committed: Thu Feb 16 18:10:22 2017 +0100
>>
>> ----------------------------------------------------------------------
>>  .../src/main/java/org/apache/camel/Message.java | 16 ++++-
>>  .../org/apache/camel/impl/DefaultMessage.java   | 31 +++++++++
>>  .../apache/camel/model/AggregateDefinition.java | 20 ++++++
>>  .../model/IdempotentConsumerDefinition.java     |  9 +--
>>  .../apache/camel/model/MulticastDefinition.java | 26 +++++++
>>  .../apache/camel/model/ProcessorDefinition.java | 66 +++++++++++++++++-
>>  .../org/apache/camel/util/ExchangeHelper.java   | 18 +++++
>>  .../apache/camel/util/function/Suppliers.java   | 43 ++++++++++++
>>  .../apache/camel/impl/DefaultExchangeTest.java  |  5 ++
>>  .../camel/processor/DynamicRouter4Test.java     | 58 ++++++++++++++++
>>  .../processor/IdempotentConsumerDslTest.java    | 53 ++++++++++++++
>>  .../apache/camel/processor/LoopDoWhileTest.java | 23 ++++++-
>>  .../camel/processor/MulticastDslTest.java       | 69 +++++++++++++++++++
>>  .../camel/processor/RoutingSlipDslTest.java     | 49 +++++++++++++
>>  .../camel/processor/ThrottlerDslTest.java       | 72 ++++++++++++++++++++
>>  .../processor/aggregator/AggregateDslTest.java  | 41 +++++++++--
>>  16 files changed, 582 insertions(+), 17 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/Message.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/Message.java b/camel-core/src/main/java/org/apache/camel/Message.java
>> index 8cfeae0..a0e9c4d 100644
>> --- a/camel-core/src/main/java/org/apache/camel/Message.java
>> +++ b/camel-core/src/main/java/org/apache/camel/Message.java
>> @@ -18,7 +18,7 @@ package org.apache.camel;
>>
>>  import java.util.Map;
>>  import java.util.Set;
>> -
>> +import java.util.function.Supplier;
>>  import javax.activation.DataHandler;
>>
>>  /**
>> @@ -88,6 +88,13 @@ public interface Message {
>>      Object getHeader(String name, Object defaultValue);
>>
>>      /**
>> +     * TODO: document
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     */
>> +    Object getHeader(String name, Supplier<Object> defaultValueSupplier);
>> +
>> +    /**
>>       * Returns a header associated with this message by name and specifying the
>>       * type required
>>       *
>> @@ -112,6 +119,13 @@ public interface Message {
>>      <T> T getHeader(String name, Object defaultValue, Class<T> type);
>>
>>      /**
>> +     * TODO: document
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     */
>> +    <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, Class<T> type);
>> +
>> +    /**
>>       * Sets a header on the message
>>       *
>>       * @param name of the header
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
>> index 848586a..1e49766 100644
>> --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
>> +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
>> @@ -20,6 +20,7 @@ import java.util.HashSet;
>>  import java.util.LinkedHashMap;
>>  import java.util.Map;
>>  import java.util.Set;
>> +import java.util.function.Supplier;
>>  import javax.activation.DataHandler;
>>
>>  import org.apache.camel.Attachment;
>> @@ -65,6 +66,11 @@ public class DefaultMessage extends MessageSupport {
>>          return answer != null ? answer : defaultValue;
>>      }
>>
>> +    public Object getHeader(String name, Supplier<Object> defaultValueSupplier) {
>> +        Object answer = getHeaders().get(name);
>> +        return answer != null ? answer : defaultValueSupplier.get();
>> +    }
>> +
>>      @SuppressWarnings("unchecked")
>>      public <T> T getHeader(String name, Class<T> type) {
>>          Object value = getHeader(name);
>> @@ -115,6 +121,31 @@ public class DefaultMessage extends MessageSupport {
>>          }
>>      }
>>
>> +    @SuppressWarnings("unchecked")
>> +    public <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, Class<T> type) {
>> +        Object value = getHeader(name, defaultValueSupplier);
>> +        if (value == null) {
>> +            // lets avoid NullPointerException when converting to boolean for null values
>> +            if (boolean.class.isAssignableFrom(type)) {
>> +                return (T) Boolean.FALSE;
>> +            }
>> +            return null;
>> +        }
>> +
>> +        // eager same instance type test to avoid the overhead of invoking the type converter
>> +        // if already same type
>> +        if (type.isInstance(value)) {
>> +            return type.cast(value);
>> +        }
>> +
>> +        Exchange e = getExchange();
>> +        if (e != null) {
>> +            return e.getContext().getTypeConverter().convertTo(type, e, value);
>> +        } else {
>> +            return type.cast(value);
>> +        }
>> +    }
>> +
>>      public void setHeader(String name, Object value) {
>>          if (headers == null) {
>>              headers = createHeaders();
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
>> index ec7d396..12f0a13 100644
>> --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
>> +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
>> @@ -814,6 +814,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
>>      }
>>
>>      /**
>> +     * TODO: document
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     * @return the builder
>> +     */
>> +    public AggregateDefinition strategy(AggregationStrategy aggregationStrategy) {
>> +        return aggregationStrategy(aggregationStrategy);
>> +    }
>> +
>> +    /**
>>       * Sets the aggregate strategy to use
>>       *
>>       * @param aggregationStrategy  the aggregate strategy to use
>> @@ -930,6 +940,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
>>      }
>>
>>      /**
>> +     * TODO: document
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     * @return the builder
>> +     */
>> +    public AggregateDefinition completion(@AsPredicate Predicate predicate) {
>> +        return completionPredicate(predicate);
>> +    }
>> +
>> +    /**
>>       * Indicates to complete all current aggregated exchanges when the context is stopped
>>       */
>>      public AggregateDefinition forceCompletionOnStop() {
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
>> index 256394d..9a58704 100644
>> --- a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
>> +++ b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
>> @@ -221,8 +221,7 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
>>      public Processor createProcessor(RouteContext routeContext) throws Exception {
>>          Processor childProcessor = this.createChildProcessor(routeContext, true);
>>
>> -        IdempotentRepository<String> idempotentRepository =
>> -                (IdempotentRepository<String>) resolveMessageIdRepository(routeContext);
>> +        IdempotentRepository<String> idempotentRepository = resolveMessageIdRepository(routeContext);
>>          ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
>>
>>          Expression expression = getExpression().createExpression(routeContext);
>> @@ -231,6 +230,7 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
>>          boolean eager = getEager() == null || getEager();
>>          boolean duplicate = getSkipDuplicate() == null || getSkipDuplicate();
>>          boolean remove = getRemoveOnFailure() == null || getRemoveOnFailure();
>> +
>>          // these boolean should be false by default
>>          boolean completionEager = getCompletionEager() != null && getCompletionEager();
>>
>> @@ -243,10 +243,11 @@ public class IdempotentConsumerDefinition extends ExpressionNode {
>>       * @param routeContext route context
>>       * @return the repository
>>       */
>> -    protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext routeContext) {
>> +    @SuppressWarnings("unchecked")
>> +    protected <T> IdempotentRepository<T> resolveMessageIdRepository(RouteContext routeContext) {
>>          if (messageIdRepositoryRef != null) {
>>              idempotentRepository = routeContext.mandatoryLookup(messageIdRepositoryRef, IdempotentRepository.class);
>>          }
>> -        return idempotentRepository;
>> +        return (IdempotentRepository<T>)idempotentRepository;
>>      }
>>  }
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
>> index 7bff217..37efcc6 100644
>> --- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
>> +++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
>> @@ -27,6 +27,8 @@ import javax.xml.bind.annotation.XmlTransient;
>>
>>  import org.apache.camel.CamelContextAware;
>>  import org.apache.camel.Processor;
>> +import org.apache.camel.builder.AggregationStrategyClause;
>> +import org.apache.camel.builder.ProcessClause;
>>  import org.apache.camel.processor.MulticastProcessor;
>>  import org.apache.camel.processor.aggregate.AggregationStrategy;
>>  import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
>> @@ -106,6 +108,18 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
>>      // -------------------------------------------------------------------------
>>
>>      /**
>> +     * TODO: document
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     * @return the builder
>> +     */
>> +    public AggregationStrategyClause<MulticastDefinition> aggregationStrategy() {
>> +        AggregationStrategyClause<MulticastDefinition> clause = new AggregationStrategyClause<>(this);
>> +        setAggregationStrategy(clause);
>> +        return clause;
>> +    }
>> +
>> +    /**
>>       * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
>>       * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy.
>>       * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
>> @@ -248,6 +262,18 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
>>      }
>>
>>      /**
>> +     * TODO: document
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     * @return the builder
>> +     */
>> +    public ProcessClause<MulticastDefinition> onPrepare() {
>> +        ProcessClause<MulticastDefinition> clause = new ProcessClause<>(this);
>> +        setOnPrepare(clause);
>> +        return clause;
>> +    }
>> +
>> +    /**
>>       * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
>>       * This can be used to deep-clone messages that should be send, or any custom logic needed before
>>       * the exchange is send.
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
>> index 21fbe2e..c40a0bd 100644
>> --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
>> +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
>> @@ -28,6 +28,7 @@ import java.util.Map;
>>  import java.util.concurrent.ExecutorService;
>>  import java.util.concurrent.TimeUnit;
>>  import java.util.concurrent.atomic.AtomicInteger;
>> +import java.util.function.Supplier;
>>  import javax.xml.bind.annotation.XmlAccessType;
>>  import javax.xml.bind.annotation.XmlAccessorType;
>>  import javax.xml.bind.annotation.XmlAnyAttribute;
>> @@ -73,6 +74,7 @@ import org.apache.camel.spi.InterceptStrategy;
>>  import org.apache.camel.spi.LifecycleStrategy;
>>  import org.apache.camel.spi.Policy;
>>  import org.apache.camel.spi.RouteContext;
>> +import org.apache.camel.support.ExpressionAdapter;
>>  import org.slf4j.Logger;
>>  import org.slf4j.LoggerFactory;
>>
>> @@ -1408,6 +1410,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>>      }
>>
>>      /**
>> +     * TODO: document
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     * @return the builder
>> +     */
>> +    public ExpressionClause<IdempotentConsumerDefinition> idempotentConsumer() {
>> +        IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition();
>> +        addOutput(answer);
>> +
>> +        return ExpressionClause.createAndSetExpression(answer);
>> +    }
>> +
>> +    /**
>>       * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
>>       * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
>>       * to avoid duplicate messages
>> @@ -2096,7 +2111,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>>       */
>>      public ExpressionClause<AggregateDefinition> aggregate(AggregationStrategy aggregationStrategy) {
>>          AggregateDefinition answer = new AggregateDefinition();
>> -        ExpressionClause<AggregateDefinition> clause = new ExpressionClause<AggregateDefinition>(answer);
>> +        ExpressionClause<AggregateDefinition> clause = new ExpressionClause<>(answer);
>>          answer.setExpression(clause);
>>          answer.setAggregationStrategy(aggregationStrategy);
>>          addOutput(answer);
>> @@ -2173,6 +2188,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>>      }
>>
>>      /**
>> +     * TODO: document
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     * @return the builder
>> +     */
>> +    public ExpressionClause<ThrottleDefinition> throttle() {
>> +        ThrottleDefinition answer = new ThrottleDefinition();
>> +        addOutput(answer);
>> +
>> +        return ExpressionClause.createAndSetExpression(answer);
>> +    }
>> +
>> +    /**
>>       * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
>>       * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
>>       * or that we don't exceed an agreed SLA with some external service.
>> @@ -2246,6 +2274,21 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>>      }
>>
>>      /**
>> +     * TODO: document
>> +     * Note: this is experimental and subject to changes in future releases.
>> +     *
>> +     * @return the builder
>> +     */
>> +    public ExpressionClause<LoopDefinition> loopDoWhile() {
>> +        LoopDefinition loop = new LoopDefinition();
>> +        loop.setDoWhile(true);
>> +
>> +        addOutput(loop);
>> +
>> +        return ExpressionClause.createAndSetExpression(loop);
>> +    }
>> +
>> +    /**
>>       * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
>>       * Creates a loop allowing to process the a message a number of times and possibly process them
>>       * in a different way.
>> @@ -3094,6 +3137,26 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>>      }
>>
>>      /**
>> +     * Adds a processor which sets the header on the IN message
>> +     *
>> +     * @param name  the header name
>> +     * @param supplier the supplier used to set the header
>> +     * @return the builder
>> +     */
>> +    @SuppressWarnings("unchecked")
>> +    public Type setHeader(String name, final Supplier<Object> supplier) {
>> +        SetHeaderDefinition answer = new SetHeaderDefinition(name, new ExpressionAdapter() {
>> +            @Override
>> +            public Object evaluate(Exchange exchange) {
>> +                return supplier.get();
>> +            }
>> +        });
>> +
>> +        addOutput(answer);
>> +        return (Type) this;
>> +    }
>> +
>> +    /**
>>       * Adds a processor which sets the header on the OUT message
>>       *
>>       * @param name  the header name
>> @@ -4021,5 +4084,4 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
>>      public String getLabel() {
>>          return "";
>>      }
>> -
>>  }
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
>> index ce3fdca..6e5967e 100644
>> --- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
>> +++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
>> @@ -130,6 +130,24 @@ public final class ExchangeHelper {
>>      }
>>
>>      /**
>> +     * Gets an header or property of the correct type
>> +     *
>> +     * @param exchange      the exchange
>> +     * @param name          the name of the header or the property
>> +     * @param type          the type
>> +     * @return the header or property value
>> +     * @throws TypeConversionException is thrown if error during type conversion
>> +     * @throws NoSuchHeaderException is thrown if no headers exists
>> +     */
>> +    public static <T> T getHeaderOrProperty(Exchange exchange, String name, Class<T> type) throws TypeConversionException {
>> +        T answer = exchange.getIn().getHeader(name, type);
>> +        if (answer == null) {
>> +            answer = exchange.getProperty(name, type);
>> +        }
>> +        return answer;
>> +    }
>> +
>> +    /**
>>       * Returns the mandatory inbound message body of the correct type or throws
>>       * an exception if it is not present
>>       *
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
>> new file mode 100644
>> index 0000000..4f8f845
>> --- /dev/null
>> +++ b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
>> @@ -0,0 +1,43 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.util.function;
>> +
>> +import java.util.Objects;
>> +import java.util.concurrent.atomic.AtomicReference;
>> +import java.util.function.Supplier;
>> +
>> +public final class Suppliers {
>> +    private Suppliers() {
>> +    }
>> +
>> +    public static <T> Supplier<T> memorize(Supplier<T> supplier) {
>> +        final AtomicReference<T> valueHolder = new AtomicReference<>();
>> +        return () -> {
>> +            T supplied = valueHolder.get();
>> +            if (supplied == null) {
>> +                synchronized (valueHolder) {
>> +                    supplied = valueHolder.get();
>> +                    if (supplied == null) {
>> +                        supplied = Objects.requireNonNull(supplier.get(), "Supplier should not return null");
>> +                        valueHolder.lazySet(supplied);
>> +                    }
>> +                }
>> +            }
>> +            return supplied;
>> +        };
>> +    }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
>> index 43bd8ff..ee14ca0 100644
>> --- a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
>> +++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
>> @@ -90,13 +90,18 @@ public class DefaultExchangeTest extends ExchangeTestSupport {
>>          assertEquals(new Integer(123), exchange.getIn().getHeader("bar", Integer.class));
>>          assertEquals("123", exchange.getIn().getHeader("bar", String.class));
>>          assertEquals(123, exchange.getIn().getHeader("bar", 234));
>> +        assertEquals(123, exchange.getIn().getHeader("bar", () -> 456));
>> +        assertEquals(456, exchange.getIn().getHeader("baz", () -> 456));
>>
>>          assertEquals(123, exchange.getIn().getHeader("bar", 234));
>>          assertEquals(new Integer(123), exchange.getIn().getHeader("bar", 234, Integer.class));
>>          assertEquals("123", exchange.getIn().getHeader("bar", "234", String.class));
>> +        assertEquals("123", exchange.getIn().getHeader("bar", () -> "456", String.class));
>> +        assertEquals("456", exchange.getIn().getHeader("baz", () -> "456", String.class));
>>
>>          assertEquals(234, exchange.getIn().getHeader("cheese", 234));
>>          assertEquals("234", exchange.getIn().getHeader("cheese", 234, String.class));
>> +        assertEquals("456", exchange.getIn().getHeader("cheese", () -> 456, String.class));
>>      }
>>
>>      public void testProperty() throws Exception {
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
>> new file mode 100644
>> index 0000000..4f68bc0
>> --- /dev/null
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
>> @@ -0,0 +1,58 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.Exchange;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.util.ExchangeHelper;
>> +
>> +public class DynamicRouter4Test extends ContextTestSupport {
>> +    public void testDynamicRouter() throws Exception {
>> +        getMockEndpoint("mock:a").expectedMessageCount(1);
>> +        getMockEndpoint("mock:b").expectedMessageCount(1);
>> +        getMockEndpoint("mock:c").expectedMessageCount(1);
>> +
>> +        template.sendBody("direct:start-1", "Hello World");
>> +
>> +        assertMockEndpointsSatisfied();
>> +    }
>> +
>> +    @Override
>> +    protected RouteBuilder createRouteBuilder() throws Exception {
>> +        return new RouteBuilder() {
>> +            @Override
>> +            public void configure() throws Exception {
>> +                from("direct:start-1")
>> +                    .dynamicRouter()
>> +                        .exchange(DynamicRouter4Test::slip);
>> +            }
>> +        };
>> +    }
>> +
>> +    public static String slip(Exchange exchange) {
>> +        String previous = ExchangeHelper.getHeaderOrProperty(exchange, Exchange.SLIP_ENDPOINT, String.class);
>> +        if (previous == null) {
>> +            return "mock:a,mock:b";
>> +        } else if ("mock://b".equals(previous)) {
>> +            return "mock:c";
>> +        }
>> +
>> +        // no more so return null
>> +        return null;
>> +    }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
>> new file mode 100644
>> index 0000000..9afd3f9
>> --- /dev/null
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
>> @@ -0,0 +1,53 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.component.mock.MockEndpoint;
>> +import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
>> +
>> +public class IdempotentConsumerDslTest extends ContextTestSupport {
>> +
>> +    public void testDuplicateMessages() throws Exception {
>> +        MockEndpoint mock = getMockEndpoint("mock:result");
>> +        mock.expectedBodiesReceived("one", "two", "three");
>> +
>> +        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
>> +        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
>> +        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
>> +        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
>> +        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
>> +        template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
>> +
>> +        mock.assertIsSatisfied();
>> +    }
>> +
>> +    @Override
>> +    protected RouteBuilder createRouteBuilder() throws Exception {
>> +        return new RouteBuilder() {
>> +            @Override
>> +            public void configure() {
>> +                from("direct:start")
>> +                    .idempotentConsumer()
>> +                        .message(m -> m.getHeader("messageId"))
>> +                        .messageIdRepository(MemoryIdempotentRepository.memoryIdempotentRepository(200))
>> +                    .to("mock:result");
>> +            }
>> +        };
>> +    }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
>> index 2ef927a..54b5f6a 100644
>> --- a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
>> @@ -21,11 +21,20 @@ import org.apache.camel.builder.RouteBuilder;
>>
>>  public class LoopDoWhileTest extends ContextTestSupport {
>>
>> -    public void testLoopDoWhile() throws Exception {
>> +    public void testLoopDoWhileSimple() throws Exception {
>>          getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA");
>>          getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", "AAAA", "AAAAA");
>>
>> -        template.sendBody("direct:start", "A");
>> +        template.sendBody("direct:simple", "A");
>> +
>> +        assertMockEndpointsSatisfied();
>> +    }
>> +
>> +    public void testLoopDoWhileFunctional() throws Exception {
>> +        getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA");
>> +        getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", "AAAA", "AAAAA");
>> +
>> +        template.sendBody("direct:functional", "A");
>>
>>          assertMockEndpointsSatisfied();
>>      }
>> @@ -35,12 +44,20 @@ public class LoopDoWhileTest extends ContextTestSupport {
>>          return new RouteBuilder() {
>>              @Override
>>              public void configure() throws Exception {
>> -                from("direct:start")
>> +                from("direct:simple")
>>                      .loopDoWhile(simple("${body.length} <= 5"))
>>                          .to("mock:loop")
>>                          .transform(body().append("A"))
>>                      .end()
>>                      .to("mock:result");
>> +                from("direct:functional")
>> +                    .loopDoWhile()
>> +                        .body(String.class, b -> b.length() <= 5)
>> +                        .to("mock:loop")
>> +                        .transform()
>> +                            .body(String.class, b -> b += "A")
>> +                    .end()
>> +                    .to("mock:result");
>>              }
>>          };
>>      }
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
>> new file mode 100644
>> index 0000000..a800e36
>> --- /dev/null
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
>> @@ -0,0 +1,69 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.component.mock.MockEndpoint;
>> +
>> +public class MulticastDslTest extends ContextTestSupport {
>> +    public void testMulticastDsl() throws Exception {
>> +        MockEndpoint mock = getMockEndpoint("mock:result");
>> +        mock.expectedMessageCount(1);
>> +        mock.expectedHeaderReceived("onPrepare", true);
>> +        mock.expectedBodiesReceived(5);
>> +
>> +        template.sendBody("direct:start", 1);
>> +
>> +        mock.assertIsSatisfied();
>> +    }
>> +
>> +    @Override
>> +    protected RouteBuilder createRouteBuilder() throws Exception {
>> +        return new RouteBuilder() {
>> +            @Override
>> +            public void configure() throws Exception {
>> +                from("direct:start")
>> +                    .multicast()
>> +                        .onPrepare()
>> +                            .message(m -> m.setHeader("onPrepare", true))
>> +                        .aggregationStrategy()
>> +                            .body(Integer.class, (o, n) -> o != null ? o + n : n)
>> +                        .to("direct:increase-by-1")
>> +                        .to("direct:increase-by-2")
>> +                        .end()
>> +                    .to("mock:result");
>> +
>> +                from("direct:increase-by-1")
>> +                    .bean(new Increase(1));
>> +                from("direct:increase-by-2")
>> +                    .bean(new Increase(2));
>> +            }
>> +        };
>> +    }
>> +
>> +    public static class Increase {
>> +        private final int amount;
>> +        public Increase(int amount) {
>> +            this.amount = amount;
>> +        }
>> +
>> +        public int add(int num) {
>> +            return num + amount;
>> +        }
>> +    }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
>> new file mode 100644
>> index 0000000..9d296b8
>> --- /dev/null
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
>> @@ -0,0 +1,49 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.component.mock.MockEndpoint;
>> +
>> +public class RoutingSlipDslTest extends ContextTestSupport {
>> +
>> +    public void testRoutingSlipDsl() throws Exception {
>> +        MockEndpoint x = getMockEndpoint("mock:x");
>> +        MockEndpoint y = getMockEndpoint("mock:y");
>> +        MockEndpoint z = getMockEndpoint("mock:z");
>> +
>> +        x.expectedBodiesReceived("foo", "bar");
>> +        y.expectedBodiesReceived("foo", "bar");
>> +        z.expectedBodiesReceived("foo", "bar");
>> +
>> +        template.sendBodyAndHeader("direct:a", "foo", "recipientListHeader", "mock:x,mock:y,mock:z");
>> +        template.sendBodyAndHeader("direct:a", "bar", "recipientListHeader", "mock:x,mock:y,mock:z");
>> +
>> +        assertMockEndpointsSatisfied();
>> +    }
>> +
>> +    protected RouteBuilder createRouteBuilder() {
>> +        return new RouteBuilder() {
>> +            public void configure() {
>> +                from("direct:a").routingSlip()
>> +                    .message(m -> m.getHeader("recipientListHeader", String.class).split(","))
>> +                    .end();
>> +            }
>> +        };
>> +    }
>> +}
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
>> new file mode 100644
>> index 0000000..a971332
>> --- /dev/null
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
>> @@ -0,0 +1,72 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor;
>> +
>> +import java.util.concurrent.ExecutorService;
>> +import java.util.concurrent.Executors;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.component.mock.MockEndpoint;
>> +
>> +public class ThrottlerDslTest extends ContextTestSupport {
>> +    private static final int INTERVAL = 500;
>> +    protected int messageCount = 9;
>> +
>> +    protected boolean canTest() {
>> +        // skip test on windows as it does not run well there
>> +        return !isPlatform("windows");
>> +    }
>> +
>> +    public void testDsl() throws Exception {
>> +        if (!canTest()) {
>> +            return;
>> +        }
>> +
>> +        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
>> +        resultEndpoint.expectedMessageCount(messageCount);
>> +
>> +        ExecutorService executor = Executors.newFixedThreadPool(messageCount);
>> +
>> +        long start = System.currentTimeMillis();
>> +        for (int i = 0; i < messageCount; i++) {
>> +            executor.execute(() -> template.sendBodyAndHeader("direct:start", "payload", "ThrottleCount", 1));
>> +        }
>> +
>> +        // let's wait for the exchanges to arrive
>> +        resultEndpoint.assertIsSatisfied();
>> +
>> +        // now assert that they have actually been throttled
>> +        long minimumTime = (messageCount - 1) * INTERVAL;
>> +        // add a little slack
>> +        long delta = System.currentTimeMillis() - start + 200;
>> +        assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
>> +        executor.shutdownNow();
>> +    }
>> +
>> +    protected RouteBuilder createRouteBuilder() {
>> +        return new RouteBuilder() {
>> +            public void configure() {
>> +                from("direct:start")
>> +                    .throttle()
>> +                        .message(m -> m.getHeader("ThrottleCount", Integer.class))
>> +                        .timePeriodMillis(INTERVAL)
>> +                    .to("log:result", "mock:result");
>> +            }
>> +        };
>> +    }
>> +}
>> \ No newline at end of file
>>
>> http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
>> ----------------------------------------------------------------------
>> diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
>> index 55fd14e..f8d1db4 100644
>> --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
>> +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
>> @@ -21,20 +21,21 @@ import java.util.stream.Collectors;
>>  import java.util.stream.Stream;
>>
>>  import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.Exchange;
>>  import org.apache.camel.builder.RouteBuilder;
>> -import org.apache.camel.component.mock.MockEndpoint;
>>
>>  public class AggregateDslTest extends ContextTestSupport {
>>
>>      public void testAggregate() throws Exception {
>> -        MockEndpoint mock = getMockEndpoint("mock:aggregated");
>> -        mock.expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
>> +        getMockEndpoint("mock:aggregated").expectedBodiesReceived("0,3", "1,4", "2,5");
>> +        getMockEndpoint("mock:aggregated-supplier").expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
>>
>>          for (int i = 0; i < 9; i++) {
>>              template.sendBodyAndHeader("direct:start", i, "type", i % 3);
>> +            template.sendBodyAndHeader("direct:start-supplier", i, "type", i % 3);
>>          }
>>
>> -        mock.assertIsSatisfied();
>> +        assertMockEndpointsSatisfied();
>>      }
>>
>>      @Override
>> @@ -46,12 +47,38 @@ public class AggregateDslTest extends ContextTestSupport {
>>                      .aggregate()
>>                          .message(m -> m.getHeader("type"))
>>                          .strategy()
>> -                            .body(String.class, (o, n) ->  Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(",")))
>> +                            .body(String.class, AggregateDslTest::joinString)
>>                          .completion()
>> -                            .body(String.class, s -> s.length() == 5)
>> -                                    .to("mock:aggregated");
>> +                            .body(String.class, s -> s.split(",").length == 2)
>> +                    .to("mock:aggregated");
>> +
>> +                from("direct:start-supplier")
>> +                    .aggregate()
>> +                        .header("type")
>> +                        .strategy(AggregateDslTest::joinStringStrategy)
>> +                        .completion()
>> +                            .body(String.class, s -> s.split(",").length == 3)
>> +                    .to("mock:aggregated-supplier");
>>              }
>>          };
>>      }
>> +
>> +    // *************************************************************************
>> +    // Strategies
>> +    // *************************************************************************
>> +
>> +    private static String joinString(String o, String n) {
>> +        return Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(","));
>> +    }
>> +
>> +    private static Exchange joinStringStrategy(Exchange oldExchange, Exchange newExchange) {
>> +        newExchange.getIn().setBody(
>> +            joinString(
>> +                oldExchange != null ? oldExchange.getIn().getBody(String.class) : null,
>> +                newExchange.getIn().getBody(String.class))
>> +        );
>> +
>> +        return newExchange;
>> +    }
>>  }
>>
>>
>
>
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2
>

Loading...