Hi,
Most of our components currently assume that the Exchange is processed
synchronously, and bad things can happen when it is not. For example,
the following does not work:
from("file:/tmp/foo").to("seda:test");
from("seda:test").process( myProcessor );
Why? Because the file component deletes the file as soon as the
exchange returns from being sent to seda:test. It would be nicer if
the file were not deleted until after the exchange has been processed
by myProcessor, but that processing happens in an asynchronous
thread.
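
To make the failure concrete, here is a minimal plain-Java sketch of the same race, with no Camel involved. The class name EarlyDeleteDemo, the LinkedBlockingQueue standing in for seda:test, and the latch that forces the unlucky ordering are all assumptions made for illustration; the real file and seda components are more involved.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class EarlyDeleteDemo {

    /** Returns true if the asynchronous consumer still found the file on disk. */
    static boolean fileSeenByAsyncConsumer() throws Exception {
        Path file = Files.createTempFile("foo", ".txt");
        // Hypothetical stand-in for the seda:test queue.
        BlockingQueue<Path> queue = new LinkedBlockingQueue<>();
        CountDownLatch deleted = new CountDownLatch(1);
        boolean[] seen = new boolean[1];

        Thread consumer = new Thread(() -> {
            try {
                Path p = queue.take();
                // Force the ordering the race permits: the delete happens first.
                deleted.await();
                seen[0] = Files.exists(p); // the "myProcessor" step
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        queue.put(file);     // the send returns as soon as the item is queued
        Files.delete(file);  // the producer cleans up immediately
        deleted.countDown();
        consumer.join();     // join makes the write to seen[0] visible here
        return seen[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println("file still present for async consumer: "
                + fileSeenByAsyncConsumer());
    }
}
```

Without the latch the outcome depends on thread scheduling, which is exactly why the route above is unreliable rather than consistently broken.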
Here's an idea that might help solve this problem.
Have the seda component call something like
exchange.getExchangeFuture().done()
when the message has been processed in its async thread,
and have the file component call
exchange.getExchangeFuture().get();
// then the code that deletes the file
or
exchange.getExchangeFuture().setCallback( new Callback() {
    public void done( Exchange exch ) {
        // then the code that deletes the file
    }
});
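
Here is a rough, self-contained sketch of both styles (blocking get() and callback) in plain Java. It swaps in java.util.concurrent.CompletableFuture for the proposed exchange future, so complete() plays the role of done() and thenAccept() plays the role of setCallback(). The Item class, the queue and the class name DeferredDeleteDemo are hypothetical and not part of Camel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class DeferredDeleteDemo {

    /** Hypothetical stand-in for an Exchange that carries its own future. */
    static final class Item {
        final Path file;
        final CompletableFuture<Item> future = new CompletableFuture<>();
        Item(Path file) { this.file = file; }
    }

    /** Returns true if the consumer saw the file and it was deleted afterwards. */
    static boolean run(boolean useCallback) throws Exception {
        Path file = Files.createTempFile("foo", ".txt");
        BlockingQueue<Item> queue = new LinkedBlockingQueue<>();
        boolean[] seen = new boolean[1];

        Thread consumer = new Thread(() -> {
            try {
                Item item = queue.take();
                seen[0] = Files.exists(item.file); // the "myProcessor" step
                item.future.complete(item);        // plays the role of done()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        Item item = new Item(file);
        queue.put(item);
        if (useCallback) {
            // Callback style: the delete runs once processing has completed.
            CompletableFuture<Void> cleanup =
                    item.future.thenAccept(i -> delete(i.file));
            cleanup.get(); // only so this demo can inspect the result
        } else {
            // Blocking style: wait for completion, then delete.
            item.future.get();
            delete(item.file);
        }
        consumer.join();
        return seen[0] && !Files.exists(file);
    }

    static void delete(Path p) {
        try {
            Files.delete(p);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("blocking style ok: " + run(false));
        System.out.println("callback style ok: " + run(true));
    }
}
```

In the callback style the delete may run on the consumer's thread or on the caller's thread, depending on whether processing finished before the callback was registered, so the cleanup code should not assume a particular thread.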
It's just a simple stab at a possible solution. I have a feeling that
it's going to get more complicated, since we would now be forcing
components to be aware of the async processing model, and we tend to
copy exchanges, do processing in pipelines, and so on. But I'm hoping
to get the conversation started on this topic. What do you guys think?
Is there a simpler way to solve this?
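
One way the copying concern might be handled is to have a copy keep the originator's future instead of creating a fresh one, so that a done() signalled on the copy still reaches the creator. The sketch below only illustrates that idea; SimpleExchange and its copy() method are hypothetical and are not Camel's Exchange API, and CompletableFuture again stands in for the proposed future.

```java
import java.util.concurrent.CompletableFuture;

public class CopySharesFutureDemo {

    /** Hypothetical, heavily simplified exchange. */
    static final class SimpleExchange {
        final String body;
        final CompletableFuture<Void> future;

        SimpleExchange(String body) {
            this(body, new CompletableFuture<>());
        }

        private SimpleExchange(String body, CompletableFuture<Void> future) {
            this.body = body;
            this.future = future;
        }

        /** The copy is a new object but shares the originator's future. */
        SimpleExchange copy() {
            return new SimpleExchange(body, future);
        }
    }

    /** Returns true if completing the copy is visible on the original. */
    static boolean originalSeesCompletionOfCopy() {
        SimpleExchange original = new SimpleExchange("hello");
        SimpleExchange copy = original.copy(); // e.g. made by a pipeline step
        copy.future.complete(null);            // downstream signals on the copy
        return original.future.isDone();
    }

    public static void main(String[] args) {
        System.out.println("original notified via copy: "
                + originalSeesCompletionOfCopy());
    }
}
```

If each copy created its own future instead, the creator would wait on a future that nobody downstream ever completes.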
Below you will find a simple patch that implements what the
exchange.getExchangeFuture() method might look like.
Index: camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeFuture.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeFuture.java (revision 0)
+++ camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeFuture.java (revision 0)
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.camel.Exchange;
+
+/**
+ * The DefaultExchangeFuture allows Exchanges to be processed asynchronously by allowing
+ * the producer and consumer of the exchange to signal each other so that it
+ * is known when processing has completed.
+ *
+ * For processors participating in asynchronous exchanges, once an exchange has been
+ * processed, the DefaultExchangeFuture.done() method should be called to let the exchange
+ * creator know that the exchange has been processed.
+ *
+ * For exchange creators (these are typically the Component Consumers), if the exchange is being
+ * processed asynchronously, the creator should either wait for the exchange to be completed using
+ * one of the get() methods or register a Callback using the setCallback() method. Once
+ * the exchange is done, the creator can destroy the originating event.
+ *
+ */
+public class DefaultExchangeFuture implements Future<Exchange> {
+
+    private final Exchange exchange;
+    private final CountDownLatch latch = new CountDownLatch(1);
+    private boolean done;
+    private boolean canceled;
+    private Callback callback;
+
+    public static interface Callback {
+        void done(Exchange exchange);
+    }
+
+
+    /**
+     * @param exchange
+     */
+    public DefaultExchangeFuture(Exchange exchange) {
+        this.exchange = exchange;
+    }
+
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        boolean rc = false;
+        Callback c = null;
+
+        synchronized (this) {
+            if (!done && !canceled) {
+                c = callback;
+                canceled = true;
+                latch.countDown();
+                rc = true;
+            }
+        }
+
+        if (rc) {
+            // Invoke the callback outside the lock so user code never runs while it is held.
+            if (c != null) {
+                c.done(exchange);
+            }
+        }
+        return rc;
+    }
+
+    public Exchange get() throws InterruptedException, ExecutionException {
+        latch.await();
+        synchronized (this) {
+            if (canceled) {
+                throw new CancellationException();
+            }
+            // TODO: We might want to do this..
+            // if (exception != null) {
+            //     throw new ExecutionException(exception);
+            // }
+            return exchange;
+        }
+    }
+
+    public Exchange get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        if (!latch.await(timeout, unit)) { throw new TimeoutException(); }
+        synchronized (this) {
+            if (canceled) {
+                throw new CancellationException();
+            }
+            // TODO: We might want to do this..
+            // if (exception != null) {
+            //     throw new ExecutionException(exception);
+            // }
+            return exchange;
+        }
+    }
+
+    public synchronized boolean isCancelled() {
+        return canceled;
+    }
+
+    public synchronized boolean isDone() {
+        return done || canceled;
+    }
+
+    public synchronized Callback getCallback() {
+        return callback;
+    }
+
+    /**
+     * Registers a callback handler with the future if the future is not yet
+     * completed.
+     *
+     * @param callback
+     * @return false if the callback could not get registered due to the future
+     *         being done.
+     */
+    public synchronized boolean setCallback(Callback callback) {
+        if (isDone()) {
+            return false;
+        }
+        this.callback = callback;
+        return true;
+    }
+
+    public void done() throws CancellationException, IllegalStateException {
+        Callback c = null;
+        synchronized (this) {
+            if (canceled) {
+                throw new CancellationException();
+            }
+            if (done) {
+                throw new IllegalStateException("Exchange is already done");
+            }
+            done = true;
+            c = callback;
+        }
+        latch.countDown();
+        if (c != null) {
+            c.done(exchange);
+        }
+    }
+}
Property changes on: camel-core/src/main/java/org/apache/camel/impl/DefaultExchangeFuture.java
___________________________________________________________________
Name: svn:keywords
+ Rev Date
Name: svn:eol-style
+ native
Index: camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java (revision 568699)
+++ camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java (working copy)
@@ -38,7 +38,8 @@
     private Message fault;
     private Throwable exception;
     private String exchangeId = DefaultExchange.DEFAULT_ID_GENERATOR.generateId();
-
+    private final DefaultExchangeFuture exchangeFuture = new DefaultExchangeFuture(this);
+
     public DefaultExchange(CamelContext context) {
         this.context = context;
     }
@@ -222,4 +223,9 @@
             messageSupport.setExchange(this);
         }
     }
+
+
+    DefaultExchangeFuture getExchangeFuture() {
+        return exchangeFuture;
+    }
 }
--
Regards,
Hiram
Blog: http://hiramchirino.com