svn commit: r642102 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/builder/ main/java/org/apache/camel/converter/ main/java/org/apache/camel/converter/stream/ main/java/org/apache/camel/model/ main/java/org/apache/camel/processor...

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r642102 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/builder/ main/java/org/apache/camel/converter/ main/java/org/apache/camel/converter/stream/ main/java/org/apache/camel/model/ main/java/org/apache/camel/processor...

jstrachan-2
Author: jstrachan
Date: Fri Mar 28 00:02:33 2008
New Revision: 642102

URL: http://svn.apache.org/viewvc?rev=642102&view=rev
Log:
applied patch for https://issues.apache.org/activemq/browse/CAMEL-179 with huge thanks!

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCache.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/package.html   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java   (with props)
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/IOConverterTest.java   (with props)
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java   (with props)
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastStreamCachingTest.java   (with props)
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingInterceptorTest.java   (with props)
    activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/converter/stream/
    activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/converter/stream/test.xml   (with props)
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/IOConverter.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptorRef.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteType.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java?rev=642102&r1=642101&r2=642102&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/RouteBuilder.java Fri Mar 28 00:02:33 2008
@@ -28,10 +28,13 @@
 import org.apache.camel.model.ChoiceType;
 import org.apache.camel.model.ExceptionType;
 import org.apache.camel.model.InterceptType;
+import org.apache.camel.model.InterceptorRef;
+import org.apache.camel.model.InterceptorType;
 import org.apache.camel.model.ProcessorType;
 import org.apache.camel.model.RouteType;
 import org.apache.camel.model.RoutesType;
 import org.apache.camel.processor.DelegateProcessor;
+import org.apache.camel.processor.interceptor.StreamCachingInterceptor;
 
 /**
  * A <a href="http://activemq.apache.org/camel/dsl.html">Java DSL</a> which is
@@ -54,7 +57,6 @@
         super(context);
     }
 
-
     @Override
     public String toString() {
         return routeCollection.toString();
@@ -198,7 +200,21 @@
     public RoutesType getRouteCollection() {
         return this.routeCollection;
     }
-
+    
+    /**
+     * Completely disable stream caching for all routes being defined in the same RouteBuilder after this.
+     */
+    public void noStreamCaching() {
+        StreamCachingInterceptor.noStreamCaching(routeCollection.getInterceptors());
+    }
+    
+    /**
+     * Enable stream caching for all routes being defined in the same RouteBuilder after this call.
+     */
+    public void streamCaching() {
+        routeCollection.intercept(new StreamCachingInterceptor());
+    }
+    
     /**
      * Factory method
      */

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/IOConverter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/IOConverter.java?rev=642102&r1=642101&r2=642102&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/IOConverter.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/IOConverter.java Fri Mar 28 00:02:33 2008
@@ -226,4 +226,21 @@
             return new ObjectInputStream(stream);
         }
     }
+    
+    @Converter
+    public static byte[] toBytes(InputStream stream) throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        copy(stream, bos);
+        return bos.toByteArray();
+    }
+
+    protected static void copy(InputStream stream, ByteArrayOutputStream bos) throws IOException {
+        byte[] data = new byte[4096];
+        int read = stream.read(data);
+        while (read != -1) {
+            bos.write(data, 0, read);
+            read = stream.read(data);
+        }
+        bos.flush();
+    }
 }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCache.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCache.java?rev=642102&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCache.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCache.java Fri Mar 28 00:02:33 2008
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import org.apache.camel.processor.interceptor.StreamCachingInterceptor;
+
+/**
+ * Tagging interface to indicate that a type is capable of caching the underlying data stream.
+ * This is a useful feature for avoiding message re-readability issues.  This interface is mainly used by the {@link StreamCachingInterceptor} for determining if/how to wrap a stream-based message.
+ */
+public interface StreamCache {
+
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java?rev=642102&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java Fri Mar 28 00:02:33 2008
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.stream.StreamSource;
+
+import org.apache.camel.Converter;
+import org.apache.camel.converter.IOConverter;
+import org.apache.camel.converter.jaxp.StringSource;
+import org.apache.camel.converter.jaxp.XmlConverter;
+
+/**
+ * A set of {@link Converter} methods for wrapping stream-based messages in a {@link StreamCache} implementation to ensure message re-readability (e.g. multicasting, retrying, ...)  
+ */
+@Converter
+public class StreamCacheConverter {
+    
+    private XmlConverter converter = new XmlConverter();
+
+    @Converter
+    public StreamCache convertToStreamCache(StreamSource source) throws TransformerException {
+        //TODO: we can probably build a more generic converter method to support other kinds of Sources as well (e.g. SAXSource, StAXSource, ...)
+        return new StreamSourceCache(converter.toString(source));
+    }
+    
+    @Converter
+    public StreamCache convertToStreamCache(InputStream stream) throws IOException {
+        return new InputStreamCache(IOConverter.toBytes(stream));
+    }
+
+    private class StreamSourceCache extends StringSource implements StreamCache {
+        
+        private static final long serialVersionUID = 4147248494104812945L;
+
+        public StreamSourceCache(String text) {
+            super(text);
+        }
+    }
+    
+    private class InputStreamCache extends ByteArrayInputStream implements StreamCache {
+    
+        public InputStreamCache(byte[] data) {
+            super(data);
+        }
+        
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/package.html
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/package.html?rev=642102&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/package.html (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/package.html Fri Mar 28 00:02:33 2008
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+  
+    http://www.apache.org/licenses/LICENSE-2.0
+  
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Package with converters for dealing with stream-based messages
+
+</body>
+</html>

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/package.html
------------------------------------------------------------------------------
    svn:mime-type = text/html

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptorRef.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptorRef.java?rev=642102&r1=642101&r2=642102&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptorRef.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptorRef.java Fri Mar 28 00:02:33 2008
@@ -88,4 +88,14 @@
             return "";
         }
     }
+    
+    /**
+     * Get the underlying {@link DelegateProcessor} implementation
+     *
+     * @return the {@link DelegateProcessor}
+     */
+    @XmlTransient
+    public DelegateProcessor getInterceptor() {
+        return interceptor;
+    }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteType.java?rev=642102&r1=642101&r2=642102&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RouteType.java Fri Mar 28 00:02:33 2008
@@ -34,6 +34,7 @@
 import org.apache.camel.NoSuchEndpointException;
 import org.apache.camel.Route;
 import org.apache.camel.impl.RouteContext;
+import org.apache.camel.processor.interceptor.StreamCachingInterceptor;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -211,5 +212,21 @@
             list.addAll(getInterceptors());
         }
 */
+    }
+
+    /**
+     * Disable stream caching for this Route.
+     */
+    public RouteType noStreamCaching() {
+        StreamCachingInterceptor.noStreamCaching(interceptors);
+        return this;
+    }
+
+    /**
+     * Enable stream caching for this Route.
+     */
+    public RouteType streamCaching() {
+        intercept(new StreamCachingInterceptor());
+        return this;
     }
 }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java?rev=642102&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java Fri Mar 28 00:02:33 2008
@@ -0,0 +1,65 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.interceptor;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.converter.stream.StreamCache;
+import org.apache.camel.model.InterceptorRef;
+import org.apache.camel.model.InterceptorType;
+import org.apache.camel.processor.Interceptor;
+
+/**
+ * {@link Interceptor} that converts a message into a re-readable format
+ */
+public class StreamCachingInterceptor extends Interceptor {
+
+    public StreamCachingInterceptor() {
+        super();
+        setInterceptorLogic(new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                Object newBody = exchange.getIn().getBody(StreamCache.class);
+                if (newBody != null) {
+                    exchange.getIn().setBody(newBody);
+                }
+                proceed(exchange);
+            }
+        });
+    }
+
+    public StreamCachingInterceptor(Processor processor) {
+        this();
+        setProcessor(processor);
+    }
+    
+    /**
+     * Remove this interceptor from the given list of interceptors
+     * @param interceptors the list of interceptors
+     */
+    public static void noStreamCaching(List<InterceptorType> interceptors) {
+        for (int i = 0 ; i < interceptors.size() ; i++) {
+            InterceptorType interceptor = interceptors.get(i);
+            if (interceptor instanceof InterceptorRef &&
+                ((InterceptorRef) interceptor).getInterceptor() instanceof StreamCachingInterceptor) {
+                interceptors.remove(interceptor);
+            }
+        }
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/IOConverterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/IOConverterTest.java?rev=642102&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/IOConverterTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/IOConverterTest.java Fri Mar 28 00:02:33 2008
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+/**
+ * Test case for {@link IOConverter}
+ */
+public class IOConverterTest extends TestCase {
+    
+    private static final byte[] TESTDATA = "My test data".getBytes();
+    
+    public void testToBytes() throws FileNotFoundException, IOException {
+        byte[] data = IOConverter.toBytes(new FileInputStream("src/test/resources/org/apache/camel/converter/dummy.txt"));
+        assertEquals(962, data.length);
+        assertEquals('#', (char) data[0]);
+        assertEquals('!', (char) data[data.length - 1]);
+    }
+    
+    public void testCopy() throws IOException {
+        ByteArrayInputStream bis = new ByteArrayInputStream(TESTDATA);
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        IOConverter.copy(bis, bos);
+        assertEquals(TESTDATA, bos.toByteArray());
+    }
+    
+    private void assertEquals(byte[] data1, byte[] data2) {
+        assertEquals(data1.length, data2.length);
+        for (int i = 0 ; i < data1.length ; i++) {
+            assertEquals(data1[i], data2[i]);
+        }
+    }
+
+}

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/IOConverterTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java?rev=642102&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java Fri Mar 28 00:02:33 2008
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.stream.StreamSource;
+
+import junit.framework.TestCase;
+
+import org.apache.camel.converter.IOConverter;
+import org.apache.camel.converter.jaxp.XmlConverter;
+
+/**
+ * Test cases for {@link StreamCacheConverter}
+ */
+public class StreamCacheConverterTest extends TestCase {
+    
+    private static final String TEST_FILE = "org/apache/camel/converter/stream/test.xml";
+    private StreamCacheConverter converter;
+    
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        this.converter = new StreamCacheConverter();
+    }
+
+    public void testConvertToStreamCacheStreamSource() throws TransformerException, FileNotFoundException {
+        StreamSource source = new StreamSource(getTestFileStream());
+        StreamSource cache = (StreamSource) converter.convertToStreamCache(source);
+        //assert re-readability of the cached StreamSource
+        XmlConverter converter = new XmlConverter();
+        assertNotNull(converter.toString(cache));
+        assertNotNull(converter.toString(cache));
+    }
+
+    public void testConvertToStreamCacheInputStream() throws IOException {
+        InputStream is = getTestFileStream();
+        InputStream cache = (InputStream)converter.convertToStreamCache(is);
+        //assert re-readability of the cached InputStream
+        assertNotNull(IOConverter.toString(cache));
+        assertNotNull(IOConverter.toString(cache));
+    }
+
+
+    protected InputStream getTestFileStream() {
+        InputStream answer = getClass().getClassLoader().getResourceAsStream(TEST_FILE);
+        assertNotNull("Should have found the file: " + TEST_FILE + " on the classpath", answer);
+        return answer;
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/StreamCacheConverterTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastStreamCachingTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastStreamCachingTest.java?rev=642102&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastStreamCachingTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastStreamCachingTest.java Fri Mar 28 00:02:33 2008
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.StringReader;
+
+import javax.xml.transform.stream.StreamSource;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision: 1.1 $
+ */
+public class MulticastStreamCachingTest extends ContextTestSupport {
+    protected Endpoint<Exchange> startEndpoint;
+    protected MockEndpoint x;
+    protected MockEndpoint y;
+    protected MockEndpoint z;
+    
+    public void testSendingAMessageUsingMulticastConvertsToReReadable() throws Exception {
+        x.expectedBodiesReceived("<input/>+output");
+        y.expectedBodiesReceived("<input/>+output");
+        z.expectedBodiesReceived("<input/>+output");
+
+        template.send("direct:a", new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody(new StreamSource(new StringReader("<input/>")));
+                in.setHeader("foo", "bar");
+            }
+        });
+        
+        assertMockEndpointsSatisifed();
+    }
+
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        x = getMockEndpoint("mock:x");
+        y = getMockEndpoint("mock:y");
+        z = getMockEndpoint("mock:z");
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        final Processor processor = new Processor() {
+            public void process(Exchange exchange) {
+                // lets transform the IN message
+                Message in = exchange.getIn();
+                String body = in.getBody(String.class);
+                in.setBody(body + "+output");
+            }
+        };
+
+        return new RouteBuilder() {
+            public void configure() {
+                //stream caching should fix re-readability issues when multicasting messages
+                from("direct:a").streamCaching().multicast().to("direct:x", "direct:y", "direct:z");
+
+                from("direct:x").process(processor).to("mock:x");
+                from("direct:y").process(processor).to("mock:y");
+                from("direct:z").process(processor).to("mock:z");
+            }
+        };
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastStreamCachingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingInterceptorTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingInterceptorTest.java?rev=642102&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingInterceptorTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingInterceptorTest.java Fri Mar 28 00:02:33 2008
@@ -0,0 +1,81 @@
+package org.apache.camel.processor.interceptor;
+
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.xml.transform.stream.StreamSource;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.converter.stream.StreamCache;
+import org.apache.camel.model.InterceptorRef;
+import org.apache.camel.model.InterceptorType;
+import org.apache.camel.processor.DelegateProcessor;
+
+public class StreamCachingInterceptorTest extends ContextTestSupport {
+
+    private MockEndpoint a;
+    private MockEndpoint b;
+    
+    public void testConvertStreamSourceWithRouteBuilderStreamCaching() throws Exception {
+        a.expectedMessageCount(1);
+        
+        StreamSource message = new StreamSource(new StringReader("<hello>world!</hello>"));
+        template.sendBody("direct:a", message);
+
+        assertMockEndpointsSatisifed();
+        assertTrue(a.assertExchangeReceived(0).getIn().getBody() instanceof StreamCache);
+    }
+    
+    public void testConvertStreamSourceWithRouteOnlyStreamCaching() throws Exception {
+        b.expectedMessageCount(1);
+        
+        StreamSource message = new StreamSource(new StringReader("<hello>world!</hello>"));
+        template.sendBody("direct:b", message);
+
+        assertMockEndpointsSatisifed();
+        assertTrue(b.assertExchangeReceived(0).getIn().getBody() instanceof StreamCache);
+    }
+    
+    public void testIgnoreAlreadyRereadable() throws Exception {
+        a.expectedMessageCount(1);
+        
+        template.sendBody("direct:a", "<hello>world!</hello>");
+
+        assertMockEndpointsSatisifed();
+        assertTrue(a.assertExchangeReceived(0).getIn().getBody() instanceof String);
+    }
+    
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        a = getMockEndpoint("mock:a");
+        b = getMockEndpoint("mock:b");
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+        
+            public void configure() {
+                //Stream caching for a single route...
+                from("direct:a").streamCaching().to("mock:a");
+                
+                //... or for all the following routes in this builder
+                streamCaching();
+                from("direct:b").to("mock:b");
+            }
+        };
+    }
+    
+    public void testNoStreamCaching() throws Exception {
+        List<InterceptorType> interceptors = new LinkedList<InterceptorType>();
+        InterceptorRef streamCache = new InterceptorRef(new StreamCachingInterceptor());
+        interceptors.add(streamCache);
+        interceptors.add(new InterceptorRef(new DelegateProcessor()));
+        StreamCachingInterceptor.noStreamCaching(interceptors);
+        assertEquals(1, interceptors.size());
+        assertFalse(interceptors.contains(streamCache));
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/StreamCachingInterceptorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/converter/stream/test.xml
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/converter/stream/test.xml?rev=642102&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/converter/stream/test.xml (added)
+++ activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/converter/stream/test.xml Fri Mar 28 00:02:33 2008
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<person user="james">
+  <firstName>James</firstName>
+  <lastName>Strachan</lastName>
+  <city>London</city>
+</person>
\ No newline at end of file

Propchange: activemq/camel/trunk/camel-core/src/test/resources/org/apache/camel/converter/stream/test.xml
------------------------------------------------------------------------------
    svn:eol-style = native