[camel] branch camel-2.24.x updated: CAMEL-14162: camel-stream - When using http url then data is not sent over the wire

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[camel] branch camel-2.24.x updated: CAMEL-14162: camel-stream - When using http url then data is not sent over the wire

davsclaus-2
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.24.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.24.x by this push:
     new 309084b  CAMEL-14162: camel-stream - When using http url then data is not sent over the wire
309084b is described below

commit 309084bb96f47bb67c381a5a01b0f56f6e003f67
Author: Claus Ibsen <[hidden email]>
AuthorDate: Fri Nov 8 16:06:02 2019 +0100

    CAMEL-14162: camel-stream - When using http url then data is not sent over the wire
---
 components/camel-stream/pom.xml                    |  5 ++
 .../camel/component/stream/StreamProducer.java     | 25 +++++++---
 .../component/stream/StreamToUrlJettyTest.java     | 53 ++++++++++++++++++++++
 3 files changed, 77 insertions(+), 6 deletions(-)

diff --git a/components/camel-stream/pom.xml b/components/camel-stream/pom.xml
index 750e81a..7865371 100644
--- a/components/camel-stream/pom.xml
+++ b/components/camel-stream/pom.xml
@@ -70,6 +70,11 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-jetty</artifactId>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 
diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
index 72826c8..5feec2e 100644
--- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
+++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamProducer.java
@@ -20,6 +20,7 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
@@ -50,6 +51,7 @@ public class StreamProducer extends DefaultProducer {
     private StreamEndpoint endpoint;
     private String uri;
     private OutputStream outputStream;
+    private URLConnection urlConnection;
     private AtomicInteger count = new AtomicInteger();
 
     public StreamProducer(StreamEndpoint endpoint, String uri) throws Exception {
@@ -88,18 +90,18 @@ public class StreamProducer extends DefaultProducer {
         LOG.debug("About to write to url: {}", u);
 
         URL url = new URL(u);
-        URLConnection c = url.openConnection();
-        c.setDoOutput(true);
+        urlConnection = url.openConnection();
+        urlConnection.setDoOutput(true);
         if (endpoint.getConnectTimeout() > 0) {
-            c.setConnectTimeout(endpoint.getConnectTimeout());
+            urlConnection.setConnectTimeout(endpoint.getConnectTimeout());
         }
         if (endpoint.getReadTimeout() > 0) {
-            c.setReadTimeout(endpoint.getReadTimeout());
+            urlConnection.setReadTimeout(endpoint.getReadTimeout());
         }
         if (endpoint.getHttpHeaders() != null) {
-            endpoint.getHttpHeaders().forEach((k, v) -> c.addRequestProperty(k, v.toString()));
+            endpoint.getHttpHeaders().forEach((k, v) -> urlConnection.addRequestProperty(k, v.toString()));
         }
-        return c.getOutputStream();
+        return urlConnection.getOutputStream();
     }
 
     private OutputStream resolveStreamFromFile() throws IOException {
@@ -205,8 +207,19 @@ public class StreamProducer extends DefaultProducer {
 
         // never ever close a system stream
         if (!systemStream && expiredStream) {
+            if (urlConnection != null) {
+                // force a flush as it may first send data over the wire when we are done
+                try {
+                    InputStream is = urlConnection.getInputStream();
+                    IOHelper.close(is);
+                } catch (Throwable e) {
+                    // ignore
+                }
+            }
+
             outputStream.close();
             outputStream = null;
+            urlConnection = null;
             LOG.debug("Closed stream '{}'", endpoint.getEndpointKey());
         }
     }
diff --git a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamToUrlJettyTest.java b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamToUrlJettyTest.java
new file mode 100644
index 0000000..586ee35
--- /dev/null
+++ b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamToUrlJettyTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stream;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * Unit test for producer writing to URL.
+ */
+public class StreamToUrlJettyTest extends CamelTestSupport {
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start")
+                        // just send one message at a time
+                        .to("stream:url?autoCloseCount=1&url=http://localhost:8080/foo&httpHeaders.content-type=text/plain");
+
+                from("jetty:http://localhost:8080/foo")
+                    .log("Jetty foo")
+                    .to("mock:foo");
+            }
+        };
+    }
+
+    @Test
+    public void shouldSendToUrlOutputStream() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello" + System.lineSeparator(), "World" + System.lineSeparator());
+
+        template.sendBody("direct:start", "Hello");
+        template.sendBody("direct:start", "World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+}