Route shutdown does not fully cleanup

peter.berkman
Camel version: 2.13.3

I have per-route configuration that I monitor; if it changes, I need to fully shut down the route and recreate it.

The problem is that the route stop() then remove() sequence doesn't fully shut everything down, and the context still believes it has declared routes that it should not have.

First, the context issue: after stop() and remove() have finished on all the routes in the context, when I try to redefine the routes I get the following exception while configuring the error handlers:
java.lang.IllegalArgumentException: errorHandler must be defined before any routes in the RouteBuilder

(Note: there are zero defined routes left in the context; getContext().getRoutes() returns empty.)
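
One possible explanation, stated as an assumption rather than a confirmed diagnosis: the "errorHandler must be defined before any routes" check may be looking at the route definitions collected by the RouteBuilder instance itself, not at the routes registered in the context. If so, reusing the same builder after removing its routes from the context would still trip the check. The sketch below is a toy model of that situation only; ToyContext and ToyBuilder are hypothetical stand-ins and not Camel classes.

```java
import java.util.ArrayList;
import java.util.List;

public class BuilderStateSketch {

    /** Hypothetical stand-in for a context: tracks only the registered route ids. */
    static class ToyContext {
        final List<String> routes = new ArrayList<>();
    }

    /** Hypothetical stand-in for a builder that keeps its own list of definitions. */
    static class ToyBuilder {
        final ToyContext context;
        final List<String> definitions = new ArrayList<>();

        ToyBuilder(ToyContext context) {
            this.context = context;
        }

        void errorHandler(String handler) {
            // ASSUMPTION: the check inspects the builder's own definitions, not the context.
            if (!definitions.isEmpty()) {
                throw new IllegalArgumentException(
                        "errorHandler must be defined before any routes in the RouteBuilder");
            }
        }

        void from(String routeId) {
            // Defining a route records it in the builder and registers it in the context.
            definitions.add(routeId);
            context.routes.add(routeId);
        }
    }

    /** Returns true if the builder currently accepts an error handler. */
    static boolean canSetErrorHandler(ToyBuilder builder) {
        try {
            builder.errorHandler("default");
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ToyContext context = new ToyContext();
        ToyBuilder builder = new ToyBuilder(context);
        builder.from("listener-route");

        // Removing the route from the context leaves the builder's list untouched.
        context.routes.clear();
        System.out.println("context empty:  " + context.routes.isEmpty());
        System.out.println("reused builder: " + canSetErrorHandler(builder));
        System.out.println("fresh builder:  " + canSetErrorHandler(new ToyBuilder(context)));
    }
}
```

Under that assumption, an empty getContext().getRoutes() would not be enough; the builder would also need to start from a clean state, for example by creating a new builder instance for each reload.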

Second, on route startup (after a successful route stop() and remove()), when the route's endpoint holds a resource (like mina2), the resource is still bound and the startup fails.

Exception when trying to startup the route after successful shutdown:

20150409 12:39:14.905 [WARN ] NGMS-RouteStopper-ngms-hl7v2mllplistener-context | 317:com.nextgate.ms.components.bundles.ngms-standard-library | com.nextgate.ms.bundlelib.interfaces.NGMSRouteBuilder | unable to start route [ngms-hl7v2mllp-listener-query-route].  Exception thrown: java.net.BindException: Address already in use: bind
java.net.BindException: Address already in use: bind
	at sun.nio.ch.Net.bind0(Native Method)
	at sun.nio.ch.Net.bind(Net.java:444)
	at sun.nio.ch.Net.bind(Net.java:436)
	at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
	at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
	at org.apache.mina.transport.socket.nio.NioSocketAcceptor.open(NioSocketAcceptor.java:198)
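
For reference, the same BindException can be reproduced outside Camel and MINA with plain java.net sockets. This is only a minimal sketch of the symptom, assuming the old acceptor's server socket is still open when the new route tries to bind; the class and method names are made up for illustration.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public class PortRebindSketch {

    /** Tries to bind a second listener to the given local port; returns true on success. */
    static boolean canBind(int port) throws IOException {
        try (ServerSocket probe = new ServerSocket()) {
            probe.bind(new InetSocketAddress("127.0.0.1", port));
            return true;
        } catch (BindException e) {
            // Same failure mode as in the log above: the port is still held by another socket.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        ServerSocket first = new ServerSocket();
        // Port 0 lets the operating system choose a free port.
        first.bind(new InetSocketAddress("127.0.0.1", 0));
        int port = first.getLocalPort();

        System.out.println("rebind while first listener is open: " + canBind(port));

        // Only after the original listener is closed can the port be bound again.
        first.close();
        System.out.println("rebind after first listener closed:  " + canBind(port));
    }
}
```

If this matches what is happening, the listener's underlying server socket has to be closed, not just the route stopped, before the route can be recreated on the same port.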

Please let me know what I'm missing.

Here are the relevant code snippets.

The restartRoutes method:

    /**
     * Stops existing routes, if any, and then calls loadRoutes.
     */
    protected void restartRoutes() {
        
        //
        //  stop existing routes - need to thread because we likely got triggered by an inbound message,
        //  so, we are on the same thread and we need to let it process.
        //
        
        final String contextName = getContext().getName();
        final NGMSRouteBuilder me = this;
        final Map<String, RouteDefinition> stopRoutes = new HashMap<String, RouteDefinition>(this.routes);

        LOG.info("Configuration changed triggered a context restart for: {}", contextName);

        //
        //  clear in preparation for refresh.
        //
        
        this.routes.clear();

        Thread stopper = new Thread() {
            
            @Override
            public void run() {
                
                LOG.info("Stopping Routes on Context: {}", contextName);
                
                for (Entry<String, RouteDefinition> existRoute : stopRoutes.entrySet()) {
                    
                    if (getContext().getRouteStatus(existRoute.getKey()).isStarted()) {
                        
                        //
                        //  stop endpoint
                        //
                        
                        try {
                            Endpoint ep = getContext().getRoute(existRoute.getKey()).getEndpoint();
                            
                            if (ep != null) {
                                
                                ep.stop();
                            }
                        }
                        catch (Exception e) {
                            
                            LOG.warn("unable to stop endpoint for [{}].  {}", existRoute.getKey(), ExceptionUtil.getExceptionDetails(e));
                        }

                        //
                        //  stop route
                        //
                        
                        try {
                            getContext().stopRoute(existRoute.getKey());
                        }
                        catch (Exception e) {
                            
                            LOG.warn("unable to stop route [{}].  {}", existRoute.getKey(), ExceptionUtil.getExceptionDetails(e));
                        }

                        //
                        //  remove the route
                        //
                        
                        try {
                            getContext().removeRoute(existRoute.getKey());
                        }
                        catch (Exception e) {
                            
                            LOG.warn("unable to remove route [{}].  {}", existRoute.getKey(), ExceptionUtil.getExceptionDetails(e));
                        }
                    }
                }
                
                LOG.debug("Stopping Routes on Context [end]: {}", contextName);

                //
                //  start routes back up again...
                //
                
                try {
                    me.loadRoutes(true);
                }
                catch (Exception e) {
                    
                    LOG.error("unable to load routes for [{}]. {}", contextName, ExceptionUtil.getExceptionDetails(e));
                }
            }
        };
                
        stopper.setName("NGMS-RouteStopper-" + contextName);
        
        stopper.start();
    }

The Mina2 listener route:

            Mina2Endpoint minaFrom = (Mina2Endpoint) minaComp.createEndpoint(minaCfg);
            
            LOG.info("Defining route [{}] with target queue [{}] and acknowledgement mode: {}", routeName, targetQName, ackMode);

            final boolean autoStart = ConfigurationCache.getEntry(instanceName).getRoutingSettings().isPolicyListnerRouteAutostart();

                route = (RouteDefinition) from(minaFrom).routeId(routeName).routePolicy(throttlePolicy).autoStartup(autoStart)
                    
                        .onCompletion()
                            .setHeader(ComponentRegistry.HEADERTAG_COMPONENTID, constant(compid)).id(NGMSRouteBuilder.SETCONFIGTAGCOMPID_IDSTR + "ret_" + instanceName)
                            .setHeader(ComponentRegistry.HEADERTAG_COMPONENTINSTANCE, constant(instanceName)).id(NGMSRouteBuilder.SETCONFIGTAGCOMPINST_IDSTR + "ret_" + instanceName)
                            .process(MessageTrackingFactory.getAdapterFromSourceCompletion()).id(NGMSRouteBuilder.TRACKINGEND_IDSTR + instanceName)
                        .end()
                    
                        .setHeader(CMFactory.HEADERTAG, constant(instanceName)).id(NGMSRouteBuilder.SETCONFIGTAGHEADER_IDSTR + instanceName)
                        .setHeader(ComponentRegistry.HEADERTAG_COMPONENTID, constant(compid)).id(NGMSRouteBuilder.SETCONFIGTAGCOMPID_IDSTR + instanceName)
                        
                        .unmarshal(hl7).id(NGMSRouteBuilder.UNMARSHAL_IDSTR + instanceName)
                        .process(MessageTrackingFactory.getAdapterFromSource()).id(NGMSRouteBuilder.TRACKING_IDSTR + instanceName)
                        .process(HooksFactory.getPostAdapterHook()).id(NGMSRouteBuilder.POSTADAPTERHOOK_IDSTR + instanceName)
                        .process(MessageTrackingFactory.getSetNGMSMessageInExchEnv()).id(NGMSRouteBuilder.SETCONFIGTAGNGMSMSG_IDSTR + instanceName)
                        .to(NGMSConstants.QUEUE_PREFIX + targetQName).id(targetQName + "_" + instanceName);

The error handler setup code:

        //
        //  not working...  force it...  the getRoutes() actually returns none after shutdown...
        //
        //      List<Route> croutes = getContext().getRoutes();
        //      if ((croutes == null) || (croutes.isEmpty())) {
        //
        if (!(this.errorHandlerInitialized)) {
            
            this.errorHandlerInitialized = true;
            
            //
            //  unfortunately, Camel has a bug in the removeRoute where it does not correctly
            //  re-initialize the context, so the context still thinks it has routes defined.  But,
            //  the getRoutes() returns none???
            //  
            //  and, Camel will not let you define an error handler when it thinks there
            //  are outstanding routes defined.
            //
            //  java.lang.IllegalArgumentException: errorHandler must be defined before any routes in the RouteBuilder
            //

            LOG.info("Error Policy-Max redeliveries: {}", retries);
            LOG.info("Error Policy-Redelivery delay: {}", redeliveryDelay);
            LOG.info("Error Policy-Max redelivery delay: {}", maximumRedeliveryDelay);
            LOG.info("Error Policy-Redelivery backoff multiplier: {}", backoffMultiplier);
            
            errorHandler(defaultErrorHandler().maximumRedeliveries(retries)
                                              .redeliveryDelay(redeliveryDelay)
                                              .maximumRedeliveryDelay(maximumRedeliveryDelay)
                                              .log(this.getClass().getName())
                                              .backOffMultiplier(backoffMultiplier)
                                              .retriesExhaustedLogLevel(LoggingLevel.ERROR)
                                              .retryAttemptedLogLevel(LoggingLevel.DEBUG));
        }
        else {
            
            LOG.info("Error Policy-Not changing existing values.  If these need to change, you will need to restart the affected bundle ({})", getContext().getName());
        }