svn commit: r711206 - /activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/HandleFaultProcessor.java

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r711206 - /activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/HandleFaultProcessor.java

ningjiang
Author: ningjiang
Date: Tue Nov  4 00:09:44 2008
New Revision: 711206

URL: http://svn.apache.org/viewvc?rev=711206&view=rev
Log:
CAMEL-1051 Get HandleFaultProcessor to work with AsyncProcessor

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/HandleFaultProcessor.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/HandleFaultProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/HandleFaultProcessor.java?rev=711206&r1=711205&r2=711206&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/HandleFaultProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/HandleFaultProcessor.java Tue Nov  4 00:09:44 2008
@@ -16,15 +16,60 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.util.AsyncProcessorHelper;
 
-public class HandleFaultProcessor extends DelegateProcessor {
+public class HandleFaultProcessor extends DelegateProcessor implements AsyncProcessor {
+    
+    @Override
+    public String toString() { // debug label: this class's name wrapped around the string form of 'processor'
+        return "HandleFaultProcessor(" + processor + ")"; // NOTE(review): 'processor' is not declared in the visible lines; presumably inherited from DelegateProcessor
+    }
 
     @Override
     public void process(Exchange exchange) throws Exception {
-        super.process(exchange);
+        AsyncProcessorHelper.process(this, exchange); // passes this AsyncProcessor and the exchange to the helper; NOTE(review): presumably it drives process(exchange, callback), confirm
+    }
+
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        if (processor == null) {
+            // no processor so nothing to process, so return
+            callback.done(true);
+            return true;
+        }      
+
+        if (processor instanceof AsyncProcessor) {
+            return ((AsyncProcessor)processor).process(exchange, new AsyncCallback() {
+                
+                public void done(boolean doneSynchronously) {
+                    callback.done(doneSynchronously);
+                    Message faultMessage = exchange.getFault(false);
+                    if (faultMessage != null) {
+                        final Object faultBody = faultMessage.getBody();
+                        if (faultBody != null) {
+                            faultMessage.setBody(null); // Reset it since we are handling it.
+                            if (faultBody instanceof Throwable) {
+                                exchange.setException((Throwable)faultBody);
+                            } else {
+                                exchange.setException(new CamelException("Message contains fault of type "
+                                    + faultBody.getClass().getName() + ":\n" + faultBody));
+                            }
+                        }
+                    }                    
+                }                
+            });
+        }
+        
+        try {
+            processor.process(exchange);
+        } catch (Throwable e) {
+            exchange.setException(e);
+        }
+        
         final Message faultMessage = exchange.getFault(false);
         if (faultMessage != null) {
             final Object faultBody = faultMessage.getBody();
@@ -38,5 +83,7 @@
                 }
             }
         }
+        callback.done(true);
+        return true;
     }
 }