svn commit: r563931 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/processor/Pipeline.java test/java/org/apache/camel/component/file/FileExchangeTest.java

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r563931 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/processor/Pipeline.java test/java/org/apache/camel/component/file/FileExchangeTest.java

jstrachan-2
Author: jstrachan
Date: Wed Aug  8 09:30:18 2007
New Revision: 563931

URL: http://svn.apache.org/viewvc?view=rev&rev=563931
Log:
Added the patch suggested by James Zhang here: http://www.nabble.com/Re%3A-Problem-with-ETL-example-p12055635s22882.html

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExchangeTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?view=diff&rev=563931&r1=563930&r2=563931
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Wed Aug  8 09:30:18 2007
@@ -21,6 +21,9 @@
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.Message;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Creates a Pipeline pattern where the output of the previous step is sent as
@@ -29,7 +32,7 @@
  * @version $Revision$
  */
 public class Pipeline extends MulticastProcessor implements Processor {
-
+    private static final transient Log LOG = LogFactory.getLog(Pipeline.class);
 
     public Pipeline(Collection<Processor> processors) {
         super(processors);
@@ -70,8 +73,17 @@
         // now lets set the input of the next exchange to the output of the
         // previous message if it is not null
         Object output = previousExchange.getOut().getBody();
+        Message in = answer.getIn();
         if (output != null) {
-            answer.getIn().setBody(output);
+            in.setBody(output);
+        }
+        else {
+            Object previousInBody = previousExchange.getIn().getBody();
+            if (in.getBody() == null && previousInBody != null) {
+                LOG.warn("Bad exchange implementation; the copy() method did not copy across the in body: " + previousExchange
+                        + " of type: " + previousExchange.getClass());
+                in.setBody(previousInBody);
+            }
         }
         return answer;
     }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExchangeTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExchangeTest.java?view=diff&rev=563931&r1=563930&r2=563931
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExchangeTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExchangeTest.java Wed Aug  8 09:30:18 2007
@@ -19,20 +19,44 @@
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.Pipeline;
 
 import java.io.File;
+import java.util.Collections;
 
 /**
  * @version $Revision: 1.1 $
  */
 public class FileExchangeTest extends ContextTestSupport {
-    
-    public void testCopy() {
-        File file = new File(FileExchangeTest.class.getResource("FileExchangeTest.class").getFile());
+    protected File file;
 
+    public void testCopy() {
         FileExchange fileExchange = new FileExchange(context, file);
         Exchange exchange = fileExchange.copy();
         FileExchange copy = assertIsInstanceOf(FileExchange.class, exchange);
         assertEquals("File", file, copy.getFile());
+        Object body = copy.getIn().getBody();
+        assertNotNull("Should have a body!", body);
+    }
+
+    public void testPipelineCopy() throws Exception {
+        Processor myProcessor = new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                Object body = exchange.getIn().getBody();
+                assertNotNull("Should have a body!", body);
+            }
+        };
+
+        Pipeline pipeline = new Pipeline(Collections.singletonList(myProcessor));
+        FileExchange exchange = new FileExchange(context, file);
+        pipeline.process(exchange.copy());            
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        file = new File(FileExchangeTest.class.getResource("FileExchangeTest.class").getFile());
     }
 }