Issue with consumer in camel-kafka component


i.melnik
Hi All:
I'm trying the camel-kafka component (Camel 2.13-SNAPSHOT) and I'm having some issues consuming messages from a Kafka server.

1. Is the header "kafka.CONTENT_TYPE" required? In a plain Java app I tried to consume messages like this:
from(endpoint).to("stream:out?");

My endpoint is configured like this:

final KafkaEndpoint endpoint = new KafkaEndpoint("kafka://?", "", component);
endpoint.setTopic("page_visits");
endpoint.setZookeeperHost(host);
endpoint.setZookeeperPort(port);
endpoint.setGroupId("myGroup");

Without defining this header, I get an NPE when I produce messages. Looking at the source code, I guess it happens here:

public Exchange createKafkaExchange(MessageAndMetadata<byte[], byte[]> mm) {
        Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern());

        Message message = new DefaultMessage();
        ...
        // NPE occurs here
        message.setHeader(KafkaConstants.KEY, new String(mm.key()));
        ...
}
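
If the NPE really does come from new String(mm.key()), one possible cause is a message that was produced without a key, so that the key is null. A minimal sketch of a null-safe variant follows. It uses plain Java stand-ins rather than the real Camel and Kafka classes: the class name, the headersFor helper, the map of headers, and the header name "kafka.KEY" are all assumptions made for illustration, not the component's actual code.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class NullSafeKeyHeader {

    // Hypothetical header name, standing in for KafkaConstants.KEY.
    static final String KEY_HEADER = "kafka.KEY";

    // Builds the headers for one consumed message.
    // The key header is only set when the message actually carries a key.
    static Map<String, Object> headersFor(byte[] key) {
        Map<String, Object> headers = new HashMap<String, Object>();
        if (key != null) {
            headers.put(KEY_HEADER, new String(key, StandardCharsets.UTF_8));
        }
        return headers;
    }

    public static void main(String[] args) {
        // A message without a key no longer triggers an NPE.
        System.out.println(headersFor(null));
        // A message with a key gets the header as a String.
        System.out.println(headersFor("user-1".getBytes(StandardCharsets.UTF_8)));
    }
}
```

With a guard like this, a keyless message would simply arrive without the key header instead of failing.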

2. I'm not able to get messages from my Kafka server. After setting the kafka.CONTENT_TYPE header and avoiding the NPE, the body of the returned exchange is always null.
Looking at the consumer sample described in the Consumer Group Example, we can see that the message is obtained like this:

ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
// This is the message from the Kafka server
it.next().message();

And again, looking at the source code of the KafkaEndpoint class, the message that came from the server is ignored.
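
To make the second point concrete, here is a minimal sketch of what I would expect the exchange-building step to do with the payload. It uses plain Java stand-ins, not the real Camel or Kafka classes: SimpleMessage and toMessage are hypothetical names, and the byte[] argument plays the role of the value returned by it.next().message().

```java
import java.nio.charset.StandardCharsets;

public class PayloadAsBody {

    // Hypothetical stand-in for a Camel message; it only holds a body.
    static final class SimpleMessage {
        private Object body;

        void setBody(Object body) {
            this.body = body;
        }

        Object getBody() {
            return body;
        }
    }

    // Builds a message whose body is the raw payload read from the stream.
    static SimpleMessage toMessage(byte[] payload) {
        SimpleMessage message = new SimpleMessage();
        // If this call is skipped, getBody() returns null.
        message.setBody(payload);
        return message;
    }

    public static void main(String[] args) {
        byte[] payload = "page visit".getBytes(StandardCharsets.UTF_8);
        SimpleMessage message = toMessage(payload);
        System.out.println(new String((byte[]) message.getBody(), StandardCharsets.UTF_8));
    }
}
```

The only point of the sketch is that the payload has to be copied into the body explicitly; if that step is missing, a null body is what a route would see.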

So are these two points bugs? Any suggestions on how to resolve this issue would be appreciated.

Thanks.