svn commit: r756358 - in /camel/branches/camel-1.x: ./ camel-core/src/main/java/org/apache/camel/ components/camel-xmpp/ components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/ components/camel-xmpp/src/test/resources/

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r756358 - in /camel/branches/camel-1.x: ./ camel-core/src/main/java/org/apache/camel/ components/camel-xmpp/ components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/ components/camel-xmpp/src/test/resources/

davsclaus-2
Author: davsclaus
Date: Fri Mar 20 07:35:41 2009
New Revision: 756358

URL: http://svn.apache.org/viewvc?rev=756358&view=rev
Log:
Merged revisions 756348 via svnmerge from
https://svn.apache.org/repos/asf/camel/trunk

........
  r756348 | davsclaus | 2009-03-20 08:03:00 +0100 (Fri, 20 Mar 2009) | 1 line
 
  CAMEL-1467, CAMEL-1470: Fixed the XMPP producer to consume the response message from the server, so the local queue no longer fills up and causes an out-of-memory error. Added re-connect and improved logging information. Also added disconnect on stop.
........

Added:
    camel/branches/camel-1.x/components/camel-xmpp/src/test/resources/log4j.properties
      - copied unchanged from r756348, camel/trunk/components/camel-xmpp/src/test/resources/log4j.properties
Removed:
    camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/RuntimeXmppException.java
Modified:
    camel/branches/camel-1.x/   (props changed)
    camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/RuntimeExchangeException.java
    camel/branches/camel-1.x/components/camel-xmpp/pom.xml
    camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java
    camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java
    camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppExchange.java
    camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppGroupChatProducer.java
    camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java
    camel/branches/camel-1.x/components/camel-xmpp/src/test/resources/   (props changed)

Propchange: camel/branches/camel-1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 20 07:35:41 2009
@@ -1 +1 @@
-/camel/trunk:736980,739733,739904,740251,740295,740306,740596,740663,741848,742231,742705,742739,742854,742856,742898,742906,743613,743762,743773,743920,743959-743960,744123,745105,745367,745541,745751,745826,745978,746269,746872,746895,746962,747258,747678-747704,748392,748436,748821,749563-749564,749574,749628-749629,749936,749956,750017,750334,750396,750761,750796,752068,752117,752418,752751-752755,752764-752773,752956,753087,753101,753175,755136,755487,756313
+/camel/trunk:736980,739733,739904,740251,740295,740306,740596,740663,741848,742231,742705,742739,742854,742856,742898,742906,743613,743762,743773,743920,743959-743960,744123,745105,745367,745541,745751,745826,745978,746269,746872,746895,746962,747258,747678-747704,748392,748436,748821,749563-749564,749574,749628-749629,749936,749956,750017,750334,750396,750761,750796,752068,752117,752418,752751-752755,752764-752773,752956,753087,753101,753175,755136,755487,756313,756348

Propchange: camel/branches/camel-1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/RuntimeExchangeException.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/RuntimeExchangeException.java?rev=756358&r1=756357&r2=756358&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/RuntimeExchangeException.java (original)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/RuntimeExchangeException.java Fri Mar 20 07:35:41 2009
@@ -35,6 +35,11 @@
         this.exchange = exchange;
     }
 
+    public RuntimeExchangeException(String message, Exchange exchange, Throwable cause) {
+        super(message + " on the exchange: " +  exchange, cause);
+        this.exchange = exchange;
+    }
+
     /**
      * Returns the exchange which caused the exception
      */

Modified: camel/branches/camel-1.x/components/camel-xmpp/pom.xml
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/components/camel-xmpp/pom.xml?rev=756358&r1=756357&r2=756358&view=diff
==============================================================================
--- camel/branches/camel-1.x/components/camel-xmpp/pom.xml (original)
+++ camel/branches/camel-1.x/components/camel-xmpp/pom.xml Fri Mar 20 07:35:41 2009
@@ -93,6 +93,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

Modified: camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java?rev=756358&r1=756357&r2=756358&view=diff
==============================================================================
--- camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java (original)
+++ camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppConsumer.java Fri Mar 20 07:35:41 2009
@@ -24,6 +24,7 @@
 import org.jivesoftware.smack.MessageListener;
 import org.jivesoftware.smack.PacketListener;
 import org.jivesoftware.smack.SmackConfiguration;
+import org.jivesoftware.smack.XMPPConnection;
 import org.jivesoftware.smack.packet.Message;
 import org.jivesoftware.smack.packet.Packet;
 import org.jivesoftware.smackx.muc.DiscussionHistory;
@@ -37,8 +38,8 @@
 public class XmppConsumer extends DefaultConsumer<XmppExchange> implements PacketListener, MessageListener {
     private static final transient Log LOG = LogFactory.getLog(XmppConsumer.class);
     private final XmppEndpoint endpoint;
-    private Chat privateChat;
     private MultiUserChat muc;
+    private XMPPConnection connection;
 
     public XmppConsumer(XmppEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -47,17 +48,25 @@
 
     @Override
     protected void doStart() throws Exception {
+        connection = endpoint.createConnection();
+
         if (endpoint.getRoom() == null) {
-            privateChat = endpoint.getConnection().getChatManager().createChat(endpoint.getParticipant(), this);
-            LOG.info("Open chat to " + privateChat.getParticipant());
+            Chat privateChat = connection.getChatManager().createChat(endpoint.getParticipant(), this);
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Open private chat to: " + privateChat.getParticipant());
+            }
         } else {
-            muc = new MultiUserChat(endpoint.getConnection(), endpoint.resolveRoom());
+            muc = new MultiUserChat(connection, endpoint.resolveRoom(connection));
             muc.addMessageListener(this);
             DiscussionHistory history = new DiscussionHistory();
             history.setMaxChars(0); // we do not want any historical messages
+
             muc.join(endpoint.getNickname(), null, history, SmackConfiguration.getPacketReplyTimeout());
-            LOG.info("Joined room: " + muc.getRoom());
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Joined room: " + muc.getRoom() + " as: " + endpoint.getNickname());
+            }
         }
+
         super.doStart();
     }
 
@@ -65,9 +74,20 @@
     protected void doStop() throws Exception {
         super.doStop();
         if (muc != null) {
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Leaving room: " + muc.getRoom());
+            }
+            muc.removeMessageListener(this);
             muc.leave();
             muc = null;
         }
+        if (connection != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Disconnecting from: " + XmppEndpoint.getConnectionMessage(connection));
+            }
+            connection.disconnect();
+            connection = null;
+        }
     }
 
     public void processPacket(Packet packet) {
@@ -81,7 +101,7 @@
         try {
             getProcessor().process(exchange);
         } catch (Exception e) {
-            LOG.error("Error while processing message", e);
+            LOG.error("Error while processing XMPP message", e);
         }
     }
 

Modified: camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java?rev=756358&r1=756357&r2=756358&view=diff
==============================================================================
--- camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java (original)
+++ camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppEndpoint.java Fri Mar 20 07:35:41 2009
@@ -18,12 +18,12 @@
 
 import java.util.Iterator;
 
-import org.apache.camel.CamelException;
 import org.apache.camel.Consumer;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jivesoftware.smack.AccountManager;
@@ -41,7 +41,6 @@
 public class XmppEndpoint extends DefaultEndpoint<XmppExchange> {
     private static final transient Log LOG = LogFactory.getLog(XmppEndpoint.class);
     private XmppBinding binding;
-    private XMPPConnection connection;
     private String host;
     private int port;
     private String user;
@@ -96,6 +95,84 @@
         return new XmppExchange(getCamelContext(), getExchangePattern(), getBinding(), message);
     }
 
+    public boolean isSingleton() {
+        return true;
+    }
+
+    public XMPPConnection createConnection() throws XMPPException {
+        XMPPConnection connection;
+
+        if (port > 0) {
+            if (getServiceName() == null) {
+                connection = new XMPPConnection(new ConnectionConfiguration(host, port));
+            } else {
+                connection = new XMPPConnection(new ConnectionConfiguration(host, port, getServiceName()));
+            }
+        } else {
+            connection = new XMPPConnection(host);
+        }
+
+        connection.connect();
+
+        if (login && !connection.isAuthenticated()) {
+            if (user != null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Logging in to XMPP as user: " + user + " on connection: " + getConnectionMessage(connection));
+                }
+                if (password == null) {
+                    LOG.warn("No password configured for user: " + user + " on connection: " + getConnectionMessage(connection));
+                }
+
+                if (createAccount) {
+                    AccountManager accountManager = new AccountManager(connection);
+                    accountManager.createAccount(user, password);
+                }
+                if (resource != null) {
+                    connection.login(user, password, resource);
+                } else {
+                    connection.login(user, password);
+                }
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Logging in anonymously to XMPP on connection: "  + getConnectionMessage(connection));
+                }
+                connection.loginAnonymously();
+            }
+
+            // presence is not needed to be sent after login
+        }
+
+        return connection;
+    }
+
+    /*
+     * If there is no "@" symbol in the room, find the chat service JID and
+     * return fully qualified JID for the room as room@conference.server.domain
+     */
+    public String resolveRoom(XMPPConnection connection) throws XMPPException {
+        ObjectHelper.notEmpty(room, "room");
+
+        if (room.indexOf('@', 0) != -1) {
+            return room;
+        }
+
+        Iterator<String> iterator = MultiUserChat.getServiceNames(connection).iterator();
+        if (!iterator.hasNext()) {
+            throw new XMPPException("Cannot find Multi User Chat service on connection: " + getConnectionMessage(connection));
+        }
+
+        String chatServer = iterator.next();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Detected chat server: " + chatServer);
+        }
+
+        return room + "@" + chatServer;
+    }
+
+    public static String getConnectionMessage(XMPPConnection connetion) {
+        return connetion.getHost() + ":" + connetion.getPort() + "/" + connetion.getServiceName();
+    }
+
     // Properties
     // -------------------------------------------------------------------------
     public XmppBinding getBinding() {
@@ -201,86 +278,7 @@
         return serviceName;
     }    
     
-    public XMPPConnection getConnection() throws XMPPException {
-        if (connection == null) {
-            connection = createConnection();
-        }
-        return connection;
-    }
-
-    public void setConnection(XMPPConnection connection) {
-        this.connection = connection;
-    }
-
     // Implementation methods
     // -------------------------------------------------------------------------
-    protected XMPPConnection createConnection() throws XMPPException {
-        XMPPConnection connection;
-        if (port > 0) {            
-            if (getServiceName() == null) {
-                connection = new XMPPConnection(new ConnectionConfiguration(host, port));
-            } else {
-                connection = new XMPPConnection(new ConnectionConfiguration(host, port, getServiceName()));
-            }
-        } else {
-            connection = new XMPPConnection(host);
-        }
-
-        connection.connect();
-
-        if (login && !connection.isAuthenticated()) {
-            if (user != null) {
-                LOG.info("Logging in to XMPP as user: " + user + " on connection: " + connection);
-                if (password == null) {
-                    LOG.warn("No password configured for user: " + user);
-                }
-
-                if (createAccount) {
-                    AccountManager accountManager = new AccountManager(connection);
-                    accountManager.createAccount(user, password);
-                }
-                if (resource != null) {
-                    connection.login(user, password, resource);
-                } else {
-                    connection.login(user, password);
-                }
-            } else {
-                LOG.info("Logging in anonymously to XMPP on connection: " + connection);
-                connection.loginAnonymously();
-            }
-
-            // presence is not needed to be sent after login
-        }
-        return connection;
-    }
-
-    /*
-     * If there is no "@" symbol in the room, find the chat service JID and
-     * return fully qualified JID for the room as room@conference.server.domain
-     */
-    public String resolveRoom() throws XMPPException, CamelException {
-        if (room == null) {
-            throw new IllegalArgumentException("room is not specified");
-        }
 
-        if (room.indexOf('@', 0) != -1) {
-            return room;
-        }
-
-        XMPPConnection conn = getConnection();
-        Iterator<String> iterator = MultiUserChat.getServiceNames(conn).iterator();
-        if (!iterator.hasNext()) {
-            throw new CamelException("Can not find Multi User Chat service");
-        }
-        String chatServer = iterator.next();
-        if (LOG.isInfoEnabled()) {
-            LOG.info("Detected chat server: " + chatServer);
-        }
-
-        return room + "@" + chatServer;
-    }
-
-    public boolean isSingleton() {
-        return true;
-    }
 }

Modified: camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppExchange.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppExchange.java?rev=756358&r1=756357&r2=756358&view=diff
==============================================================================
--- camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppExchange.java (original)
+++ camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppExchange.java Fri Mar 20 07:35:41 2009
@@ -20,7 +20,6 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.impl.DefaultExchange;
-
 import org.jivesoftware.smack.packet.Message;
 
 /**

Modified: camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppGroupChatProducer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppGroupChatProducer.java?rev=756358&r1=756357&r2=756358&view=diff
==============================================================================
--- camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppGroupChatProducer.java (original)
+++ camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppGroupChatProducer.java Fri Mar 20 07:35:41 2009
@@ -16,12 +16,13 @@
  */
 package org.apache.camel.component.xmpp;
 
-import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.impl.DefaultProducer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jivesoftware.smack.SmackConfiguration;
+import org.jivesoftware.smack.XMPPConnection;
 import org.jivesoftware.smack.XMPPException;
 import org.jivesoftware.smack.packet.Message;
 import org.jivesoftware.smackx.muc.DiscussionHistory;
@@ -33,66 +34,86 @@
 public class XmppGroupChatProducer extends DefaultProducer {
     private static final transient Log LOG = LogFactory.getLog(XmppGroupChatProducer.class);
     private final XmppEndpoint endpoint;
-    private final String room;
+    private XMPPConnection connection;
     private MultiUserChat chat;
+    private String room;
 
-    public XmppGroupChatProducer(XmppEndpoint endpoint) throws XMPPException, CamelException {
+    public XmppGroupChatProducer(XmppEndpoint endpoint) throws XMPPException {
         super(endpoint);
         this.endpoint = endpoint;
-        this.room = endpoint.resolveRoom();
-        if (room == null) {
-            throw new IllegalArgumentException("No room property specified");
-        }
     }
 
     public void process(Exchange exchange) {
-        // TODO it would be nice if we could reuse the message from the exchange
         Message message = chat.createMessage();
         message.setTo(room);
         message.setFrom(endpoint.getUser());
 
         endpoint.getBinding().populateXmppMessage(message, exchange);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Sending XMPP message: " + message.getBody());
-        }
         try {
+            // make sure we are connected
+            if (!connection.isConnected()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Reconnecting to: " + XmppEndpoint.getConnectionMessage(connection));
+                }
+                connection.connect();
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Sending XMPP message: " + message.getBody());
+            }
             chat.sendMessage(message);
+            // must invoke nextMessage to consume the response from the server
+            // otherwise the client local queue will fill up (CAMEL-1467)
+            chat.nextMessage();
         } catch (XMPPException e) {
-            throw new RuntimeXmppException(e);
+            throw new RuntimeExchangeException("Cannot send XMPP message: " + message, exchange, e);
         }
     }
 
     @Override
     protected void doStart() throws Exception {
+        if (connection == null) {
+            connection = endpoint.createConnection();
+        }
+
         if (chat == null) {
-            chat = new MultiUserChat(endpoint.getConnection(), room);
+            room = endpoint.resolveRoom(connection);
+            chat = new MultiUserChat(connection, room);
             DiscussionHistory history = new DiscussionHistory();
             history.setMaxChars(0); // we do not want any historical messages
-            chat.join(this.endpoint.getNickname(), null, history, SmackConfiguration.getPacketReplyTimeout());
+            chat.join(endpoint.getNickname(), null, history, SmackConfiguration.getPacketReplyTimeout());
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Joined room: " + room + " as: " + endpoint.getNickname());
+            }
         }
+
         super.doStart();
     }
 
     @Override
     protected void doStop() throws Exception {
         if (chat != null) {
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Leaving room: " + room);
+            }
             chat.leave();
             chat = null;
         }
+        if (connection != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Disconnecting from: " + XmppEndpoint.getConnectionMessage(connection));
+            }
+            connection.disconnect();
+            connection = null;
+        }
         super.doStop();
     }
 
     // Properties
     // -------------------------------------------------------------------------
-    public MultiUserChat getChat() {
-        return chat;
-    }
-
-    public void setChat(MultiUserChat chat) {
-        this.chat = chat;
-    }
 
     public String getRoom() {
         return room;
     }
+
 }

Modified: camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java?rev=756358&r1=756357&r2=756358&view=diff
==============================================================================
--- camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java (original)
+++ camel/branches/camel-1.x/components/camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppPrivateChatProducer.java Fri Mar 20 07:35:41 2009
@@ -17,12 +17,15 @@
 package org.apache.camel.component.xmpp;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jivesoftware.smack.Chat;
 import org.jivesoftware.smack.ChatManager;
 import org.jivesoftware.smack.MessageListener;
+import org.jivesoftware.smack.XMPPConnection;
 import org.jivesoftware.smack.XMPPException;
 import org.jivesoftware.smack.packet.Message;
 
@@ -32,64 +35,80 @@
 public class XmppPrivateChatProducer extends DefaultProducer {
     private static final transient Log LOG = LogFactory.getLog(XmppPrivateChatProducer.class);
     private final XmppEndpoint endpoint;
+    private XMPPConnection connection;
     private final String participant;
 
-
     public XmppPrivateChatProducer(XmppEndpoint endpoint, String participant) {
         super(endpoint);
         this.endpoint = endpoint;
         this.participant = participant;
-        if (participant == null) {
-            throw new IllegalArgumentException("No participant property specified");
-        }
+        ObjectHelper.notEmpty(participant, "participant");
     }
 
     public void process(Exchange exchange) {
         String threadId = exchange.getExchangeId();
 
         try {
-            ChatManager chatManager = endpoint.getConnection().getChatManager();
-            Chat chat = chatManager.getThreadChat(threadId);
+            if (connection == null) {
+                connection = endpoint.createConnection();
+            }
 
-            if (chat == null) {
-                chat = chatManager.createChat(getParticipant(), threadId, new MessageListener() {
-                    public void processMessage(Chat chat, Message message) {
-                        // not here to do conversation
-                    }
-                });
+            // make sure we are connected
+            if (!connection.isConnected()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Reconnecting to: " + XmppEndpoint.getConnectionMessage(connection));
+                }
+                connection.connect();
             }
+        } catch (XMPPException e) {
+            throw new RuntimeExchangeException("Cannot connect to: "
+                    + XmppEndpoint.getConnectionMessage(connection), exchange, e);
+        }
 
-            // TODO it would be nice if we could reuse the message from the exchange
-            Message message = new Message();
+        ChatManager chatManager = connection.getChatManager();
+        Chat chat = chatManager.getThreadChat(threadId);
+        if (chat == null) {
+            chat = chatManager.createChat(getParticipant(), threadId, new MessageListener() {
+                public void processMessage(Chat chat, Message message) {
+                    // not here to do conversation
+                }
+            });
+        }
+
+        Message message = null;
+        try {
+            message = new Message();
             message.setTo(participant);
             message.setThread(threadId);
             message.setType(Message.Type.normal);
 
             endpoint.getBinding().populateXmppMessage(message, exchange);
+
             if (LOG.isDebugEnabled()) {
-                LOG.debug(">>>> message: " + message.getBody());
+                LOG.debug("Sending XMPP message: " + message.getBody());
             }
-
             chat.sendMessage(message);
         } catch (XMPPException e) {
-            throw new RuntimeXmppException(e);
+            throw new RuntimeExchangeException("Cannot send XMPP message: " + message
+                    + " to: " + XmppEndpoint.getConnectionMessage(connection), exchange, e);
         }
     }
 
     @Override
-    protected void doStart() throws Exception {
-        super.doStart();
-    }
-
-    @Override
     protected void doStop() throws Exception {
+        if (connection != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Disconnecting from: " + XmppEndpoint.getConnectionMessage(connection));
+            }
+            connection.disconnect();
+            connection = null;
+        }
         super.doStop();
     }
 
     // Properties
     // -------------------------------------------------------------------------
 
-
     public String getParticipant() {
         return participant;
     }

Propchange: camel/branches/camel-1.x/components/camel-xmpp/src/test/resources/
------------------------------------------------------------------------------
    svn:mergeinfo =