camel-netty bug and the need of best practice for creating referenced parameter object on looking-up

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

camel-netty bug and the need of best practice for creating referenced parameter object on looking-up

tenderice

Hi, all

I recently encountered a bug in camel-netty, see my previous post on http://camel.465427.n5.nabble.com/buffer-corruption-in-camel-netty-td5551122.html

The main reason is that camel-netty's DefaultServerPipelineFactory stores the decoders/encoders in the NettyConfiguration object and reuses them whenever a new pipeline is created. This mechanism of creating and using referenced parameters works fine for most Camel components, but it causes problems when the Netty decoder/encoder is not sharable, that is, when the object has state. For example, LengthFieldBasedFrameDecoder and any class that derives from FrameDecoder are not sharable: one must create a new object each time a pipeline is established, rather than share one instance among all connections.

You can write your own PipelineFactory to avoid the problem (by creating the decoder/encoder each time), but that makes the mechanism of referenced decoders/encoders quite useless, and there is still no way to prevent people from making this mistake.

My current workaround is to store the encoder/decoder name in the Configuration object rather than the object instance, and to create a new object each time by looking it up from the Spring context, with the help of the "prototype" scope in the bean configuration. However, this method is only compatible with the Spring ApplicationContextRegistry.

So I think the underlying question is "is there a best practice for creating a referenced object each time it is used?", or, put differently, "does Camel's configuration model have this kind of ability?" I didn't dig into all components to find a solution. If someone can point out a component that already does this, or give a suggestion for solving the problem, I can make the modification.

Hope I explained clearly.
Reply | Threaded
Open this post in threaded view
|

Re: camel-netty bug and the need of best practice for creating referenced parameter object on looking-up

tenderice
Below is a very simple test case for this problem. Due to the random nature of the bug, you can get a different result each time you run the program. However, if you create a new decoder in the pipeline factory each time, you get the correct result.

The idea of the test is: create two endpoints listening on two ports and send data to these ports. First send part of the data to port 1, then send something to port 2, then send the rest of the data to port 1. We expect port 1 to receive all of its data correctly.
/////////////////////////////////////////////////////////////////////////////////////////////


package org.apache.camel.component.netty;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnsharableCodecsConflictsTest extends BaseNettyTest {
        private static final Logger logger = LoggerFactory
                        .getLogger(UnsharableCodecsConflictsTest.class);

        static final byte[] lengthHeader = { 0x00, 0x00, 0x40, 0x00 }; // 4096 bytes

        private Processor processor = new P();

        @Override
        protected JndiRegistry createRegistry() throws Exception {
                JndiRegistry registry = super.createRegistry();

                // START SNIPPET: registry-beans
                LengthFieldBasedFrameDecoder lengthDecoder = new LengthFieldBasedFrameDecoder(
                                1048576, 0, 4, 0, 4);
                registry.bind("length-decoder", lengthDecoder);

                LengthFieldPrepender lengthEncoder = new LengthFieldPrepender(4);
                registry.bind("length-encoder", lengthEncoder);

                // END SNIPPET: registry-beans
                return registry;
        }

        @Test
        public void canSupplyMultipleCodecsToEndpointPipeline() throws Exception {
                byte[] s8888 = new byte[8192];
                byte[] s9999 = new byte[16383];
                Arrays.fill(s8888, (byte) 0x38);
                Arrays.fill(s9999, (byte) 0x39);
                byte[] body8888 = (new String(lengthHeader) + new String(s8888))
                                .getBytes();
                byte[] body9999 = (new String(lengthHeader) + new String(s9999))
                                .getBytes();

                MockEndpoint mock = getMockEndpoint("mock:result");
                mock.expectedBodiesReceived(new String(s9999)+"9");

                Socket server1 = getSocket("localhost", 8888);
                Socket server2 = getSocket("localhost", 9999);

                try {
                        sendSopBuffer(body9999, server2);
                        sendSopBuffer(body8888, server1);
                        sendSopBuffer(new String("9").getBytes(), server2);
                } catch (Exception e) {
                        logger.error("", e);
                }

                server1.close();
                server2.close();

                mock.await(10, TimeUnit.SECONDS);
                mock.assertIsSatisfied();

        }

        protected RouteBuilder createRouteBuilder() throws Exception {
                return new RouteBuilder() {

                        public void configure() throws Exception {
                                from(
                                                "netty:tcp://localhost:8888?decoder=#length-decoder&sync=false")
                                                .process(processor);
                                from(
                                                "netty:tcp://localhost:9999?decoder=#length-decoder&sync=false")
                                                .process(processor).to("mock:result");
                        }
                };
        }

        private static Socket getSocket(String host, int port)
                        throws UnknownHostException, IOException {
                Socket s = new Socket(host, port);
                s.setSoTimeout(60000);
                return s;
        }

        public static void sendSopBuffer(byte[] buf, Socket server)
                        throws Exception {

                OutputStream netOut = server.getOutputStream();
                OutputStream dataOut = new BufferedOutputStream(netOut);

                try {
                        dataOut.write(buf, 0, buf.length);
                        dataOut.flush();
                } catch (Exception e) {
                        server.close();
                        throw (e);
                } finally {
                }
        }

        class P implements Processor {

                @Override
                public void process(Exchange exchange) throws Exception {
                        System.out.println(exchange.getIn().getHeaders());
                        System.out.println(new String(
                                        ((BigEndianHeapChannelBuffer) exchange.getIn().getBody())
                                                        .array()));
                        exchange.getOut().setBody(
                                        new String(((BigEndianHeapChannelBuffer) exchange.getIn()
                                                        .getBody()).array()));

                }
        }
}