svn commit: r756685 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/mock/ camel-core/src/main/java/org/apache/camel/impl/ components/camel-jms/src/main/java/org/apache/camel/component/jms/ components/camel-jms/src/test/java/org/a...

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r756685 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/component/mock/ camel-core/src/main/java/org/apache/camel/impl/ components/camel-jms/src/main/java/org/apache/camel/component/jms/ components/camel-jms/src/test/java/org/a...

davsclaus-2
Author: davsclaus
Date: Fri Mar 20 17:47:51 2009
New Revision: 756685

URL: http://svn.apache.org/viewvc?rev=756685&view=rev
Log:
CAMEL-870: Added transferExchange option to camel-jms.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java   (with props)
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeadLetterQueueTest.java   (with props)
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeadLetterQueueUsingTransferExchangeTest.java   (with props)
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java   (with props)
Removed:
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHolder.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
    camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?rev=756685&r1=756684&r2=756685&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java Fri Mar 20 17:47:51 2009
@@ -76,6 +76,9 @@
     private String headerName;
     private Object headerValue;
     private Object actualHeader;
+    private String propertyName;
+    private Object propertyValue;
+    private Object actualProperty;
     private Processor reporter;
 
     public MockEndpoint(String endpointUri, Component component) {
@@ -323,6 +326,24 @@
     }
 
     /**
+     * Adds an expectation that the given property name & value are received by this
+     * endpoint
+     */
+    public void expectedPropertyReceived(final String name, final Object value) {
+        this.propertyName = name;
+        this.propertyValue = value;
+
+        expects(new Runnable() {
+            public void run() {
+                assertTrue("No property with name " + propertyName + " found.", actualProperty != null);
+
+                Object actualValue = getCamelContext().getTypeConverter().convertTo(actualProperty.getClass(), propertyValue);
+                assertEquals("Property of message", actualValue, actualProperty);
+            }
+        });
+    }
+
+    /**
      * Adds an expectation that the given body values are received by this
      * endpoint in the specified order
      */
@@ -725,6 +746,10 @@
             actualHeader = in.getHeader(headerName);
         }
 
+        if (propertyName != null) {
+            actualProperty = exchange.getProperty(propertyName);
+        }
+
         if (expectedBodyValues != null) {
             int index = actualBodyValues.size();
             if (expectedBodyValues.size() > index) {

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java?rev=756685&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java Fri Mar 20 17:47:51 2009
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.camel.Exchange;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Holder object for sending an exchange over a remote wire as a serialized object.
+ * This is usually configured using the <tt>transferExchange=true</tt> option on the endpoint.
+ * <p/>
+ * As opposed to normal usage where only the body part of the exchange is transfered over the wire,
+ * this holder object serializes the following fields over the wire:
+ * <ul>
+ * <li>in body</li>
+ * <li>out body</li>
+ * <li>in headers</li>
+ * <li>out headers</li>
+ * <li>fault body </li>
+ * <li>fault headers</li>
+ * <li>exchange properties</li>
+ * <li>exception</li>
+ * </ul>
+ * Any object that is not serializable will be skipped and Camel will log this at WARN level.
+ *
+ * @version $Revision$
+ */
+public class DefaultExchangeHolder implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private static final transient Log LOG = LogFactory.getLog(DefaultExchangeHolder.class);
+
+    private Object inBody;
+    private Object outBody;
+    private Object faultBody;
+    private Map<String, Object> inHeaders = new LinkedHashMap<String, Object>();
+    private Map<String, Object> outHeaders = new LinkedHashMap<String, Object>();
+    private Map<String, Object> properties = new LinkedHashMap<String, Object>();
+    private Map<String, Object> faultHeaders = new LinkedHashMap<String, Object>();
+    private Exception exception;
+
+    /**
+     * Creates a payload object with the information from the given exchange.
+     * Only marshal the Serializable object
+     *
+     * @param exchange the exchange
+     * @return the holder object with information copied form the exchange
+     */
+    public static DefaultExchangeHolder marshal(Exchange exchange) {
+        DefaultExchangeHolder payload = new DefaultExchangeHolder();
+
+        payload.inBody = checkSerializableObject("in body", exchange.getIn().getBody());
+        payload.inHeaders.putAll(checkMapSerializableObjects("in headers", exchange.getIn().getHeaders()));
+        if (exchange.getOut(false) != null) {
+            payload.outBody = checkSerializableObject("out body", exchange.getOut().getBody());
+            payload.outHeaders.putAll(checkMapSerializableObjects("out headers", exchange.getOut().getHeaders()));
+        }
+        if (exchange.getFault(false) != null) {
+            payload.faultBody = checkSerializableObject("fault body", exchange.getFault().getBody());
+            payload.faultHeaders.putAll(checkMapSerializableObjects("fault headers", exchange.getFault().getHeaders()));
+        }
+        payload.properties.putAll(checkMapSerializableObjects("exchange properties", exchange.getProperties()));
+        payload.exception = exchange.getException();
+
+        return payload;
+    }
+
+    /**
+     * Transfers the information from the payload to the exchange.
+     *
+     * @param exchange the exchange to set values from the payload
+     * @param payload  the payload with the values
+     */
+    public static void unmarshal(Exchange exchange, DefaultExchangeHolder payload) {
+        exchange.getIn().setBody(payload.inBody);
+        exchange.getIn().setHeaders(payload.inHeaders);
+        if (payload.outBody != null) {
+            exchange.getOut().setBody(payload.outBody);
+            exchange.getOut().setHeaders(payload.outHeaders);
+        }
+        if (payload.faultBody != null) {
+            exchange.getFault().setBody(payload.faultBody);
+            exchange.getFault().setHeaders(payload.faultHeaders);
+        }
+        for (String key : payload.properties.keySet()) {
+            exchange.setProperty(key, payload.properties.get(key));
+        }
+        exchange.setException(payload.exception);
+    }
+
+    public String toString() {
+        StringBuilder sb = new StringBuilder("DefaultExchangeHolder[");
+        sb.append("inBody=").append(inBody).append(", outBody=").append(outBody);
+        sb.append(", inHeaders=").append(inHeaders).append(", outHeaders=").append(outHeaders);
+        sb.append(", faultBody=").append(faultBody).append(", faultHeaders=").append(faultHeaders);
+        sb.append(", properties=").append(properties).append(", exception=").append(exception);
+        return sb.append(']').toString();
+    }
+
+    private static Object checkSerializableObject(String type, Object object) {
+        if (object instanceof Serializable) {
+            return object;
+        } else {
+            LOG.warn(type + " containig object " + object + " cannot be serialized, it will be excluded by the holder");
+            return null;
+        }
+    }
+
+    private static Map<String, Object> checkMapSerializableObjects(String type, Map<String, Object> map) {
+        if (map == null) {
+            return null;
+        }
+
+        Map<String, Object> result = new LinkedHashMap<String, Object>();
+        for (Map.Entry<String, Object> entry : map.entrySet()) {
+            if (entry.getValue() instanceof Serializable) {
+                result.put(entry.getKey(), entry.getValue());
+            } else {
+                LOG.warn(type + " containing object " + entry.getValue() + " of key " + entry.getKey()
+                        + " cannot be serialized, it will be excluded by the holder");
+            }
+        }
+
+        return result;
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeHolder.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=756685&r1=756684&r2=756685&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java Fri Mar 20 17:47:51 2009
@@ -43,6 +43,7 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.impl.DefaultExchangeHolder;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.ExchangeHelper;
@@ -88,7 +89,14 @@
         try {
             if (message instanceof ObjectMessage) {
                 ObjectMessage objectMessage = (ObjectMessage)message;
-                return objectMessage.getObject();
+                Object payload = objectMessage.getObject();
+                if (payload instanceof DefaultExchangeHolder) {
+                    DefaultExchangeHolder holder = (DefaultExchangeHolder) payload;
+                    DefaultExchangeHolder.unmarshal(exchange, holder);
+                    return exchange.getIn().getBody();
+                } else {
+                    return objectMessage.getObject();
+                }
             } else if (message instanceof TextMessage) {
                 TextMessage textMessage = (TextMessage)message;
                 return textMessage.getText();
@@ -194,7 +202,7 @@
             }
         }
         if (answer == null) {
-            answer = createJmsMessage(camelMessage.getBody(), camelMessage.getHeaders(), session, exchange.getContext());
+            answer = createJmsMessage(exchange, camelMessage.getBody(), camelMessage.getHeaders(), session, exchange.getContext());
             appendJmsProperties(answer, exchange, camelMessage);
         }
         return answer;
@@ -288,9 +296,18 @@
         return null;
     }
 
-    protected Message createJmsMessage(Object body, Map<String, Object> headers, Session session, CamelContext context) throws JMSException {
+    protected Message createJmsMessage(Exchange exchange, Object body, Map<String, Object> headers, Session session, CamelContext context) throws JMSException {
         JmsMessageType type = null;
 
+        // special for transferExchange
+        if (endpoint != null && endpoint.isTransferExchange()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Option transferExchange=true so we use JmsMessageType: Object");
+            }
+            Serializable holder = DefaultExchangeHolder.marshal(exchange);
+            return session.createObjectMessage(holder);
+        }
+
         // check if header have a type set, if so we force to use it
         if (headers.containsKey(JmsConstants.JMS_MESSAGE_TYPE)) {
             type = context.getTypeConverter().convertTo(JmsMessageType.class, headers.get(JmsConstants.JMS_MESSAGE_TYPE));

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=756685&r1=756684&r2=756685&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Fri Mar 20 17:47:51 2009
@@ -133,6 +133,7 @@
     private String replyToDestinationSelectorName;
     private JmsMessageType jmsMessageType;
     private JmsKeyFormatStrategy jmsKeyFormatStrategy;
+    private boolean transferExchange;
 
     public JmsConfiguration() {
     }
@@ -1165,4 +1166,12 @@
     public void setJmsKeyFormatStrategy(JmsKeyFormatStrategy jmsKeyFormatStrategy) {
         this.jmsKeyFormatStrategy = jmsKeyFormatStrategy;
     }
+
+    public boolean isTransferExchange() {
+        return transferExchange;
+    }
+
+    public void setTransferExchange(boolean transferExchange) {
+        this.transferExchange = transferExchange;
+    }
 }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=756685&r1=756684&r2=756685&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Fri Mar 20 17:47:51 2009
@@ -779,6 +779,14 @@
         getConfiguration().setJmsKeyFormatStrategy(jmsHeaderStrategy);
     }
 
+    public boolean isTransferExchange() {
+        return getConfiguration().isTransferExchange();
+    }
+
+    public void setTransferExchange(boolean transferExchange) {
+        getConfiguration().setTransferExchange(transferExchange);
+    }
+
     // Implementation methods
     //-------------------------------------------------------------------------
 

Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeadLetterQueueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeadLetterQueueTest.java?rev=756685&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeadLetterQueueTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeadLetterQueueTest.java Fri Mar 20 17:47:51 2009
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+
+/**
+ * Unit test for using JMS as DLQ
+ *
+ * @version $Revision$
+ */
+public class JmsDeadLetterQueueTest extends ContextTestSupport {
+
+    protected String getUri() {
+        return "activemq:queue:dead";
+    }
+
+    public void testOk() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testKabom() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:dead");
+        mock.expectedBodiesReceived("Kabom");
+
+        try {
+            template.sendBody("direct:start", "Kabom");
+            fail("Should have thrown a RuntimeCamelException");
+        } catch (RuntimeCamelException e) {
+            assertEquals("Kabom", e.getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+
+        // the cause exception is gone in the transformation below
+        assertNull(mock.getReceivedExchanges().get(0).getProperty(Exchange.EXCEPTION_CAUGHT));
+
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        camelContext.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
+
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("seda:dead").disableRedelivery());
+
+                from("direct:start").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        String body = exchange.getIn().getBody(String.class);
+                        if ("Kabom".equals(body)) {
+                            throw new IllegalArgumentException("Kabom");
+                        }
+                    }
+                }).to("mock:result");
+
+                from("seda:dead").transform(exceptionMessage()).to(getUri());
+
+                from(getUri()).to("mock:dead");
+            }
+        };
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeadLetterQueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeadLetterQueueTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeadLetterQueueUsingTransferExchangeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeadLetterQueueUsingTransferExchangeTest.java?rev=756685&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeadLetterQueueUsingTransferExchangeTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeadLetterQueueUsingTransferExchangeTest.java Fri Mar 20 17:47:51 2009
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+
+/**
+ * Unit test for using JMS as DLQ and to preserve the Exchange using transferExchange=true option
+ *
+ * @version $Revision$
+ */
+public class JmsDeadLetterQueueUsingTransferExchangeTest extends ContextTestSupport {
+
+    protected String getUri() {
+        return "activemq:queue:dead?transferExchange=true";
+    }
+
+    public void testOk() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testKabom() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:dead");
+        mock.expectedBodiesReceived("Kabom");
+
+        try {
+            template.sendBody("direct:start", "Kabom");
+            fail("Should have thrown a RuntimeCamelException");
+        } catch (RuntimeCamelException e) {
+            assertEquals("Kabom", e.getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+
+        Exchange dead = mock.getReceivedExchanges().get(0);
+        // caused exception is stored as a property
+        assertEquals("Kabom", dead.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class).getMessage());
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        camelContext.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
+
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel(getUri()).disableRedelivery());
+
+                from("direct:start").process(new Processor() {
+                    public void process(Exchange exchange) throws Exception {
+                        String body = exchange.getIn().getBody(String.class);
+                        if ("Kabom".equals(body)) {
+                            throw new IllegalArgumentException("Kabom");
+                        }
+                    }
+                }).to("mock:result");
+
+                from(getUri()).to("mock:dead");
+            }
+        };
+    }
+
+}
\ No newline at end of file

Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeadLetterQueueUsingTransferExchangeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsDeadLetterQueueUsingTransferExchangeTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java?rev=756685&view=auto
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java (added)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java Fri Mar 20 17:47:51 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms;
+
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import static org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+
+/**
+ * @version $Revision$
+ */
+public class JmsTransferExchangeTest extends ContextTestSupport {
+
+    protected String getUri() {
+        return "activemq:queue:foo?transferExchange=true";
+    }
+
+    public void testBodyOnly() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testBodyAndHeaderOnly() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+        mock.expectedHeaderReceived("foo", "cheese");
+
+        template.sendBodyAndHeader("direct:start", "Hello World", "foo", "cheese");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testSendExchange() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+        mock.expectedHeaderReceived("foo", "cheese");
+        mock.expectedPropertyReceived("bar", 123);
+
+        template.send("direct:start", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Hello World");
+                exchange.getIn().setHeader("foo", "cheese");
+                exchange.setProperty("bar", 123);
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext camelContext = super.createCamelContext();
+
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        camelContext.addComponent("activemq", jmsComponentClientAcknowledge(connectionFactory));
+
+        return camelContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to(getUri());
+                from(getUri()).to("mock:result");
+            }
+        };
+    }
+
+}

Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java?rev=756685&r1=756684&r2=756685&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java Fri Mar 20 17:47:51 2009
@@ -17,15 +17,15 @@
 package org.apache.camel.component.mina;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
 
 /**
  * Helper to get and set the correct payload when transfering data using camel-mina.
  * Always use this helper instead of direct access on the exchange object.
  * <p/>
  * This helper ensures that we can also transfer exchange objects over the wire using the
- * <tt>exchangePayload=true</tt> option.
+ * <tt>transferExchange=true</tt> option.
  *
- * @see org.apache.camel.component.mina.MinaPayloadHolder
  * @version $Revision$
  */
 public final class MinaPayloadHelper {
@@ -37,7 +37,7 @@
     public static Object getIn(MinaEndpoint endpoint, Exchange exchange) {
         if (endpoint.getConfiguration().isTransferExchange()) {
             // we should transfer the entire exchange over the wire (includes in/out)
-            return MinaPayloadHolder.marshal(exchange);
+            return DefaultExchangeHolder.marshal(exchange);
         } else {
             // normal transfer using the body only
             return exchange.getIn().getBody();
@@ -47,7 +47,7 @@
     public static Object getOut(MinaEndpoint endpoint, Exchange exchange) {
         if (endpoint.getConfiguration().isTransferExchange()) {
             // we should transfer the entire exchange over the wire (includes in/out)
-            return MinaPayloadHolder.marshal(exchange);
+            return DefaultExchangeHolder.marshal(exchange);
         } else {
             // normal transfer using the body only
             return exchange.getOut().getBody();
@@ -55,8 +55,8 @@
     }
 
     public static void setIn(Exchange exchange, Object payload) {
-        if (payload instanceof MinaPayloadHolder) {
-            MinaPayloadHolder.unmarshal(exchange, (MinaPayloadHolder) payload);
+        if (payload instanceof DefaultExchangeHolder) {
+            DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload);
         } else {
             // normal transfer using the body only
             exchange.getIn().setBody(payload);
@@ -64,8 +64,8 @@
     }
 
     public static void setOut(Exchange exchange, Object payload) {
-        if (payload instanceof MinaPayloadHolder) {
-            MinaPayloadHolder.unmarshal(exchange, (MinaPayloadHolder) payload);
+        if (payload instanceof DefaultExchangeHolder) {
+            DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload);
         } else {
             // normal transfer using the body only and preserve the headers
             exchange.getOut().setHeaders(exchange.getIn().getHeaders());