[camel] branch master updated: CAMEL-14184: Allow setting Pulsar Message properties, event_time, key fields (#3340)

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[camel] branch master updated: CAMEL-14184: Allow setting Pulsar Message properties, event_time, key fields (#3340)

davsclaus-2
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new e8e3b57  CAMEL-14184: Allow setting Pulsar Message properties, event_time, key fields (#3340)
e8e3b57 is described below

commit e8e3b57b4d51f4550e23e8d19f31c25697ee205c
Author: William Thompson <[hidden email]>
AuthorDate: Thu Nov 14 23:55:49 2019 -0500

    CAMEL-14184: Allow setting Pulsar Message properties, event_time, key fields (#3340)
   
    * Configurable Pulsar Message through Camel Exchange headers
   
    * Fix docs typos
   
    * Add missing license text
   
    * Remove unused import
---
 .../src/main/docs/pulsar-component.adoc            |  31 +++++
 .../camel/component/pulsar/PulsarProducer.java     |  28 ++++-
 .../pulsar/utils/message/PulsarMessageHeaders.java |   3 +
 .../pulsar/PulsarProducerHeadersInTest.java        | 127 +++++++++++++++++++++
 4 files changed, 187 insertions(+), 2 deletions(-)

diff --git a/components/camel-pulsar/src/main/docs/pulsar-component.adoc b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
index e763f6b..01b5bee 100644
--- a/components/camel-pulsar/src/main/docs/pulsar-component.adoc
+++ b/components/camel-pulsar/src/main/docs/pulsar-component.adoc
@@ -147,3 +147,34 @@ The component supports 8 options, which are listed below.
 | *camel.component.pulsar.pulsar-message-receipt-factory* | Provide a factory to create an alternate implementation of PulsarMessageReceipt. The option is a org.apache.camel.component.pulsar.PulsarMessageReceiptFactory type. |  | String
 |===
 // spring-boot-auto-configure options: END
+
+// message-headers options: START
+=== Message headers evaluated by the Pulsar producer
+
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|===
+| Header | Type | Description
+| `CamelPulsarProducerMessageKey` | `String` | Sets the key on the message for the Pulsar routing policy
+| `CamelPulsarProducerMessageProperties` | `Map<String,String>` | The properties to set on the Pulsar message
+| `CamelPulsarProducerMessageEventTime` | `long` | Sets the event time on the message
+|===
+
+=== Message headers set by the Pulsar consumer
+
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|===
+| Header | Type | Description
+| `properties` | `Map<String,String>` | The properties from the Pulsar message or the empty Map if unset on the Pulsar message
+| `producer_name` | `String` | The name of the producer that created the message
+| `sequence_id` | `long` | Sequence identifier of the Pulsar message
+| `publish_time` | `long` | Time the Pulsar message was published to the topic
+| `message_id` | `MessageId` | Unique identifier of the message
+| `event_time` | `long` | The event time associated with the message or 0 if unset on the Pulsar message
+| `key` | `String` | The key of the Pulsar message in String form or the empty string if unset on the Pulsar message
+| `key_bytes` | `byte[]` | The bytes in the key. If the key has been base64 encoded, it is decoded before being returned. Otherwise, if the key is a plain string, the UTF-8 encoded bytes of the string are returned.
+| `topic_name` | `String` | The topic to which the message was published
+| `message_receipt` | `PulsarMessageReceipt` | If allowManualAcknowledgement is set, this will contain the object for manually acknowledging the Pulsar message; otherwise it is unset
+|===
+// message-headers options: END
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
index efc78ba..4db08ba 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarProducer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.pulsar;
 
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Exchange;
@@ -23,11 +24,14 @@ import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.TypeConversionException;
 import org.apache.camel.component.pulsar.configuration.PulsarConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
 import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
 import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 
 public class PulsarProducer extends DefaultProducer {
 
@@ -42,14 +46,34 @@ public class PulsarProducer extends DefaultProducer {
     @Override
     public void process(final Exchange exchange) throws Exception {
         final Message message = exchange.getIn();
+
+        TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
         byte[] body;
         try {
-            body = exchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, exchange, message.getBody());
+            body = exchange.getContext().getTypeConverter()
+                    .mandatoryConvertTo(byte[].class, exchange, message.getBody());
         } catch (NoTypeConversionAvailableException | TypeConversionException exception) {
             // fallback to try serialize the data
             body = PulsarMessageUtils.serialize(message.getBody());
         }
-        producer.send(body);
+        messageBuilder.value(body);
+
+        String key = exchange.getIn().getHeader(PulsarMessageHeaders.KEY_OUT, String.class);
+        if (ObjectHelper.isNotEmpty(key)) {
+            messageBuilder.key(key);
+        }
+
+        Map<String, String> properties = CastUtils.cast(exchange.getIn().getHeader(PulsarMessageHeaders.PROPERTIES_OUT, Map.class));
+        if (ObjectHelper.isNotEmpty(properties)) {
+            messageBuilder.properties(properties);
+        }
+
+        Long eventTime = exchange.getIn().getHeader(PulsarMessageHeaders.EVENT_TIME_OUT, Long.class);
+        if (eventTime != null) {
+            messageBuilder.eventTime(eventTime);
+        }
+
+        messageBuilder.send();
     }
 
     private synchronized void createProducer() throws org.apache.pulsar.client.api.PulsarClientException {
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
index a983564..978d78b 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/utils/message/PulsarMessageHeaders.java
@@ -28,4 +28,7 @@ public interface PulsarMessageHeaders {
     String KEY_BYTES = "key_bytes";
     String TOPIC_NAME = "topic_name";
     String MESSAGE_RECEIPT = "message_receipt";
+    String KEY_OUT = "CamelPulsarProducerMessageKey";
+    String PROPERTIES_OUT = "CamelPulsarProducerMessageProperties";
+    String EVENT_TIME_OUT = "CamelPulsarProducerMessageEventTime";
 }
diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerHeadersInTest.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerHeadersInTest.java
new file mode 100644
index 0000000..21fd49c
--- /dev/null
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/PulsarProducerHeadersInTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.pulsar;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.component.pulsar.utils.AutoConfiguration;
+import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
+import org.apache.camel.spi.Registry;
+import org.apache.camel.support.SimpleRegistry;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.junit.Test;
+
+public class PulsarProducerHeadersInTest extends PulsarTestSupport {
+
+    private static final String TOPIC_URI = "persistent://public/default/camel-producer-topic";
+    private static final String PRODUCER = "camel-producer";
+
+    @Produce("direct:start")
+    private ProducerTemplate producerTemplate;
+
+    @EndpointInject("pulsar:" + TOPIC_URI
+            + "?numberOfConsumers=1&subscriptionType=Exclusive"
+            + "&subscriptionName=camel-subscription"
+            + "&consumerQueueSize=1"
+            + "&consumerName=camel-consumer"
+            + "&producerName=" + PRODUCER
+    )
+    private Endpoint pulsar;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint mock;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from("direct:start").to(pulsar);
+                from(pulsar).to(mock);
+            }
+        };
+    }
+
+    @Override
+    protected Registry createCamelRegistry() throws Exception {
+        Registry registry = new SimpleRegistry();
+
+        registerPulsarBeans(registry);
+
+        return registry;
+    }
+
+    private void registerPulsarBeans(final Registry registry) throws PulsarClientException {
+        PulsarClient pulsarClient = givenPulsarClient();
+        AutoConfiguration autoConfiguration = new AutoConfiguration(null, null);
+
+        registry.bind("pulsarClient", pulsarClient);
+        PulsarComponent comp = new PulsarComponent(context);
+        comp.setAutoConfiguration(autoConfiguration);
+        comp.setPulsarClient(pulsarClient);
+        registry.bind("pulsar", comp);
+    }
+
+    private PulsarClient givenPulsarClient() throws PulsarClientException {
+        return new ClientBuilderImpl()
+                .serviceUrl(getPulsarBrokerUrl())
+                .ioThreads(1)
+                .listenerThreads(1)
+                .build();
+    }
+
+    @Test
+    public void propertyHeaderSetsPulsarProperties() throws InterruptedException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("testProperty", "testValue");
+        mock.expectedHeaderReceived(PulsarMessageHeaders.PROPERTIES, properties);
+
+        producerTemplate.sendBodyAndHeader("test", PulsarMessageHeaders.PROPERTIES_OUT, properties);
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock);
+    }
+
+    @Test
+    public void eventTimeHeaderSetsPulsarEventTime() throws InterruptedException {
+        long eventTime = 10000;
+        mock.expectedHeaderReceived(PulsarMessageHeaders.EVENT_TIME, eventTime);
+
+        producerTemplate.sendBodyAndHeader("test", PulsarMessageHeaders.EVENT_TIME_OUT, eventTime);
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock);
+    }
+
+    @Test
+    public void keyHeaderSetsPulsarKey() throws InterruptedException {
+        String key = "testKey";
+        mock.expectedHeaderReceived(PulsarMessageHeaders.KEY, key);
+
+        producerTemplate.sendBodyAndHeader("test", PulsarMessageHeaders.KEY_OUT, key);
+
+        MockEndpoint.assertIsSatisfied(10, TimeUnit.SECONDS, mock);
+    }
+}