svn commit: r614075 - in /activemq/camel/trunk/components/camel-http/src: main/java/org/apache/camel/component/http/ test/java/org/apache/camel/component/http/

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r614075 - in /activemq/camel/trunk/components/camel-http/src: main/java/org/apache/camel/component/http/ test/java/org/apache/camel/component/http/

romkal-2
Author: romkal
Date: Mon Jan 21 16:46:57 2008
New Revision: 614075

URL: http://svn.apache.org/viewvc?rev=614075&view=rev
Log:
CAMEL-304 : multithreaded http endpoint invocation

Added:
    activemq/camel/trunk/components/camel-http/src/test/java/org/apache/camel/component/http/MultiThreadedHttpGetTest.java
Modified:
    activemq/camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java
    activemq/camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java

Modified: activemq/camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java?rev=614075&r1=614074&r2=614075&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java (original)
+++ activemq/camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpComponent.java Mon Jan 21 16:46:57 2008
@@ -19,12 +19,11 @@
 import java.net.URI;
 import java.util.Map;
 
-import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
-import org.apache.camel.Processor;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.util.IntrospectionSupport;
 import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.util.IntrospectionSupport;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
 import org.apache.commons.httpclient.params.HttpClientParams;
 
 /**
@@ -37,6 +36,9 @@
 
     private HttpClientConfigurer httpClientConfigurer;
 
+    private HttpConnectionManager httpConnectionManager =
+     new MultiThreadedHttpConnectionManager();
+    
     /**
      * Connects the URL specified on the endpoint to the specified processor.
      *
@@ -62,11 +64,20 @@
         this.httpClientConfigurer = httpClientConfigurer;
     }
 
+    public HttpConnectionManager getHttpConnectionManager() {
+ return httpConnectionManager;
+ }
+    
+    public void setHttpConnectionManager(
+ HttpConnectionManager httpConnectionManager) {
+ this.httpConnectionManager = httpConnectionManager;
+ }
+    
     @Override
     protected Endpoint<HttpExchange> createEndpoint(String uri, String remaining, Map parameters) throws Exception {
         HttpClientParams params = new HttpClientParams();
         IntrospectionSupport.setProperties(params, parameters, "httpClient.");
-        return new HttpEndpoint(uri, this, new URI(uri), params, httpClientConfigurer);
+        return new HttpEndpoint(uri, this, new URI(uri), params, httpConnectionManager, httpClientConfigurer);
     }
 
     @Override

Modified: activemq/camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java?rev=614075&r1=614074&r2=614075&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-http/src/main/java/org/apache/camel/component/http/HttpEndpoint.java Mon Jan 21 16:46:57 2008
@@ -21,6 +21,7 @@
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.impl.DefaultPollingEndpoint;
 import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.commons.httpclient.params.HttpClientParams;
 
 import javax.servlet.http.HttpServletRequest;
@@ -41,17 +42,19 @@
     private URI httpUri;
     private HttpClientParams clientParams;
     private HttpClientConfigurer httpClientConfigurer;
+    private HttpConnectionManager httpConnectionManager;
 
-    public HttpEndpoint(String endPointURI, HttpComponent component, URI httpURI) throws URISyntaxException {
-        this(endPointURI, component, httpURI, new HttpClientParams(), null);
+    public HttpEndpoint(String endPointURI, HttpComponent component, URI httpURI, HttpConnectionManager httpConnectionManager) throws URISyntaxException {
+        this(endPointURI, component, httpURI, new HttpClientParams(), httpConnectionManager, null);
     }
 
-    public HttpEndpoint(String endPointURI, HttpComponent component, URI httpURI, HttpClientParams clientParams, HttpClientConfigurer clientConfigurer) throws URISyntaxException {
+    public HttpEndpoint(String endPointURI, HttpComponent component, URI httpURI, HttpClientParams clientParams, HttpConnectionManager httpConnectionManager, HttpClientConfigurer clientConfigurer) throws URISyntaxException {
         super(endPointURI, component);
         this.component = component;
         this.httpUri = httpURI;
         this.clientParams = clientParams;
         this.httpClientConfigurer = clientConfigurer;
+        this.httpConnectionManager = httpConnectionManager;
     }
 
     public Producer<HttpExchange> createProducer() throws Exception {
@@ -76,6 +79,7 @@
      */
     public HttpClient createHttpClient() {
         HttpClient answer = new HttpClient(getClientParams());
+        answer.setHttpConnectionManager(httpConnectionManager);
         HttpClientConfigurer configurer = getHttpClientConfigurer();
         if (configurer != null) {
             configurer.configureHttpClient(answer);

Added: activemq/camel/trunk/components/camel-http/src/test/java/org/apache/camel/component/http/MultiThreadedHttpGetTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-http/src/test/java/org/apache/camel/component/http/MultiThreadedHttpGetTest.java?rev=614075&view=auto
==============================================================================
--- activemq/camel/trunk/components/camel-http/src/test/java/org/apache/camel/component/http/MultiThreadedHttpGetTest.java (added)
+++ activemq/camel/trunk/components/camel-http/src/test/java/org/apache/camel/component/http/MultiThreadedHttpGetTest.java Mon Jan 21 16:46:57 2008
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.http;
+
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision: 520220 $
+ */
+public class MultiThreadedHttpGetTest extends ContextTestSupport {
+
+    public void testHttpGetWithConversion() throws Exception {
+    
+     // In this scenario the response stream is converted to a String,
+     // so the stream has to be read to the end. When this happens
+     // the associated connection is released automatically.
+    
+     String endpointName = "seda:withConversion";
+        sendMessagesTo(endpointName, 5);
+    }
+    
+    public void testHttpGetWithoutConversion() throws Exception {
+    
+     // This is needed because by default only 2 parallel connections
+     // are allowed per host and nothing here closes the HTTP
+     // connection, so the 5 requests sent below need 5 connections.
+    
+     context.getComponent("http", HttpComponent.class)
+     .getHttpConnectionManager().getParams()
+     .setDefaultMaxConnectionsPerHost(5);
+    
+     String endpointName = "seda:withoutConversion";
+        sendMessagesTo(endpointName, 5);
+    }
+    
+    public void testHttpGetWithExplicitStreamClose() throws Exception {
+    
+     // We close connections explicitly at the very end of the flow
+     // (Camel doesn't know when the stream is no longer needed)
+    
+     MockEndpoint mockEndpoint = resolveMandatoryEndpoint("mock:results", MockEndpoint.class);
+    
+ for (int i = 0; i < 5; i++) {
+ mockEndpoint.expectedMessageCount(1);
+ template.sendBody("seda:withoutConversion", null);
+ mockEndpoint.assertIsSatisfied();
+ Object response = mockEndpoint.getReceivedExchanges().get(0)
+ .getIn().getBody();
+ InputStream responseStream = assertIsInstanceOf(InputStream.class,
+ response);
+ responseStream.close();
+ mockEndpoint.reset();
+ }    
+    }
+
+ protected void sendMessagesTo(String endpointName, int count) throws InterruptedException {
+ MockEndpoint mockEndpoint = resolveMandatoryEndpoint("mock:results", MockEndpoint.class);
+        mockEndpoint.expectedMessageCount(count);
+
+        for (int i = 0; i < count; i++) {
+ template.sendBody(endpointName, null);
+        }
+        
+        mockEndpoint.assertIsSatisfied();
+        List<Exchange> list = mockEndpoint.getReceivedExchanges();
+        for (Exchange exchange : list) {
+         String body = exchange.getIn().getBody(String.class);
+
+            log.debug("Body: " + body);
+            assertNotNull("Should have a body!", body);
+            assertTrue("body should contain: <html", body.contains("<html"));
+        }
+ }
+
+    
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("seda:withConversion").thread(5)
+                 .to("http://www.google.com/search")
+                 .convertBodyTo(String.class).to("mock:results");
+                
+                from("seda:withoutConversion").thread(5)
+                 .to("http://www.google.com/search")
+             .to("mock:results");
+            }
+        };
+    }
+}