[camel] branch master updated: CAMEL-16018: HazelcastReplicatedConsumer not receiving events (#4868)

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[camel] branch master updated: CAMEL-16018: HazelcastReplicatedConsumer not receiving events (#4868)

davsclaus-2
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new f3ade6d  CAMEL-16018: HazelcastReplicatedConsumer not receiving events (#4868)
f3ade6d is described below

commit f3ade6db2f9cd4f6f23371848fe29c35774fd835
Author: Zineb BENDHIBA <[hidden email]>
AuthorDate: Thu Jan 14 14:04:49 2021 +0100

    CAMEL-16018: HazelcastReplicatedConsumer not receiving events (#4868)
---
 .../hazelcast/HazelcastComponentHelper.java        |  4 +-
 .../HazelcastAtomicnumberProducer.java             | 10 ++--
 .../hazelcast/list/HazelcastListProducer.java      |  2 +-
 .../hazelcast/map/HazelcastMapProducer.java        | 16 +++---
 .../multimap/HazelcastMultimapProducer.java        |  8 +--
 .../hazelcast/queue/HazelcastQueueProducer.java    | 14 ++---
 .../HazelcastReplicatedmapConsumer.java            |  2 +-
 .../ringbuffer/HazelcastRingbufferProducer.java    | 14 ++---
 .../hazelcast/HazelcastCamelTestSupport.java       |  2 +-
 .../HazelcastReplicatedmapConsumerTest.java        | 64 +++++++++-------------
 10 files changed, 63 insertions(+), 73 deletions(-)

diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponentHelper.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponentHelper.java
index e3139a9..4683c61 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponentHelper.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastComponentHelper.java
@@ -41,8 +41,8 @@ public final class HazelcastComponentHelper {
         }
 
         // propagate headers if OUT message created
-        if (ex.hasOut()) {
-            ex.getOut().setHeaders(headers);
+        if (ex.getMessage() != null) {
+            ex.getMessage().setHeaders(headers);
         }
     }
 
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/atomicnumber/HazelcastAtomicnumberProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/atomicnumber/HazelcastAtomicnumberProducer.java
index bea9f87..5761f39 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/atomicnumber/HazelcastAtomicnumberProducer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/atomicnumber/HazelcastAtomicnumberProducer.java
@@ -92,15 +92,15 @@ public class HazelcastAtomicnumberProducer extends HazelcastDefaultProducer {
     }
 
     private void get(Exchange exchange) {
-        exchange.getOut().setBody(this.atomicnumber.get());
+        exchange.getMessage().setBody(this.atomicnumber.get());
     }
 
     private void increment(Exchange exchange) {
-        exchange.getOut().setBody(this.atomicnumber.incrementAndGet());
+        exchange.getMessage().setBody(this.atomicnumber.incrementAndGet());
     }
 
     private void decrement(Exchange exchange) {
-        exchange.getOut().setBody(this.atomicnumber.decrementAndGet());
+        exchange.getMessage().setBody(this.atomicnumber.decrementAndGet());
     }
 
     private void compare(long expected, Exchange exchange) {
@@ -108,12 +108,12 @@ public class HazelcastAtomicnumberProducer extends HazelcastDefaultProducer {
         if (ObjectHelper.isEmpty(expected)) {
             throw new IllegalArgumentException("Expected value must be specified");
         }
-        exchange.getOut().setBody(this.atomicnumber.compareAndSet(expected, update));
+        exchange.getMessage().setBody(this.atomicnumber.compareAndSet(expected, update));
     }
 
     private void getAndAdd(Exchange exchange) {
         long delta = exchange.getIn().getBody(Long.class);
-        exchange.getOut().setBody(this.atomicnumber.getAndAdd(delta));
+        exchange.getMessage().setBody(this.atomicnumber.getAndAdd(delta));
     }
 
     private void set(Exchange exchange) {
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListProducer.java
index db57ab6..8044083 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListProducer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListProducer.java
@@ -114,7 +114,7 @@ public class HazelcastListProducer extends HazelcastDefaultProducer {
     }
 
     private void get(Integer pos, Exchange exchange) {
-        exchange.getOut().setBody(this.list.get(pos));
+        exchange.getMessage().setBody(this.list.get(pos));
     }
 
     private void set(Integer pos, Exchange exchange) {
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapProducer.java
index 6ad84f9..a54246a 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapProducer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapProducer.java
@@ -128,7 +128,7 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer {
                 break;
 
             case CLEAR:
-                this.clear(exchange);
+                this.clear();
                 break;
 
             case EVICT:
@@ -159,7 +159,7 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer {
         } else {
             result = this.cache.values();
         }
-        exchange.getOut().setBody(result);
+        exchange.getMessage().setBody(result);
     }
 
     /**
@@ -193,14 +193,14 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer {
      * find an object by the given id and give it back
      */
     private void get(Object oid, Exchange exchange) {
-        exchange.getOut().setBody(this.cache.get(oid));
+        exchange.getMessage().setBody(this.cache.get(oid));
     }
 
     /**
      * GET All objects and give it back
      */
     private void getAll(Object oid, Exchange exchange) {
-        exchange.getOut().setBody(this.cache.getAll((Set<Object>) oid));
+        exchange.getMessage().setBody(this.cache.getAll((Set<Object>) oid));
     }
 
     /**
@@ -239,7 +239,7 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer {
     /**
      * Clear all the entries
      */
-    private void clear(Exchange exchange) {
+    private void clear() {
         this.cache.clear();
     }
 
@@ -261,7 +261,7 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer {
      * Check for a specific key in the cache and return true if it exists or false otherwise
      */
     private void containsKey(Object oid, Exchange exchange) {
-        exchange.getOut().setBody(this.cache.containsKey(oid));
+        exchange.getMessage().setBody(this.cache.containsKey(oid));
     }
 
     /**
@@ -269,13 +269,13 @@ public class HazelcastMapProducer extends HazelcastDefaultProducer {
      */
     private void containsValue(Exchange exchange) {
         Object body = exchange.getIn().getBody();
-        exchange.getOut().setBody(this.cache.containsValue(body));
+        exchange.getMessage().setBody(this.cache.containsValue(body));
     }
 
     /**
      * GET keys set of objects and give it back
      */
     private void getKeys(Exchange exchange) {
-        exchange.getOut().setBody(this.cache.keySet());
+        exchange.getMessage().setBody(this.cache.keySet());
     }
 }
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapProducer.java
index f736591..386dc51 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapProducer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapProducer.java
@@ -99,7 +99,7 @@ public class HazelcastMultimapProducer extends HazelcastDefaultProducer {
     }
 
     private void get(Object oid, Exchange exchange) {
-        exchange.getOut().setBody(this.cache.get(oid));
+        exchange.getMessage().setBody(this.cache.get(oid));
     }
 
     private void delete(Object oid) {
@@ -111,7 +111,7 @@ public class HazelcastMultimapProducer extends HazelcastDefaultProducer {
     }
 
     private void valuecount(Object oid, Exchange exchange) {
-        exchange.getOut().setBody(this.cache.valueCount(oid));
+        exchange.getMessage().setBody(this.cache.valueCount(oid));
     }
 
     private void clear(Exchange exchange) {
@@ -119,11 +119,11 @@ public class HazelcastMultimapProducer extends HazelcastDefaultProducer {
     }
 
     private void containsKey(Object oid, Exchange exchange) {
-        exchange.getOut().setBody(this.cache.containsKey(oid));
+        exchange.getMessage().setBody(this.cache.containsKey(oid));
     }
 
     private void containsValue(Exchange exchange) {
         Object body = exchange.getIn().getBody();
-        exchange.getOut().setBody(this.cache.containsValue(body));
+        exchange.getMessage().setBody(this.cache.containsValue(body));
     }
 }
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
index 2e86417..3113acf 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueProducer.java
@@ -127,11 +127,11 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer {
     }
 
     private void poll(Exchange exchange) {
-        exchange.getOut().setBody(this.queue.poll());
+        exchange.getMessage().setBody(this.queue.poll());
     }
 
     private void peek(Exchange exchange) {
-        exchange.getOut().setBody(this.queue.peek());
+        exchange.getMessage().setBody(this.queue.peek());
     }
 
     private void offer(Exchange exchange) {
@@ -149,12 +149,12 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer {
     }
 
     private void remainingCapacity(Exchange exchange) {
-        exchange.getOut().setBody(this.queue.remainingCapacity());
+        exchange.getMessage().setBody(this.queue.remainingCapacity());
     }
 
     private void drainTo(Collection c, Exchange exchange) {
-        exchange.getOut().setBody(this.queue.drainTo(c));
-        exchange.getOut().setHeader(HazelcastConstants.DRAIN_TO_COLLECTION, c);
+        exchange.getMessage().setBody(this.queue.drainTo(c));
+        exchange.getMessage().setHeader(HazelcastConstants.DRAIN_TO_COLLECTION, c);
     }
 
     private void removeAll(Exchange exchange) {
@@ -164,11 +164,11 @@ public class HazelcastQueueProducer extends HazelcastDefaultProducer {
 
     private void removeIf(Exchange exchange) {
         Predicate filter = exchange.getIn().getBody(Predicate.class);
-        exchange.getOut().setBody(this.queue.removeIf(filter));
+        exchange.getMessage().setBody(this.queue.removeIf(filter));
     }
 
     private void take(Exchange exchange) throws InterruptedException {
-        exchange.getOut().setBody(this.queue.take());
+        exchange.getMessage().setBody(this.queue.take());
     }
 
     private void retainAll(Exchange exchange) {
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
index 796d2a5..2a215fd 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
@@ -45,7 +45,7 @@ public class HazelcastReplicatedmapConsumer extends HazelcastDefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
 
-        listener = cache.addEntryListener(new CamelEntryListener(this, cacheName), true);
+        listener = cache.addEntryListener(new CamelEntryListener(this, cacheName));
     }
 
     /**
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/ringbuffer/HazelcastRingbufferProducer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/ringbuffer/HazelcastRingbufferProducer.java
index e649f31..528d873 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/ringbuffer/HazelcastRingbufferProducer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/ringbuffer/HazelcastRingbufferProducer.java
@@ -73,23 +73,23 @@ public class HazelcastRingbufferProducer extends HazelcastDefaultProducer {
     }
 
     private void readOnceHead(Exchange exchange) throws InterruptedException {
-        exchange.getOut().setBody(this.ringbuffer.readOne(ringbuffer.headSequence()));
+        exchange.getMessage().setBody(this.ringbuffer.readOne(ringbuffer.headSequence()));
     }
 
     private void readOnceTail(Exchange exchange) throws InterruptedException {
-        exchange.getOut().setBody(this.ringbuffer.readOne(ringbuffer.tailSequence()));
+        exchange.getMessage().setBody(this.ringbuffer.readOne(ringbuffer.tailSequence()));
     }
 
-    private void getCapacity(Exchange exchange) throws InterruptedException {
-        exchange.getOut().setBody(this.ringbuffer.capacity());
+    private void getCapacity(Exchange exchange) {
+        exchange.getMessage().setBody(this.ringbuffer.capacity());
     }
 
-    private void getRemainingCapacity(Exchange exchange) throws InterruptedException {
-        exchange.getOut().setBody(this.ringbuffer.remainingCapacity());
+    private void getRemainingCapacity(Exchange exchange) {
+        exchange.getMessage().setBody(this.ringbuffer.remainingCapacity());
     }
 
     private void add(Exchange exchange) {
         final Object body = exchange.getIn().getBody();
-        exchange.getOut().setBody(ringbuffer.add(body));
+        exchange.getMessage().setBody(ringbuffer.add(body));
     }
 }
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelTestSupport.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelTestSupport.java
index aed1630..c3b1df7 100644
--- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelTestSupport.java
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastCamelTestSupport.java
@@ -32,7 +32,7 @@ public class HazelcastCamelTestSupport extends CamelTestSupport {
 
     @Override
     protected CamelContext createCamelContext() throws Exception {
-        MockitoAnnotations.initMocks(this);
+        MockitoAnnotations.openMocks(this);
         CamelContext context = super.createCamelContext();
         HazelcastCamelTestHelper.registerHazelcastComponents(context, hazelcastInstance);
         trainHazelcastInstance(hazelcastInstance);
diff --git a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java
index 74c854d..6a699ab 100644
--- a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java
+++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastReplicatedmapConsumerTest.java
@@ -17,47 +17,45 @@
 package org.apache.camel.component.hazelcast;
 
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import com.hazelcast.core.EntryEvent;
-import com.hazelcast.core.EntryEventType;
-import com.hazelcast.core.EntryListener;
+import com.hazelcast.core.Hazelcast;
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.replicatedmap.ReplicatedMap;
+import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
-public class HazelcastReplicatedmapConsumerTest extends HazelcastCamelTestSupport {
+public class HazelcastReplicatedmapConsumerTest extends CamelTestSupport {
 
-    @Mock
+    private HazelcastInstance hazelcastInstance;
     private ReplicatedMap<Object, Object> map;
 
-    @Captor
-    private ArgumentCaptor<EntryListener<Object, Object>> argument;
+    @BeforeEach
+    public void beforeEach() {
+        hazelcastInstance = Hazelcast.newHazelcastInstance();
+        map = hazelcastInstance.getReplicatedMap("rm");
+    }
 
-    @Override
-    protected void trainHazelcastInstance(HazelcastInstance hazelcastInstance) {
-        when(hazelcastInstance.getReplicatedMap("rm")).thenReturn(map);
-        when(map.addEntryListener(any(), eq(true))).thenReturn(UUID.randomUUID());
+    @AfterEach
+    public void afterEach() {
+        if (hazelcastInstance != null) {
+            hazelcastInstance.shutdown();
+        }
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    protected void verifyHazelcastInstance(HazelcastInstance hazelcastInstance) {
-        verify(hazelcastInstance).getReplicatedMap("rm");
-        verify(map).addEntryListener(any(EntryListener.class), eq(true));
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        HazelcastCamelTestHelper.registerHazelcastComponents(context, hazelcastInstance);
+        return context;
     }
 
     @Test
@@ -65,10 +63,7 @@ public class HazelcastReplicatedmapConsumerTest extends HazelcastCamelTestSuppor
         MockEndpoint out = getMockEndpoint("mock:added");
         out.expectedMessageCount(1);
 
-        verify(map).addEntryListener(argument.capture(), eq(true));
-        EntryEvent<Object, Object> event = new EntryEvent<>("foo", null, EntryEventType.ADDED.getType(), "4711", "my-foo");
-        argument.getValue().entryAdded(event);
-
+        map.put("4711", "my-foo");
         assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS);
 
         this.checkHeaders(out.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.ADDED);
@@ -81,11 +76,8 @@ public class HazelcastReplicatedmapConsumerTest extends HazelcastCamelTestSuppor
     public void testEvict() throws InterruptedException {
         MockEndpoint out = getMockEndpoint("mock:evicted");
         out.expectedMessageCount(1);
-
-        verify(map).addEntryListener(argument.capture(), eq(true));
-        EntryEvent<Object, Object> event = new EntryEvent<>("foo", null, EntryEventType.EVICTED.getType(), "4711", "my-foo");
-        argument.getValue().entryEvicted(event);
-
+        map.put("4711", "my-foo", 100, TimeUnit.MILLISECONDS);
+        Thread.sleep(150);
         assertMockEndpointsSatisfied(30000, TimeUnit.MILLISECONDS);
     }
 
@@ -93,11 +85,8 @@ public class HazelcastReplicatedmapConsumerTest extends HazelcastCamelTestSuppor
     public void testRemove() throws InterruptedException {
         MockEndpoint out = getMockEndpoint("mock:removed");
         out.expectedMessageCount(1);
-
-        verify(map).addEntryListener(argument.capture(), eq(true));
-        EntryEvent<Object, Object> event = new EntryEvent<>("foo", null, EntryEventType.REMOVED.getType(), "4711", "my-foo");
-        argument.getValue().entryRemoved(event);
-
+        map.put("4711", "my-foo");
+        map.remove("4711");
         assertMockEndpointsSatisfied(5000, TimeUnit.MILLISECONDS);
         this.checkHeaders(out.getExchanges().get(0).getIn().getHeaders(), HazelcastConstants.REMOVED);
     }
@@ -124,4 +113,5 @@ public class HazelcastReplicatedmapConsumerTest extends HazelcastCamelTestSuppor
         assertEquals("4711", headers.get(HazelcastConstants.OBJECT_ID));
         assertNotNull(headers.get(HazelcastConstants.LISTENER_TIME));
     }
+
 }