publish/subscribe with camel-netty4

publish/subscribe with camel-netty4

jjansen
Hi everybody,

I need to build an interface that communicates with a server over TCP/IP.
After an initial registration request, the server publishes all subsequent events on the same socket (and channel).

My first idea was to create two routes, one for the request and the other acting as a listener:

        String serverUri = "netty4:tcp://localhost:30600?clientMode=true&decoders=#length-decoder,#string-decoder&encoders=#length-encoder,#string-encoder";

        from("direct:start")
          .setBody(constant("registrationBody"))
          .to(ExchangePattern.OutOnly, serverUri)
        ;

        from(serverUri)
          .process(exchange -> System.out.println("Incoming message!"))
          .to("file:test/output?fileName=updates.xml&fileExist=Append")
        ;

This solution does not work, because each route establishes its own connection on a different channel, so the events never arrive on the listener route.
Does anyone know how to configure this kind of communication?

My constraint is that I'm working on Apache ServiceMix 7.0.0 with Camel 2.16.4.
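
For reference, the interaction the server expects can be sketched in plain Java, without Camel: one socket, one registration write, then continuous reads on that same socket. This is only an illustration under assumptions. The class name, the stand-in server and the 4-byte length prefix are hypothetical; the real framing depends on how the length-decoder and length-encoder beans are configured.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SingleSocketPubSubSketch {

    // Assumed framing: 4-byte big-endian length followed by a UTF-8 payload.
    static void writeFrame(DataOutputStream out, String text) throws IOException {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
    }

    static String readFrame(DataInputStream in) throws IOException {
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        return new String(payload, StandardCharsets.UTF_8);
    }

    // Stand-in server: accepts one client, waits for its registration,
    // then publishes eventCount events on the same socket.
    static Thread startFakeServer(ServerSocket server, int eventCount) {
        Thread thread = new Thread(() -> {
            try (Socket socket = server.accept();
                 DataInputStream in = new DataInputStream(socket.getInputStream());
                 DataOutputStream out = new DataOutputStream(socket.getOutputStream())) {
                String registration = readFrame(in);
                for (int i = 1; i <= eventCount; i++) {
                    writeFrame(out, "event-" + i + " for " + registration);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        thread.start();
        return thread;
    }

    // Client: registers once, then reads the events from the SAME connection.
    static List<String> registerAndListen(int port, String registrationBody, int expectedEvents)
            throws IOException {
        List<String> events = new ArrayList<>();
        try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), port);
             DataOutputStream out = new DataOutputStream(socket.getOutputStream());
             DataInputStream in = new DataInputStream(socket.getInputStream())) {
            writeFrame(out, registrationBody);
            for (int i = 0; i < expectedEvents; i++) {
                events.add(readFrame(in));
            }
        }
        return events;
    }

    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(0)) {
            Thread serverThread = startFakeServer(server, 3);
            List<String> events = registerAndListen(server.getLocalPort(), "registrationBody", 3);
            serverThread.join();
            events.forEach(System.out::println);
        }
    }
}
```

The two-route setup above differs from this in exactly one point: the producer and the consumer each open their own socket, so the server answers on a connection that the listener route never reads.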

Thanks for your help,
Jörg

Re: publish/subscribe with camel-netty4

jjansen
When the server sends an event after the initial registration call, I can see that the message count of the registration route is incremented (and its Inflight count is decremented, even below zero), while the listener route receives nothing.

karaf@root>route-list
 Context            Route                   Status              Total #       Failed #     Inflight #   Uptime
 -------            -----                   ------              -------       --------     ----------   ------
 gs-provider        gs-provider-listener    Started                   0              0              0   4 minutes
 gs-provider        gs-registration-route   Started                   2              0             -1   4 minutes
karaf@root>

karaf@root>route-list
 Context            Route                   Status              Total #       Failed #     Inflight #   Uptime
 -------            -----                   ------              -------       --------     ----------   ------
 gs-provider        gs-provider-listener    Started                   0              0              0   4 minutes
 gs-provider        gs-registration-route   Started                   3              0             -2   4 minutes
karaf@root>

Is there any way to catch the event?

The route looks like this:
   
    String rtServerUri = stringBuilder.toString();
   
    from("quartz://gs/provider?trigger.repeatCount=0&trigger.repeatInterval=15000")
      .id("gs-registration-route").routeId("gs-registration-route")
      .setBody(constant(registrationBody))
      .to("netty4:tcp://localhost:30600?clientMode=true&decoders=#length-decoder,#string-decoder&encoders=#length-encoder,#string-encoder&lazyChannelCreation=false")
      .process(exchange -> System.out.println(exchange.getIn().getBody(String.class)))
    ;

Re: publish/subscribe with camel-netty4

jjansen
I found a way that seems to work for me.

The solution was to create a custom component that extends NettyComponent.
The corresponding endpoint holds a reference to the listener channel used by the Consumer.

@Override
protected void doStart() throws Exception
{
  super.doStart();
 
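  // read the private bootstrap factory field declared on the superclass via reflection
  Field nettyServerBootstrapFactory = GsPblConsumer.class.getSuperclass().getDeclaredField("nettyServerBootstrapFactory");
  nettyServerBootstrapFactory.setAccessible(true);
  Object nsbf = nettyServerBootstrapFactory.get(this);
  // the factory holds the channel that was opened for this consumer
  Field channel = nsbf.getClass().getDeclaredField("channel");
  channel.setAccessible(true);
  channelToShare = (Channel) channel.get(nsbf);
  // publish the channel on the endpoint so that the producer can reuse it
  getEndpoint().setSharedChannel(channelToShare);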
}
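
The reflection step above can be isolated into a small helper. The following is a minimal sketch, not part of camel-netty4: `PrivateFieldReader`, `BaseConsumer` and `CustomConsumer` are hypothetical names used only for illustration. Instead of hard-coding `getSuperclass()`, it walks up the class hierarchy, so it keeps working if the field moves to another parent class.

```java
import java.lang.reflect.Field;

class PrivateFieldReader {

    // Walks up the class hierarchy until a field with the given name is found.
    static Field findField(Class<?> type, String name) throws NoSuchFieldException {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            try {
                return c.getDeclaredField(name);
            } catch (NoSuchFieldException ignored) {
                // not declared at this level, try the parent class
            }
        }
        throw new NoSuchFieldException(name + " not found on " + type.getName() + " or its superclasses");
    }

    // Reads the (possibly private) field and casts it to the expected type.
    static <T> T read(Object target, String name, Class<T> expectedType)
            throws ReflectiveOperationException {
        Field field = findField(target.getClass(), name);
        field.setAccessible(true);
        return expectedType.cast(field.get(target));
    }
}

// Hypothetical stand-ins for a library consumer and a custom subclass.
class BaseConsumer {
    private final String channel = "channel-1";
}

class CustomConsumer extends BaseConsumer {
}
```

With these stand-ins, `PrivateFieldReader.read(new CustomConsumer(), "channel", String.class)` returns the value held by the private field of the parent class. Reading private fields this way depends on library internals and may break on a Camel upgrade.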

The Producer then uses this channel to send its messages.

@Override
public boolean process(Exchange exchange, AsyncCallback callback)
{
  ...
  // use the consumer's shared channel if there is one; otherwise borrow a channel from the pool
  Channel existing;
  try {
    if (getConfiguration().isShareChannel() && getEndpoint().hasChannelToShare()) {
      existing = getEndpoint().getSharedChannel();
    } else {
      existing = pool.borrowObject();
    }
    if (existing != null) {
      LOG.trace("Got channel {}", existing);
    }
  } catch (Exception e) {
    exchange.setException(e);
    callback.done(true);
    return true;
  }

  ...
}
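
The endpoint-side accessors called in the two snippets (`setSharedChannel`, `hasChannelToShare`, `getSharedChannel`) are not shown in this post. One way they could look is sketched below. `SharedChannelHolder` and `clearSharedChannel` are hypothetical names, and the holder is generic so that the example compiles without Netty on the classpath. In the real endpoint the type parameter would be `io.netty.channel.Channel`, and the usability check could be `Channel::isActive`.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

class SharedChannelHolder<C> {

    // The consumer thread sets the channel, producer threads read it.
    private final AtomicReference<C> shared = new AtomicReference<>();
    private final Predicate<C> isUsable;

    SharedChannelHolder(Predicate<C> isUsable) {
        this.isUsable = isUsable;
    }

    // Called by the consumer once its channel is open.
    void setSharedChannel(C channel) {
        shared.set(channel);
    }

    // True only if a channel was registered and it is still usable.
    boolean hasChannelToShare() {
        C channel = shared.get();
        return channel != null && isUsable.test(channel);
    }

    C getSharedChannel() {
        return shared.get();
    }

    // Forgets the channel, but only if it is still the one that was registered.
    void clearSharedChannel(C channel) {
        shared.compareAndSet(channel, null);
    }
}
```

Checking usability inside `hasChannelToShare()` lets the producer fall back to the pool when the consumer's connection has dropped, instead of writing to a closed channel.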

Does anybody know a better solution?

Thanks,
Jörg