svn commit: r697472 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/component/file/strategy/ test/java/org/apache/camel/component/file/

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r697472 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/component/file/strategy/ test/java/org/apache/camel/component/file/

davsclaus-2
Author: davsclaus
Date: Sun Sep 21 04:02:51 2008
New Revision: 697472

URL: http://svn.apache.org/viewvc?rev=697472&view=rev
Log:
CAMEL-925: file consumer throws IOException if rename or delete consumed file failed

Added:
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java
      - copied, changed from r697361, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/DeleteFileProcessStrategy.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/RenameFileProcessStrategy.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/DeleteFileProcessStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/DeleteFileProcessStrategy.java?rev=697472&r1=697471&r2=697472&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/DeleteFileProcessStrategy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/DeleteFileProcessStrategy.java Sun Sep 21 04:02:51 2008
@@ -17,6 +17,7 @@
 package org.apache.camel.component.file.strategy;
 
 import java.io.File;
+import java.io.IOException;
 
 import org.apache.camel.component.file.FileEndpoint;
 import org.apache.camel.component.file.FileExchange;
@@ -45,7 +46,7 @@
         }
         boolean deleted = file.delete();
         if (!deleted) {
-            LOG.warn("Could not delete file: " + file);
+            throw new IOException("Can not delete file: " + file);
         }
 
         // must commit to release the lock

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java?rev=697472&r1=697471&r2=697472&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java Sun Sep 21 04:02:51 2008
@@ -26,6 +26,7 @@
 import org.apache.camel.component.file.FileExchange;
 import org.apache.camel.component.file.FileProcessStrategy;
 import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -63,12 +64,12 @@
             FileChannel channel = new RandomAccessFile(lockFileName, "rw").getChannel();
             FileLock lock = channel.lock();
             if (lock != null) {
-                exchange.setProperty("org.apache.camel.fileChannel", channel);
                 exchange.setProperty("org.apache.camel.file.lock", lock);
                 exchange.setProperty("org.apache.camel.file.lock.name", lockFileName);
                 return true;
+            } else {
+                return false;
             }
-            return false;
         }
         return true;
     }
@@ -81,7 +82,7 @@
         try {
             unlockFile(endpoint, exchange, file);
         } catch (Exception e) {
-            LOG.info("Unable to unlock file: " + file + ": " + e.getMessage(), e);
+            LOG.warn("Unable to unlock file: " + file, e);
         }
     }
 
@@ -103,14 +104,24 @@
 
     protected void unlockFile(FileEndpoint endpoint, FileExchange exchange, File file) throws Exception {
         if (isLockFile()) {
-            Channel channel = ExchangeHelper.getMandatoryProperty(exchange, "org.apache.camel.fileChannel", Channel.class);
-            String lockfile = ExchangeHelper.getMandatoryProperty(exchange, "org.apache.camel.file.lock.name", String.class);
+            FileLock lock = ExchangeHelper.getMandatoryProperty(exchange, "org.apache.camel.file.lock", FileLock.class);
+            String lockFileName = ExchangeHelper.getMandatoryProperty(exchange, "org.apache.camel.file.lock.name", String.class);
+            Channel channel = lock.channel();
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Unlocking file: " + file);
             }
-            channel.close();
-            File lock = new File(lockfile);
-            lock.delete();
+            try {
+                lock.release();
+            } finally {
+                // must close channel
+                ObjectHelper.close(channel, "Closing channel", LOG);
+
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Deleting lock file: " + lockFileName);
+                }
+                File lockfile = new File(lockFileName);
+                lockfile.delete();
+            }
         }
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/RenameFileProcessStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/RenameFileProcessStrategy.java?rev=697472&r1=697471&r2=697472&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/RenameFileProcessStrategy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/RenameFileProcessStrategy.java Sun Sep 21 04:02:51 2008
@@ -17,6 +17,7 @@
 package org.apache.camel.component.file.strategy;
 
 import java.io.File;
+import java.io.IOException;
 
 import org.apache.camel.component.file.FileEndpoint;
 import org.apache.camel.component.file.FileExchange;
@@ -52,10 +53,6 @@
     @Override
     public void commit(FileEndpoint endpoint, FileExchange exchange, File file) throws Exception {
         File newName = renamer.renameFile(exchange, file);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Renaming file: " + file + " to: " + newName);
-        }
-
         // deleting any existing files before renaming
         if (newName.exists()) {
             newName.delete();
@@ -64,9 +61,12 @@
         // make parent folder if missing
         newName.getParentFile().mkdirs();
 
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Renaming file: " + file + " to: " + newName);
+        }
         boolean renamed = file.renameTo(newName);
         if (!renamed) {
-            LOG.warn("Could not rename file from: " + file + " to " + newName);
+            throw new IOException("Can not rename file from: " + file + " to: " + newName);
         }
 
         // must commit to release the lock

Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java (from r697361, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java&r1=697361&r2=697472&rev=697472&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeFalseTest.java Sun Sep 21 04:02:51 2008
@@ -23,9 +23,9 @@
 import org.apache.camel.component.mock.MockEndpoint;
 
 /**
- * Unit test for the alwaysConsume option.
+ * Unit test for the alwaysConsume=false option.
  */
-public class FileAlwaysConsumeTest extends ContextTestSupport {
+public class FileAlwaysConsumeFalseTest extends ContextTestSupport {
 
     @Override
     protected void setUp() throws Exception {
@@ -34,44 +34,16 @@
         template.sendBodyAndHeader("file://target/alwaysconsume/", "Hello World", FileComponent.HEADER_FILE_NAME, "report.txt");
     }
 
-    public void testAlwaysConsume() throws Exception {
-        context.addRoutes(new RouteBuilder() {
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
             public void configure() throws Exception {
-                from("file://target/alwaysconsume/?consumer.alwaysConsume=true&moveNamePrefix=done/").to("mock:result");
+                from("file://target/alwaysconsume/?consumer.alwaysConsume=false&moveNamePrefix=done/").to("mock:result");
             }
-        });
-
-        // consume the file the first time
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Hello World");
-        mock.expectedMessageCount(1);
-
-        assertMockEndpointsSatisfied();
-
-        Thread.sleep(1000);
-
-        // reset mock and set new expectations
-        mock.reset();
-        mock.expectedBodiesReceived("Hello World");
-        mock.expectedMessageCount(1);
-
-        // move file back
-        File file = new File("target/alwaysconsume/done/report.txt");
-        File renamed = new File("target/alwaysconsume/report.txt");
-        file = file.getAbsoluteFile();
-        file.renameTo(renamed.getAbsoluteFile());
-
-        // should consume the file again
-        assertMockEndpointsSatisfied();
+        };
     }
 
     public void testNotAlwaysConsume() throws Exception {
-        context.addRoutes(new RouteBuilder() {
-            public void configure() throws Exception {
-                from("file://target/alwaysconsume/?consumer.alwaysConsume=false&moveNamePrefix=done/").to("mock:result");
-            }
-        });
-
         // consume the file the first time
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("Hello World");

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java?rev=697472&r1=697471&r2=697472&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileAlwaysConsumeTest.java Sun Sep 21 04:02:51 2008
@@ -23,7 +23,7 @@
 import org.apache.camel.component.mock.MockEndpoint;
 
 /**
- * Unit test for the alwaysConsume option.
+ * Unit test for the alwaysConsume=true option.
  */
 public class FileAlwaysConsumeTest extends ContextTestSupport {
 
@@ -34,13 +34,17 @@
         template.sendBodyAndHeader("file://target/alwaysconsume/", "Hello World", FileComponent.HEADER_FILE_NAME, "report.txt");
     }
 
-    public void testAlwaysConsume() throws Exception {
-        context.addRoutes(new RouteBuilder() {
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
             public void configure() throws Exception {
                 from("file://target/alwaysconsume/?consumer.alwaysConsume=true&moveNamePrefix=done/").to("mock:result");
             }
-        });
+        };
+
+    }
 
+    public void testAlwaysConsume() throws Exception {
         // consume the file the first time
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("Hello World");
@@ -65,35 +69,4 @@
         assertMockEndpointsSatisfied();
     }
 
-    public void testNotAlwaysConsume() throws Exception {
-        context.addRoutes(new RouteBuilder() {
-            public void configure() throws Exception {
-                from("file://target/alwaysconsume/?consumer.alwaysConsume=false&moveNamePrefix=done/").to("mock:result");
-            }
-        });
-
-        // consume the file the first time
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Hello World");
-        mock.expectedMessageCount(1);
-
-        assertMockEndpointsSatisfied();
-
-        Thread.sleep(1000);
-
-        // reset mock and set new expectations
-        mock.reset();
-        mock.expectedMessageCount(0);
-
-        // move file back
-        File file = new File("target/alwaysconsume/done/report.txt");
-        File renamed = new File("target/alwaysconsume/report.txt");
-        file = file.getAbsoluteFile();
-        file.renameTo(renamed.getAbsoluteFile());
-
-        // should NOT consume the file again, let 2 secs pass to let the consuemr try to consume it but it should not
-        Thread.sleep(2000);
-        assertMockEndpointsSatisfied();
-    }
-
 }
\ No newline at end of file