[1/3] camel git commit: make checkstyle happy

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[1/3] camel git commit: make checkstyle happy

davsclaus-2
Repository: camel
Updated Branches:
  refs/heads/master 901b44eab -> d88768738


make checkstyle happy


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d8876873
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d8876873
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d8876873

Branch: refs/heads/master
Commit: d8876873806ec972e53c95fdc292d44c81fd071f
Parents: 8cfe898
Author: christian ohr <[hidden email]>
Authored: Thu Mar 16 21:24:40 2017 +0100
Committer: Claus Ibsen <[hidden email]>
Committed: Mon Mar 20 17:11:46 2017 +0100

----------------------------------------------------------------------
 .../apache/camel/component/hl7/HL7MLLPConfig.java | 14 +++++++-------
 .../hl7/HL7MLLPCodecMessageFloodingTest.java      | 18 +++++++++---------
 2 files changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d8876873/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java
----------------------------------------------------------------------
diff --git a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java
index 5f580a7..88d58f0 100644
--- a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java
+++ b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java
@@ -42,10 +42,10 @@ public class HL7MLLPConfig {
 
     private boolean produceString = true;
 
- private CodingErrorAction malformedInputErrorAction = CodingErrorAction.REPORT;
-
+    private CodingErrorAction malformedInputErrorAction = CodingErrorAction.REPORT;
+
     private CodingErrorAction unmappableCharacterErrorAction = CodingErrorAction.REPORT;
-
+
     public Charset getCharset() {
         return charset;
     }
@@ -118,20 +118,20 @@ public class HL7MLLPConfig {
     public void setProduceString(boolean produceString) {
         this.produceString = produceString;
     }
-
+
     public CodingErrorAction getMalformedInputErrorAction() {
         return malformedInputErrorAction;
     }
 
- public void setMalformedInputErrorAction(CodingErrorAction malformedInputErrorAction) {
+    public void setMalformedInputErrorAction(CodingErrorAction malformedInputErrorAction) {
         this.malformedInputErrorAction = malformedInputErrorAction;
     }
-
+
     public CodingErrorAction getUnmappableCharacterErrorAction() {
         return unmappableCharacterErrorAction;
     }
 
     public void setUnmappableCharacterErrorAction(CodingErrorAction unmappableCharacterErrorAction) {
         this.unmappableCharacterErrorAction = unmappableCharacterErrorAction;
-    }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d8876873/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java
index f1e5ba3..5b08584 100644
--- a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java
+++ b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java
@@ -16,13 +16,6 @@
  */
 package org.apache.camel.component.hl7;
 
-import ca.uhn.hl7v2.model.Message;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.impl.JndiRegistry;
-import org.junit.Test;
-
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -32,6 +25,13 @@ import java.net.Socket;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import ca.uhn.hl7v2.model.Message;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+
 /**
  * Unit test for the HL7MLLP Codec.
  */
@@ -103,8 +103,8 @@ public class HL7MLLPCodecMessageFloodingTest extends HL7TestSupport {
         });
         t.start();
 
-        String in = "MSH|^~\\&|MYSENDER|MYRECEIVER|MYAPPLICATION||200612211200||QRY^A19|X%dX|P|2.4\r" +
-                "QRD|200612211200|R|I|GetPatient|||1^RD|0101701234|DEM||";
+        String in = "MSH|^~\\&|MYSENDER|MYRECEIVER|MYAPPLICATION||200612211200||QRY^A19|X%dX|P|2.4\r"
+                + "QRD|200612211200|R|I|GetPatient|||1^RD|0101701234|DEM||";
         for (int i = 0; i < messageCount; i++) {
             String msg = String.format(in, i);
             outputStream.write(11);

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[2/3] camel git commit: remove unused imports

davsclaus-2
remove unused imports


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8cfe8986
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8cfe8986
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8cfe8986

Branch: refs/heads/master
Commit: 8cfe89860d939236ae3e7271cd5fe145e76894b3
Parents: 8fa474f
Author: christian ohr <[hidden email]>
Authored: Thu Mar 16 14:44:10 2017 +0100
Committer: Claus Ibsen <[hidden email]>
Committed: Mon Mar 20 17:11:46 2017 +0100

----------------------------------------------------------------------
 .../camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java     | 4 ----
 1 file changed, 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8cfe8986/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java
index 17133cc..f1e5ba3 100644
--- a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java
+++ b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java
@@ -17,10 +17,6 @@
 package org.apache.camel.component.hl7;
 
 import ca.uhn.hl7v2.model.Message;
-import ca.uhn.hl7v2.model.v24.message.ADR_A19;
-import ca.uhn.hl7v2.model.v24.segment.MSA;
-import ca.uhn.hl7v2.model.v24.segment.MSH;
-import ca.uhn.hl7v2.model.v24.segment.QRD;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;

Reply | Threaded
Open this post in threaded view
|  
Report Content as Inappropriate

[3/3] camel git commit: Make MLLP Decoder more robust when requests are flooding and more flexible on charset errors

davsclaus-2
In reply to this post by davsclaus-2
Make MLLP Decoder more robust when requests are flooding and more flexible on charset errors


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8fa474fc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8fa474fc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8fa474fc

Branch: refs/heads/master
Commit: 8fa474fc952ae3b5c469c51c74ad163661f83bce
Parents: 901b44e
Author: christian ohr <[hidden email]>
Authored: Thu Mar 16 14:41:10 2017 +0100
Committer: Claus Ibsen <[hidden email]>
Committed: Mon Mar 20 17:11:46 2017 +0100

----------------------------------------------------------------------
 .../camel/component/hl7/HL7MLLPConfig.java      |  21 +++
 .../camel/component/hl7/HL7MLLPDecoder.java     | 135 ++++++++++--------
 .../hl7/HL7MLLPCodecMessageFloodingTest.java    | 138 +++++++++++++++++++
 3 files changed, 236 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8fa474fc/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java
----------------------------------------------------------------------
diff --git a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java
index 2d5abb2..5f580a7 100644
--- a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java
+++ b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPConfig.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.hl7;
 
 import java.nio.charset.Charset;
+import java.nio.charset.CodingErrorAction;
 
 import ca.uhn.hl7v2.DefaultHapiContext;
 import ca.uhn.hl7v2.HapiContext;
@@ -41,6 +42,10 @@ public class HL7MLLPConfig {
 
     private boolean produceString = true;
 
+ private CodingErrorAction malformedInputErrorAction = CodingErrorAction.REPORT;
+
+    private CodingErrorAction unmappableCharacterErrorAction = CodingErrorAction.REPORT;
+
     public Charset getCharset() {
         return charset;
     }
@@ -113,4 +118,20 @@ public class HL7MLLPConfig {
     public void setProduceString(boolean produceString) {
         this.produceString = produceString;
     }
+
+    public CodingErrorAction getMalformedInputErrorAction() {
+        return malformedInputErrorAction;
+    }
+
+ public void setMalformedInputErrorAction(CodingErrorAction malformedInputErrorAction) {
+        this.malformedInputErrorAction = malformedInputErrorAction;
+    }
+
+    public CodingErrorAction getUnmappableCharacterErrorAction() {
+        return unmappableCharacterErrorAction;
+    }
+
+    public void setUnmappableCharacterErrorAction(CodingErrorAction unmappableCharacterErrorAction) {
+        this.unmappableCharacterErrorAction = unmappableCharacterErrorAction;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8fa474fc/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java
----------------------------------------------------------------------
diff --git a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java
index 4a54d47..2866aca 100644
--- a/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java
+++ b/components/camel-hl7/src/main/java/org/apache/camel/component/hl7/HL7MLLPDecoder.java
@@ -42,76 +42,89 @@ class HL7MLLPDecoder extends CumulativeProtocolDecoder {
         this.config = config;
     }
 
-
     @Override
-    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) {
+    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
 
         // Get the state of the current message and
-        // Skip what we have already scanned
+        // Skip what we have already scanned before
         DecoderState state = decoderState(session);
         in.position(state.current());
 
+        LOG.debug("Received data, checking from position {} to {}", in.position(), in.limit());
+        boolean messageDecoded = false;
+
         while (in.hasRemaining()) {
+
+            int previousPosition = in.position();
             byte current = in.get();
 
-            // If it is the start byte and mark the position
-            if (current == config.getStartByte()) {
-                state.markStart(in.position() - 1);
-            }
-            // If it is the end bytes, extract the payload and return
-            if (state.previous() == config.getEndByte1() && current == config.getEndByte2()) {
-
-                // Remember the current position and limit.
-                int position = in.position();
-                int limit = in.limit();
-                LOG.debug("Message ends at position {} with length {}",
-                        position, position - state.start());
-                try {
+            // Check if we are at the end of an HL7 message
+            if (current == config.getEndByte2() && state.previous() == config.getEndByte1()) {
+                if (state.isStarted()) {
+                    // Save the current buffer pointers and reset them to surround the identifier message
+                    int currentPosition = in.position();
+                    int currentLimit = in.limit();
+                    LOG.debug("Message ends at position {} with length {}", previousPosition, previousPosition - state.start() + 1);
                     in.position(state.start());
-                    in.limit(position);
-                    // The bytes between in.position() and in.limit()
-                    // now contain a full MLLP message including the
-                    // start and end bytes.
-                    out.write(config.isProduceString()
-                            ? parseMessageToString(in.slice(), charsetDecoder(session))
-                            : parseMessageToByteArray(in.slice()));
-                } catch (CharacterCodingException cce) {
-                    throw new IllegalArgumentException("Exception while finalizing the message", cce);
-                } finally {
-                    // Reset position, limit, and state
-                    in.limit(limit);
-                    in.position(position);
-                    state.reset();
+                    in.limit(currentPosition);
+                    LOG.debug("Set start to position {} and limit to {}", in.position(), in.limit());
+
+                    // Now create string or byte[] from this part of the buffer and restore the buffer pointers
+                    try {
+                        out.write(config.isProduceString()
+                                ? parseMessageToString(in.slice(), charsetDecoder(session))
+                                : parseMessageToByteArray(in.slice()));
+                        messageDecoded = true;
+                    } finally {
+                        LOG.debug("Resetting to position {} and limit to {}", currentPosition, currentLimit);
+                        in.position(currentPosition);
+                        in.limit(currentLimit);
+                        state.reset();
+                    }
+                } else {
+                    LOG.warn("Ignoring message end at position {} until start byte has been seen.", previousPosition);
+                }
+            } else {
+                // Check if we are at the start of an HL7 message
+                if (current == config.getStartByte()) {
+                    state.markStart(previousPosition);
+                } else {
+                    // Remember previous byte in state object because the buffer could
+                    // be theoretically exhausted right between the two end bytes
+                    state.markPrevious(current);
                 }
-                return true;
+                messageDecoded = false;
             }
-            // Remember previous byte in state object because the buffer could
-            // be theoretically exhausted right between the two end bytes
-            state.markPrevious(current);
         }
 
-        // Could not find a complete message in the buffer.
-        // Reset to the initial position and return false so that this method
-        // is called again with more data.
-        LOG.debug("No complete message yet at position {} ", in.position());
-        state.markCurrent(in.position());
-        in.position(0);
-        return false;
+        if (!messageDecoded) {
+            // Could not find a complete message in the buffer.
+            // Reset to the initial position (just as nothing had been read yet)
+            // and return false so that this method is called again with more data.
+            LOG.debug("No complete message yet at position {} ", in.position());
+            state.markCurrent(in.position());
+            in.position(0);
+        }
+        return messageDecoded;
     }
 
     // Make a defensive byte copy (the buffer will be reused)
     // and omit the start and the two end bytes of the MLLP message
     // returning a byte array
-    private Object parseMessageToByteArray(IoBuffer slice) throws CharacterCodingException {
-        byte[] dst = new byte[slice.limit() - 3];
-        slice.skip(1); // skip start byte
-        slice.get(dst, 0, dst.length);
+    private Object parseMessageToByteArray(IoBuffer buf) throws CharacterCodingException {
+        int len = buf.limit() - 3;
+        LOG.debug("Making byte array of length {}", len);
+        byte[] dst = new byte[len];
+        buf.skip(1); // skip start byte
+        buf.get(dst, 0, len);
+        buf.skip(2); // skip end bytes
 
         // Only do this if conversion is enabled
         if (config.isConvertLFtoCR()) {
+            LOG.debug("Replacing LF by CR");
             for (int i = 0; i < dst.length; i++) {
-                if (dst[i] == (byte)'\n') {
-                    dst[i] = (byte)'\r';
+                if (dst[i] == (byte) '\n') {
+                    dst[i] = (byte) '\r';
                 }
             }
         }
@@ -121,12 +134,16 @@ class HL7MLLPDecoder extends CumulativeProtocolDecoder {
     // Make a defensive byte copy (the buffer will be reused)
     // and omit the start and the two end bytes of the MLLP message
     // returning a String
-    private Object parseMessageToString(IoBuffer slice, CharsetDecoder decoder) throws CharacterCodingException {
-        slice.skip(1); // skip start byte
-        String message = slice.getString(slice.limit() - 3, decoder);
+    private Object parseMessageToString(IoBuffer buf, CharsetDecoder decoder) throws CharacterCodingException {
+        int len = buf.limit() - 3;
+        LOG.debug("Making string of length {} using charset {}", len, decoder.charset());
+        buf.skip(1); // skip start byte
+        String message = buf.getString(len, decoder);
+        buf.skip(2); // skip end bytes
 
         // Only do this if conversion is enabled
         if (config.isConvertLFtoCR()) {
+            LOG.debug("Replacing LF by CR");
             message = message.replace('\n', '\r');
         }
         return message;
@@ -142,7 +159,9 @@ class HL7MLLPDecoder extends CumulativeProtocolDecoder {
         synchronized (session) {
             CharsetDecoder decoder = (CharsetDecoder) session.getAttribute(CHARSET_DECODER);
             if (decoder == null) {
-                decoder = config.getCharset().newDecoder();
+                decoder = config.getCharset().newDecoder()
+                    .onMalformedInput(config.getMalformedInputErrorAction())
+                    .onUnmappableCharacter(config.getUnmappableCharacterErrorAction());
                 session.setAttribute(CHARSET_DECODER, decoder);
             }
             return decoder;
@@ -164,25 +183,22 @@ class HL7MLLPDecoder extends CumulativeProtocolDecoder {
      * Holds the state of the decoding process
      */
     private static class DecoderState {
-        private int startPos;
+        private int startPos = -1;
         private int currentPos;
         private byte previousByte;
-        private boolean started;
 
         void reset() {
-            startPos = 0;
+            startPos = -1;
             currentPos = 0;
-            started = false;
             previousByte = 0;
         }
 
         void markStart(int position) {
-            if (started) {
+            if (isStarted()) {
                 LOG.warn("Ignoring message start at position {} before previous message has ended.", position);
             } else {
                 startPos = position;
                 LOG.debug("Message starts at position {}", startPos);
-                started = true;
             }
         }
 
@@ -205,6 +221,9 @@ class HL7MLLPDecoder extends CumulativeProtocolDecoder {
         public byte previous() {
             return previousByte;
         }
-    }
 
+        public boolean isStarted() {
+            return startPos >= 0;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8fa474fc/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java
new file mode 100644
index 0000000..17133cc
--- /dev/null
+++ b/components/camel-hl7/src/test/java/org/apache/camel/component/hl7/HL7MLLPCodecMessageFloodingTest.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hl7;
+
+import ca.uhn.hl7v2.model.Message;
+import ca.uhn.hl7v2.model.v24.message.ADR_A19;
+import ca.uhn.hl7v2.model.v24.segment.MSA;
+import ca.uhn.hl7v2.model.v24.segment.MSH;
+import ca.uhn.hl7v2.model.v24.segment.QRD;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Unit test for the HL7MLLP Codec.
+ */
+public class HL7MLLPCodecMessageFloodingTest extends HL7TestSupport {
+    
+
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        HL7MLLPCodec codec = new HL7MLLPCodec();
+        codec.setCharset("ISO-8859-1");
+        codec.setConvertLFtoCR(false);
+        jndi.bind("hl7codec", codec);
+        return jndi;
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("mina2:tcp://127.0.0.1:" + getPort() + "?sync=true&codec=#hl7codec")
+                    .unmarshal().hl7()
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            Message input = exchange.getIn().getBody(Message.class);
+                            Message response = input.generateACK();
+                            exchange.getOut().setBody(response);
+                            Thread.sleep(50); // simulate some processing time
+                        }
+                    })
+                    .to("mock:result");
+            }
+        };
+    }
+
+    @Test
+    public void testHL7MessageFlood() throws Exception {
+
+        // Write and receive using plain sockets and in different threads
+        Socket socket = new Socket("localhost", getPort());
+        BufferedOutputStream outputStream = new BufferedOutputStream(new DataOutputStream(socket.getOutputStream()));
+        final BufferedInputStream inputStream = new BufferedInputStream(new DataInputStream(socket.getInputStream()));
+
+        int messageCount = 100;
+        CountDownLatch latch = new CountDownLatch(messageCount);
+
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                int response;
+                StringBuilder s = new StringBuilder();
+                try {
+                    int i = 0;
+                    boolean cont = true;
+                    while (cont && (response = inputStream.read()) >= 0) {
+                        if (response == 28) {
+                            response = inputStream.read(); // read second end byte
+                            if (response == 13) {
+                                // Responses must arrive in same order
+                                cont = s.toString().contains(String.format("X%dX", i++));
+                                s.setLength(0);
+                                latch.countDown();
+                            }
+                        } else {
+                            s.append((char) response);
+                        }
+                    }
+                } catch (IOException ignored) {
+                }
+            }
+        });
+        t.start();
+
+        String in = "MSH|^~\\&|MYSENDER|MYRECEIVER|MYAPPLICATION||200612211200||QRY^A19|X%dX|P|2.4\r" +
+                "QRD|200612211200|R|I|GetPatient|||1^RD|0101701234|DEM||";
+        for (int i = 0; i < messageCount; i++) {
+            String msg = String.format(in, i);
+            outputStream.write(11);
+            outputStream.flush();
+            // Some systems send end bytes in a separate frame
+            // Thread.sleep(10);
+            outputStream.write(msg.getBytes());
+            outputStream.flush();
+            // Some systems send end bytes in a separate frame
+            // Thread.sleep(10);
+            outputStream.write(28);
+            outputStream.write(13);
+            outputStream.flush();
+            // Potentially wait after message
+            // Thread.sleep(10);
+        }
+
+        boolean success = latch.await(20, TimeUnit.SECONDS);
+
+        outputStream.close();
+        inputStream.close();
+        socket.close();
+
+        assertTrue(success);
+    }
+
+}

Loading...