[camel] branch master updated (908e51b -> 3a82e8f)

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

[camel] branch master updated (908e51b -> 3a82e8f)

davsclaus-2
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 908e51b  Upgrade groovy and logger
     new 722298e  Increase configuration options for Pulsar
     new ddbf1ea  Increase configuration options for Pulsar
     new 3978e75  Fix checkstyle issues
     new 3a82e8f  Fix merge conflicts

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/main/docs/pulsar-component.adoc            |  11 +-
 .../camel/component/pulsar/PulsarProducer.java     |  17 +-
 .../pulsar/configuration/PulsarConfiguration.java  | 146 ++++++++++++
 .../pulsar/PulsarConsumerAcknowledgementTest.java  |   2 +-
 .../PulsarConsumerNoAcknowledgementTest.java       |   2 +-
 .../pulsar/PulsarCustomMessageReceiptTest.java     |  17 +-
 .../pulsar/PulsarNegativeAcknowledgementTest.java  |   4 +-
 .../endpoint/dsl/PulsarEndpointBuilderFactory.java | 260 +++++++++++++++++++++
 8 files changed, 444 insertions(+), 15 deletions(-)

Reply | Threaded
Open this post in threaded view
|

[camel] 01/04: Increase configuration options for Pulsar

davsclaus-2
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 722298e85027de6a0e64d6d731653bc0f32b3f9c
Author: Sherman Richard <[hidden email]>
AuthorDate: Tue Aug 13 13:33:10 2019 +0100

    Increase configuration options for Pulsar
---
 .../camel/component/pulsar/PulsarProducer.java     |  17 ++-
 .../pulsar/configuration/PulsarConfiguration.java  | 144 +++++++++++++++++++++
 2 files changed, 159 insertions(+), 2 deletions(-)

diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 70f1d40..140f0c7 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -20,11 +20,14 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.TypeConversionException;
+import org.apache.camel.component.pulsar.configuration.PulsarConfiguration;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
 import org.apache.camel.support.DefaultProducer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 
+import java.util.concurrent.TimeUnit;
+
 public class PulsarProducer extends DefaultProducer {
 
     private final PulsarEndpoint pulsarEndpoint;
@@ -52,7 +55,8 @@ public class PulsarProducer extends DefaultProducer {
     private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException {
         if (producer == null) {
             final String topicUri = pulsarEndpoint.getUri();
-            String producerName = pulsarEndpoint.getPulsarConfiguration().getProducerName();
+            PulsarConfiguration configuration = pulsarEndpoint.getPulsarConfiguration();
+            String producerName = configuration.getProducerName();
             if (producerName == null) {
                 producerName = topicUri + "-" + Thread.currentThread().getId();
             }
@@ -60,7 +64,16 @@ public class PulsarProducer extends DefaultProducer {
                     .getPulsarClient()
                     .newProducer()
                     .producerName(producerName)
-                    .topic(topicUri);
+                    .topic(topicUri)
+                    .sendTimeout(configuration.getSendTimeoutMs(), TimeUnit.MILLISECONDS)
+                    .blockIfQueueFull(configuration.isBlockIfQueueFull())
+                    .maxPendingMessages(configuration.getMaxPendingMessages())
+                    .maxPendingMessagesAcrossPartitions(configuration.getMaxPendingMessagesAcrossPartitions())
+                    .batchingMaxPublishDelay(configuration.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS)
+                    .batchingMaxMessages(configuration.getMaxPendingMessages())
+                    .enableBatching(configuration.isBatchingEnabled())
+                    .initialSequenceId(configuration.getInitialSequenceId())
+                    .compressionType(configuration.getCompressionType());
             producer = producerBuilder.create();
         }
     }
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index 6b160fd..b2099d3 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -19,6 +19,9 @@ package org.apache.camel.component.pulsar.configuration;
 import org.apache.camel.component.pulsar.utils.consumers.SubscriptionType;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
+import org.apache.pulsar.client.api.CompressionType;
+
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.camel.component.pulsar.utils.consumers.SubscriptionType.EXCLUSIVE;
 
@@ -45,6 +48,24 @@ public class PulsarConfiguration {
     private long ackTimeoutMillis = 10000;
     @UriParam(label = "consumer", defaultValue = "100")
     private long ackGroupTimeMillis = 100;
+    @UriParam(label = "producer", description = "Send timeout in milliseconds", defaultValue = "30000")
+    private int sendTimeoutMs = 30000;
+    @UriParam(label = "producer", description = "Whether to block the producing thread if pending messages queue is full or to throw a ProducerQueueIsFullError", defaultValue = "false")
+    private boolean blockIfQueueFull = false;
+    @UriParam(label = "producer", description = "Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true", defaultValue = "1000")
+    private int maxPendingMessages = 1000;
+    @UriParam(label = "producer", description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if (number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000")
+    private int maxPendingMessagesAcrossPartitions = 50000;
+    @UriParam(label = "producer", description = "The maximum time period within which the messages sent will be batched if batchingEnabled is true.", defaultValue = "1000")
+    private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);
+    @UriParam(label = "producer", description = "The maximum size to batch messages.", defaultValue = "1000")
+    private int batchingMaxMessages = 1000;
+    @UriParam(label = "producer", description = "Control whether automatic batching of messages is enabled for the producer.", defaultValue = "true")
+    private boolean batchingEnabled = true;
+    @UriParam(label = "producer", description = "The first message published will have a sequence Id of initialSequenceId  1.", defaultValue = "-1")
+    private long initialSequenceId = -1;
+    @UriParam(label = "producer", description = "Compression type to use, defaults to NONE from [NONE, LZ4, ZLIB]", defaultValue = "NONE")
+    private CompressionType compressionType = CompressionType.NONE;
 
     public String getSubscriptionName() {
         return subscriptionName;
@@ -159,4 +180,127 @@ public class PulsarConfiguration {
     public void setAckGroupTimeMillis(long ackGroupTimeMillis) {
         this.ackGroupTimeMillis = ackGroupTimeMillis;
     }
+
+    /**
+      * Send timeout in milliseconds.
+      * Defaults to 30,000ms (30 seconds)
+     */
+    public void setSendTimeoutMs(int sendTimeoutMs) {
+        this.sendTimeoutMs = sendTimeoutMs;
+    }
+
+    public int getSendTimeoutMs() {
+        return sendTimeoutMs;
+    }
+
+    /**
+     * Set whether the send and asyncSend operations should block when the outgoing message queue is full.
+     * If set to false, send operations will immediately fail with ProducerQueueIsFullError when there is no space left
+     * in the pending queue.
+     * Default is false.
+     */
+    public void setBlockIfQueueFull(boolean blockIfQueueFull) {
+        this.blockIfQueueFull = blockIfQueueFull;
+    }
+
+    public boolean isBlockIfQueueFull() {
+        return blockIfQueueFull;
+    }
+
+    /**
+     * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
+     * Default is 1000.
+     */
+    public void setMaxPendingMessages(int maxPendingMessages) {
+        this.maxPendingMessages = maxPendingMessages;
+    }
+
+    public int getMaxPendingMessages() {
+        return maxPendingMessages;
+    }
+
+    /**
+     * Set the number of max pending messages across all the partitions.
+     * Default is 50000.
+     */
+    public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
+        this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
+    }
+
+    public int getMaxPendingMessagesAcrossPartitions() {
+        return maxPendingMessagesAcrossPartitions;
+    }
+
+    /**
+     * Set the time period within which the messages sent will be batched if batch messages are
+     * enabled. If set to a non zero value, messages will be queued until either:
+     * <ul>
+     *  <li>this time interval expires</li>
+     *  <li>the max number of messages in a batch is reached</li>
+     * </ul>
+     * Default is 1ms.
+     */
+    public void setBatchingMaxPublishDelayMicros(long batchingMaxPublishDelayMicros) {
+        this.batchingMaxPublishDelayMicros = batchingMaxPublishDelayMicros;
+    }
+
+    public long getBatchingMaxPublishDelayMicros() {
+        return batchingMaxPublishDelayMicros;
+    }
+
+    /**
+     * Set the maximum number of messages permitted in a batch.
+     * Default 1,000.
+     */
+    public void setBatchingMaxMessages(int batchingMaxMessages) {
+        this.batchingMaxMessages = batchingMaxMessages;
+    }
+
+    public int getBatchingMaxMessages() {
+        return batchingMaxMessages;
+    }
+
+    /**
+     * Control whether automatic batching of messages is enabled for the producer.
+     * Default is true.
+     */
+    public void setBatchingEnabled(boolean batchingEnabled) {
+        this.batchingEnabled = batchingEnabled;
+    }
+
+    public boolean isBatchingEnabled() {
+        return batchingEnabled;
+    }
+
+    /**
+     * Set the baseline for the sequence ids for messages published by the producer.
+     * First message will be using (initialSequenceId + 1) as its sequence id and subsequent messages will be assigned
+     * incremental sequence ids, if not otherwise specified.
+     */
+    public void setInitialSequenceId(long initialSequenceId) {
+        this.initialSequenceId = initialSequenceId;
+    }
+
+    public long getInitialSequenceId() {
+        return initialSequenceId;
+    }
+
+    /**
+     *
+     * Set the compression type for the producer.
+     * Supported compression types are:
+     * <ul>
+     *  <li>NONE: No compression</li>
+     *  <li>LZ4: Compress with LZ4 algorithm. Faster but lower compression than ZLib</li>
+     *  <li>ZLIB: Standard ZLib compression</li>
+     * </ul>
+     * Default is NONE
+     */
+    public void setCompressionType(String compressionType) {
+        this.compressionType = CompressionType.valueOf(compressionType.toUpperCase());
+    }
+
+    public CompressionType getCompressionType() {
+        return compressionType;
+    }
 }

Reply | Threaded
Open this post in threaded view
|

[camel] 02/04: Increase configuration options for Pulsar

davsclaus-2
In reply to this post by davsclaus-2
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit ddbf1ea31c535b529b9e5045a76f577e0d5d1b33
Author: Sherman Richard <[hidden email]>
AuthorDate: Tue Aug 13 13:57:09 2019 +0100

    Increase configuration options for Pulsar
---
 .../src/main/docs/pulsar-component.adoc            |  11 +-
 .../endpoint/dsl/PulsarEndpointBuilderFactory.java | 260 +++++++++++++++++++++
 2 files changed, 270 insertions(+), 1 deletion(-)

diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index 89141bb..7b929f9 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -69,7 +69,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (16 parameters):
+=== Query Parameters (25 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -87,8 +87,17 @@ with the following path and query parameters:
 | *subscriptionType* (consumer) | Type of the subscription EXCLUSIVESHAREDFAILOVER, defaults to EXCLUSIVE | EXCLUSIVE | SubscriptionType
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
+| *batchingEnabled* (producer) | Control whether automatic batching of messages is enabled for the producer. Default is true. | true | boolean
+| *batchingMaxMessages* (producer) | Set the maximum number of messages permitted in a batch. Default 1,000. | 1000 | int
+| *batchingMaxPublishDelay Micros* (producer) | Set the time period within which the messages sent will be batched if batch messages are enabled. If set to a non zero value, messages will be queued until either: this time interval expires the max number of messages in a batch is reached Default is 1ms. | 1000 | long
+| *blockIfQueueFull* (producer) | Set whether the send and asyncSend operations should block when the outgoing message queue is full. If set to false, send operations will immediately fail with ProducerQueueIsFullError when there is no space left in the pending queue. Default is false. | false | boolean
+| *compressionType* (producer) | Set the compression type for the producer. Supported compression types are: NONE: No compression LZ4: Compress with LZ4 algorithm. Faster but lower compression than ZLib ZLIB: Standard ZLib compression Default is NONE | NONE | CompressionType
+| *initialSequenceId* (producer) | Set the baseline for the sequence ids for messages published by the producer. First message will be using (initialSequenceId + 1) as its sequence id and subsequent messages will be assigned incremental sequence ids, if not otherwise specified. | -1 | long
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
+| *maxPendingMessages* (producer) | Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. Default is 1000. | 1000 | int
+| *maxPendingMessagesAcross Partitions* (producer) | Set the number of max pending messages across all the partitions. Default is 50000. | 50000 | int
 | *producerName* (producer) | Name of the producer | default-producer | String
+| *sendTimeoutMs* (producer) | Send timeout in milliseconds. Defaults to 30,000ms (30 seconds) | 30000 | int
 | *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
 | *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
 |===
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
index 82f0232..75fdbf3 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/PulsarEndpointBuilderFactory.java
@@ -407,6 +407,180 @@ public interface PulsarEndpointBuilderFactory {
             return (AdvancedPulsarEndpointProducerBuilder) this;
         }
         /**
+         * Control whether automatic batching of messages is enabled for the
+         * producer. Default is true.
+         *
+         * The option is a: <code>boolean</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder batchingEnabled(
+                boolean batchingEnabled) {
+            setProperty("batchingEnabled", batchingEnabled);
+            return this;
+        }
+        /**
+         * Control whether automatic batching of messages is enabled for the
+         * producer. Default is true.
+         *
+         * The option will be converted to a <code>boolean</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder batchingEnabled(
+                String batchingEnabled) {
+            setProperty("batchingEnabled", batchingEnabled);
+            return this;
+        }
+        /**
+         * Set the maximum number of messages permitted in a batch. Default
+         * 1,000.
+         *
+         * The option is a: <code>int</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder batchingMaxMessages(
+                int batchingMaxMessages) {
+            setProperty("batchingMaxMessages", batchingMaxMessages);
+            return this;
+        }
+        /**
+         * Set the maximum number of messages permitted in a batch. Default
+         * 1,000.
+         *
+         * The option will be converted to a <code>int</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder batchingMaxMessages(
+                String batchingMaxMessages) {
+            setProperty("batchingMaxMessages", batchingMaxMessages);
+            return this;
+        }
+        /**
+         * Set the time period within which the messages sent will be batched if
+         * batch messages are enabled. If set to a non zero value, messages will
+         * be queued until either: this time interval expires the max number of
+         * messages in a batch is reached Default is 1ms.
+         *
+         * The option is a: <code>long</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder batchingMaxPublishDelayMicros(
+                long batchingMaxPublishDelayMicros) {
+            setProperty("batchingMaxPublishDelayMicros", batchingMaxPublishDelayMicros);
+            return this;
+        }
+        /**
+         * Set the time period within which the messages sent will be batched if
+         * batch messages are enabled. If set to a non zero value, messages will
+         * be queued until either: this time interval expires the max number of
+         * messages in a batch is reached Default is 1ms.
+         *
+         * The option will be converted to a <code>long</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder batchingMaxPublishDelayMicros(
+                String batchingMaxPublishDelayMicros) {
+            setProperty("batchingMaxPublishDelayMicros", batchingMaxPublishDelayMicros);
+            return this;
+        }
+        /**
+         * Set whether the send and asyncSend operations should block when the
+         * outgoing message queue is full. If set to false, send operations will
+         * immediately fail with ProducerQueueIsFullError when there is no space
+         * left in the pending queue. Default is false.
+         *
+         * The option is a: <code>boolean</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder blockIfQueueFull(
+                boolean blockIfQueueFull) {
+            setProperty("blockIfQueueFull", blockIfQueueFull);
+            return this;
+        }
+        /**
+         * Set whether the send and asyncSend operations should block when the
+         * outgoing message queue is full. If set to false, send operations will
+         * immediately fail with ProducerQueueIsFullError when there is no space
+         * left in the pending queue. Default is false.
+         *
+         * The option will be converted to a <code>boolean</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder blockIfQueueFull(
+                String blockIfQueueFull) {
+            setProperty("blockIfQueueFull", blockIfQueueFull);
+            return this;
+        }
+        /**
+         * Set the compression type for the producer. Supported compression
+         * types are: NONE: No compression LZ4: Compress with LZ4 algorithm.
+         * Faster but lower compression than ZLib ZLIB: Standard ZLib compression
+         * Default is NONE.
+         *
+         * The option is a:
+         * <code>org.apache.pulsar.client.api.CompressionType</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder compressionType(
+                CompressionType compressionType) {
+            setProperty("compressionType", compressionType);
+            return this;
+        }
+        /**
+         * Set the compression type for the producer. Supported compression
+         * types are: NONE: No compression LZ4: Compress with LZ4 algorithm.
+         * Faster but lower compression than ZLib ZLIB: Standard ZLib compression
+         * Default is NONE.
+         *
+         * The option will be converted to a
+         * <code>org.apache.pulsar.client.api.CompressionType</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder compressionType(
+                String compressionType) {
+            setProperty("compressionType", compressionType);
+            return this;
+        }
+        /**
+         * Set the baseline for the sequence ids for messages published by the
+         * producer. First message will be using (initialSequenceId + 1) as its
+         * sequence id and subsequent messages will be assigned incremental
+         * sequence ids, if not otherwise specified.
+         *
+         * The option is a: <code>long</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder initialSequenceId(
+                long initialSequenceId) {
+            setProperty("initialSequenceId", initialSequenceId);
+            return this;
+        }
+        /**
+         * Set the baseline for the sequence ids for messages published by the
+         * producer. First message will be using (initialSequenceId + 1) as its
+         * sequence id and subsequent messages will be assigned incremental
+         * sequence ids, if not otherwise specified.
+         *
+         * The option will be converted to a <code>long</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder initialSequenceId(
+                String initialSequenceId) {
+            setProperty("initialSequenceId", initialSequenceId);
+            return this;
+        }
+        /**
          * Whether the producer should be started lazy (on the first message).
          * By starting lazy you can use this to allow CamelContext and routes to
          * startup in situations where a producer may otherwise fail during
@@ -447,6 +621,58 @@ public interface PulsarEndpointBuilderFactory {
             return this;
         }
         /**
+         * Set the max size of the queue holding the messages pending to receive
+         * an acknowledgment from the broker. Default is 1000.
+         *
+         * The option is a: <code>int</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder maxPendingMessages(
+                int maxPendingMessages) {
+            setProperty("maxPendingMessages", maxPendingMessages);
+            return this;
+        }
+        /**
+         * Set the max size of the queue holding the messages pending to receive
+         * an acknowledgment from the broker. Default is 1000.
+         *
+         * The option will be converted to a <code>int</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder maxPendingMessages(
+                String maxPendingMessages) {
+            setProperty("maxPendingMessages", maxPendingMessages);
+            return this;
+        }
+        /**
+         * Set the number of max pending messages across all the partitions.
+         * Default is 50000.
+         *
+         * The option is a: <code>int</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder maxPendingMessagesAcrossPartitions(
+                int maxPendingMessagesAcrossPartitions) {
+            setProperty("maxPendingMessagesAcrossPartitions", maxPendingMessagesAcrossPartitions);
+            return this;
+        }
+        /**
+         * Set the number of max pending messages across all the partitions.
+         * Default is 50000.
+         *
+         * The option will be converted to a <code>int</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder maxPendingMessagesAcrossPartitions(
+                String maxPendingMessagesAcrossPartitions) {
+            setProperty("maxPendingMessagesAcrossPartitions", maxPendingMessagesAcrossPartitions);
+            return this;
+        }
+        /**
          * Name of the producer.
          *
          * The option is a: <code>java.lang.String</code> type.
@@ -457,6 +683,28 @@ public interface PulsarEndpointBuilderFactory {
             setProperty("producerName", producerName);
             return this;
         }
+        /**
+         * Send timeout in milliseconds. Defaults to 30,000ms (30 seconds).
+         *
+         * The option is a: <code>int</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder sendTimeoutMs(int sendTimeoutMs) {
+            setProperty("sendTimeoutMs", sendTimeoutMs);
+            return this;
+        }
+        /**
+         * Send timeout in milliseconds. Defaults to 30,000ms (30 seconds).
+         *
+         * The option will be converted to a <code>int</code> type.
+         *
+         * Group: producer
+         */
+        default PulsarEndpointProducerBuilder sendTimeoutMs(String sendTimeoutMs) {
+            setProperty("sendTimeoutMs", sendTimeoutMs);
+            return this;
+        }
     }
 
     /**
@@ -603,6 +851,18 @@ public interface PulsarEndpointBuilderFactory {
         SHARED,
         FAILOVER;
     }
+
+    /**
+     * Proxy enum for <code>org.apache.pulsar.client.api.CompressionType</code>
+     * enum.
+     */
+    enum CompressionType {
+        NONE,
+        LZ4,
+        ZLIB,
+        ZSTD,
+        SNAPPY;
+    }
     /**
      * Apache Pulsar (camel-pulsar)
      * Camel Apache Pulsar Component

Reply | Threaded
Open this post in threaded view
|

[camel] 03/04: Fix checkstyle issues

davsclaus-2
In reply to this post by davsclaus-2
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3978e75b453c5c9eb5c0f2366d0759f9279647eb
Author: Sherman Richard <[hidden email]>
AuthorDate: Tue Aug 13 15:25:19 2019 +0100

    Fix checkstyle issues
---
 .../org/apache/camel/component/pulsar/PulsarProducer.java    |  4 ++--
 .../component/pulsar/configuration/PulsarConfiguration.java  | 12 +++++++-----
 2 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index 140f0c7..2b4eaf3 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.pulsar;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
@@ -26,8 +28,6 @@ import org.apache.camel.support.DefaultProducer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 
-import java.util.concurrent.TimeUnit;
-
 public class PulsarProducer extends DefaultProducer {
 
     private final PulsarEndpoint pulsarEndpoint;
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
index b2099d3..388460c 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/configuration/PulsarConfiguration.java
@@ -16,13 +16,13 @@
  */
 package org.apache.camel.component.pulsar.configuration;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.component.pulsar.utils.consumers.SubscriptionType;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.pulsar.client.api.CompressionType;
 
-import java.util.concurrent.TimeUnit;
-
 import static org.apache.camel.component.pulsar.utils.consumers.SubscriptionType.EXCLUSIVE;
 
 @UriParams
@@ -51,10 +51,12 @@ public class PulsarConfiguration {
     @UriParam(label = "producer", description = "Send timeout in milliseconds", defaultValue = "30000")
     private int sendTimeoutMs = 30000;
     @UriParam(label = "producer", description = "Whether to block the producing thread if pending messages queue is full or to throw a ProducerQueueIsFullError", defaultValue = "false")
-    private boolean blockIfQueueFull = false;
-    @UriParam(label = "producer", description = "Size of the pending messages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true", defaultValue = "1000")
+    private boolean blockIfQueueFull;
+    @UriParam(label = "producer", description = "Size of the pending messages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true",
+            defaultValue = "1000")
     private int maxPendingMessages = 1000;
-    @UriParam(label = "producer", description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if (number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000")
+    @UriParam(label = "producer", description = "The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if "
+            + "(number of partitions * maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition.", defaultValue = "50000")
     private int maxPendingMessagesAcrossPartitions = 50000;
     @UriParam(label = "producer", description = "The maximum time period within which the messages sent will be batched if batchingEnabled is true.", defaultValue = "1000")
     private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(1);

Reply | Threaded
Open this post in threaded view
|

[camel] 04/04: Fix merge conflicts

davsclaus-2
In reply to this post by davsclaus-2
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3a82e8f6fea7b3ba1d6009524626a2fac78561b2
Author: Claus Ibsen <[hidden email]>
AuthorDate: Wed Aug 14 15:20:00 2019 +0200

    Fix merge conflicts
---
 .../pulsar/PulsarConsumerAcknowledgementTest.java       |  2 +-
 .../pulsar/PulsarConsumerNoAcknowledgementTest.java     |  2 +-
 .../pulsar/PulsarCustomMessageReceiptTest.java          | 17 +++++++++--------
 .../pulsar/PulsarNegativeAcknowledgementTest.java       |  4 ++--
 4 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
index 5f45684..0ce2dcc 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerAcknowledgementTest.java
@@ -53,7 +53,7 @@ public class PulsarConsumerAcknowledgementTest extends PulsarTestSupport {
     @EndpointInject(uri = "mock:result")
     private MockEndpoint to;
 
-    Producer<String> producer;
+    private Producer<String> producer;
 
     @Before
     public void setup() throws Exception {
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
index 6f68fc1..1cb5527 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarConsumerNoAcknowledgementTest.java
@@ -17,7 +17,7 @@
 package org.apache.camel.component.pulsar;
 
 import java.util.concurrent.TimeUnit;
-import org.apache.camel.CamelContext;
+
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
index e579b59..24a9132 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarCustomMessageReceiptTest.java
@@ -16,14 +16,8 @@
  */
 package org.apache.camel.component.pulsar;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
 import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
@@ -41,6 +35,13 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
 public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(PulsarCustomMessageReceiptTest.class);
@@ -63,7 +64,7 @@ public class PulsarCustomMessageReceiptTest extends PulsarTestSupport {
     @EndpointInject(uri = "mock:result")
     private MockEndpoint to;
 
-    Producer<String> producer;
+    private Producer<String> producer;
 
     @Before
     public void setup() throws Exception {
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java
index ebee09e..abb76a0 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarNegativeAcknowledgementTest.java
@@ -16,12 +16,12 @@
  */
 package org.apache.camel.component.pulsar;
 
-import static org.mockito.Mockito.mock;
-
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.junit.Test;
 
+import static org.mockito.Mockito.mock;
+
 public class PulsarNegativeAcknowledgementTest {
 
     @Test(expected = UnsupportedOperationException.class)