5 Replies Latest reply on Apr 15, 2009 5:37 PM by arhan

    Serious flaw in camel-fix

    rodehav

      Hi!

       

      I've been experimenting a bit with Camel and QuickFix/J via the camel-fix component, and I've written about this in other posts. I've found a couple of bugs and have some feature requests that I believe will be taken care of in a future version of the camel-fix component. Meanwhile, I'm trying to get the existing camel-fix component working since I'm getting close to my production date. As part of this, I've done some reliability testing to make sure that no FIX messages are lost when exceptions are thrown, when my process is killed, and so forth. Unfortunately, I've discovered a serious flaw in the camel-fix component. Maybe this is already being addressed in the work on the future version of camel-fix, but I need advice on how to make the existing version work until then.

       

      The way QuickFix/J works, it is imperative that all exceptions are propagated to the "fromApp()" method in the QuickFix/J application (the CamelApplication class in the camel-fix component). If the fromApp() method returns without an exception, QuickFix/J assumes that your application has successfully dealt with the message. Thus, you must guarantee that the fromApp() method does not return until you're "done" in some sense. Whether "done" means processing the message all the way or just writing it to a persistent queue for further processing is of course up to the programmer. This has the following two consequences for the "onMessage()" method in the FixEndpoint class in the camel-fix component:

       

      1. All exceptions thrown must be propagated back to the fromApp() method. Currently, the onMessage() method catches all exceptions and doesn't rethrow them. This prevents QuickFix/J from functioning properly.

       

      2. The processing (initiated in onMessage() via "getLoadBalancer().process(exchange)") must be executed synchronously. This is largely a consequence of the first point.
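
The first point can be illustrated with a minimal sketch. It uses stand-in types only: Handler, onMessageSwallowing, onMessagePropagating and engineThinksProcessed are hypothetical names made up for illustration and are not part of QuickFix/J or camel-fix. The assumption is simply that the engine treats a normal return as "message processed".

```java
// Stand-in types only: none of these names come from QuickFix/J or camel-fix.
public class PropagationSketch {

    /** Hypothetical stand-in for whatever processes the message in the route. */
    interface Handler {
        void handle(String message) throws Exception;
    }

    /** Mirrors the behaviour described above: the failure is logged and swallowed. */
    static void onMessageSwallowing(Handler handler, String message) {
        try {
            handler.handle(message);
        } catch (Exception e) {
            System.err.println("Failed " + e + " when processing: " + message);
        }
    }

    /** Mirrors the requested behaviour: the failure reaches the caller. */
    static void onMessagePropagating(Handler handler, String message) throws Exception {
        handler.handle(message);
    }

    /** Plays the role of the engine: a normal return counts as "processed". */
    static boolean engineThinksProcessed(boolean propagate, Handler handler, String message) {
        try {
            if (propagate) {
                onMessagePropagating(handler, message);
            } else {
                onMessageSwallowing(handler, message);
            }
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Handler failing = message -> {
            throw new IllegalStateException("simulated failure");
        };
        // Swallowing variant: the engine wrongly believes the message was handled.
        System.out.println(engineThinksProcessed(false, failing, "NewOrderSingle"));
        // Propagating variant: the engine sees the failure and can react to it.
        System.out.println(engineThinksProcessed(true, failing, "NewOrderSingle"));
    }
}
```

With the swallowing variant the caller has no way to tell a failed message from a successful one, which is the core of the problem.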

       

      Examples of things that will fail with the current implementation of camel-fix:

       

      - If you throw a checked exception (FieldNotFound, IncorrectDataFormat, IncorrectTagValue or UnsupportedMessageType), the built-in QuickFix/J handling won't be triggered unless the fromApp() method receives this exception. Most of these exceptions should cause QuickFix/J to send a business message reject. Furthermore, if UnsupportedMessageType is thrown, QuickFix/J will remember that this message type is not supported and will reject it for the remainder of the session. The point is that QuickFix/J relies on receiving these exceptions in order to function properly.

       

      - If a runtime exception is thrown, QuickFix/J assumes that something of a temporary nature went wrong and will ask the counterparty to resend the message. This is done on the session level and thus automatically by QuickFix/J. If the fromApp() method doesn't receive the runtime exception, the corresponding message will never be resent and is thus potentially lost.

       

      - If the process dies while processing a message (while in the fromApp() method), QuickFix/J will remember that this message was not properly processed. On the next login, QuickFix/J will therefore ask the counterparty to resend all messages starting from that message. Thus, no messages will be lost. In contrast, if I perform a System.exit() in the processing initiated by the onMessage() method (to simulate a crashed process), QuickFix/J will never request a resend because the onMessage() method has already returned (since the message is not processed synchronously) and QuickFix/J believes that the message was processed successfully even though it wasn't.
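
The effect of an asynchronous hand-off on these cases can be shown with a small, self-contained sketch. It does not use Camel or QuickFix/J; AsyncHandoffSketch and returnsNormally are hypothetical names, and the single-thread executor is only an assumed stand-in for any mechanism that processes the message on another thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustration only: an executor stands in for any asynchronous hand-off.
public class AsyncHandoffSketch {

    /**
     * Returns true if the calling thread completes without seeing an exception,
     * which is the situation in which the caller would assume success.
     */
    static boolean returnsNormally(boolean async) throws InterruptedException {
        Runnable failingWork = () -> {
            throw new IllegalStateException("simulated failure");
        };
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            if (async) {
                // The failure happens on the worker thread and ends up in a
                // Future that nobody inspects, so the caller never sees it.
                pool.submit(failingWork);
            } else {
                // The failure is thrown on the calling thread.
                failingWork.run();
            }
            return true;
        } catch (RuntimeException e) {
            return false;
        } finally {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("async returns normally: " + returnsNormally(true));
        System.out.println("sync returns normally:  " + returnsNormally(false));
    }
}
```

In the asynchronous case the caller returns normally although the work failed, so nothing would trigger a reject or a resend.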

       

      I would like two things:

       

      1. When you redesign the camel-fix component, you have to make absolutely certain that the processing initiated by the camel-fix component is done synchronously and that all exceptions are passed back to the fromApp() method.

       

      2. Since I need to go to production before you are done with the new camel-fix component, I have patched the camel-fix component to get it to work in my scenario. However, I'm not a Camel expert and I don't quite understand how to modify the FixEndpoint class to make sure that the onMessage() method initiates a synchronous request. Can you advise me on how to (temporarily) do this?

       

      Also, do you have any indication of when I can sink my teeth into the new camel-fix component?

       

      /Bengt

        • 1. Re: Serious flaw in camel-fix
          mielket

           

          I've found a couple of bugs and feature requests that I believe will be taken care of in a future version

           

          Can you please paste the JIRA numbers and perhaps links to the other discussions you had here?

           

          Also, do you have any indication of when I can sink my teeth into the new camel-fix component?

           

          Any bug fixes will be on the trunk version that you can check out yourself from https://projects.open.iona.com/projects/svn/iona/camel/

          • 2. Re: Serious flaw in camel-fix
            davsclaus

            Hi Bengt

             

            Thanks a lot for your detailed description of the flaws.

             

            I had hoped that Anton (the author of the Google Code camel-quickfix project) would have gotten further than he has.

             

            I have scheduled a reworked camel-fix component for the Camel 2.1 release.

             

            The ticket is here:

            https://issues.apache.org/activemq/browse/CAMEL-1350

             

            At that time we will rework it and make sure it complies with all your findings.

             

            As we love patches, you are more than welcome to submit your patches to the existing component and attach them to the JIRA ticket.

             

            And please keep us posted with more findings.

            • 3. Re: Serious flaw in camel-fix
              rodehav

              I've come up with a workaround that seems to work for me. After debugging the exception handling, it turned out that when a LoadBalancerConsumer is used (which camel-fix does), a copy strategy is applied somewhere along the way. This means that when the Processor sets an exception on the Exchange, it does so on a copy of the Exchange. When control returns to the onMessage() method in FixEndpoint, the exception is no longer there since it was set on a copy of the original Exchange. Thus I changed the FixEndpoint class so that a DefaultConsumer is used instead. This seems to work. I changed the following members of FixEndpoint:

              ...
                // Keep a reference to the route's processor so onMessage() can call it directly.
                private Processor processor;
              ...
                public Consumer createConsumer(Processor processor) throws Exception {
                  this.processor = processor;
                  // Use a plain DefaultConsumer instead of the LoadBalancerConsumer.
                  return new DefaultConsumer(this, processor);
                  // return new LoadBalancerConsumer(this, processor, getLoadBalancer());
                }
              ...
                public void onMessage(Message message, SessionID sessionID) throws Throwable {
                  Exchange exchange = createExchange(message, sessionID);
                  // try {
                  // Call the processor directly on the calling thread.
                  this.processor.process(exchange);
                  // getLoadBalancer().process(exchange);
                  // Rethrow any exception recorded on the exchange so it reaches fromApp().
                  Throwable exception = exchange.getException();
                  if (exception != null) {
                    throw exception;
                  }
                  // } catch (Exception e) {
                  // LOG.error("Failed " + e + " when processing: " + message, e);
                  // }
                }

               

              I also changed the fromApp() method in CamelApplication as follows:

               

                public void fromApp(Message message, SessionID sessionID)
                    throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue,
                    UnsupportedMessageType {
                  if (LOG.isDebugEnabled()) {
                    LOG.debug("fromApp() session: " + sessionID + " " + message);
                  }

                  try {
                    endpoint.onMessage(message, sessionID);
                  }
                  // Pass the checked QuickFix/J exceptions through unchanged.
                  catch (FieldNotFound theException) {
                    throw theException;
                  }
                  catch (IncorrectDataFormat theException) {
                    throw theException;
                  }
                  catch (IncorrectTagValue theException) {
                    throw theException;
                  }
                  catch (UnsupportedMessageType theException) {
                    throw theException;
                  }
                  // Log runtime exceptions and rethrow them as they are.
                  catch (RuntimeException theException) {
                    LOG.error("Unexpected exception encountered in onMessage()", theException);
                    throw theException;
                  }
                  // Wrap anything else, since fromApp() cannot declare it.
                  catch (Throwable theException) {
                    LOG.error("Unexpected exception encountered in onMessage()", theException);
                    throw new RuntimeException("Unexpected exception encountered in onMessage()", theException);
                  }
                }

               

              This seems to do the trick and QuickFix/J now works as it should regarding resends etc. Remember though that I'm not very fluent in programming for Camel and would very much like a Camel expert to review my changes.
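
The copy problem described at the top of this post can be reproduced in isolation. The sketch below is not Camel code: its Exchange and Processor types are deliberately tiny, hypothetical stand-ins, and dispatchWithCopy is only an assumed approximation of what a copying dispatcher does.

```java
// Illustration only: minimal stand-ins, not the real Camel Exchange or Processor.
public class ExchangeCopySketch {

    /** Hypothetical, stripped-down exchange that can carry an exception. */
    static final class Exchange {
        private final String body;
        private Throwable exception;

        Exchange(String body) {
            this.body = body;
        }

        Exchange copy() {
            return new Exchange(body);
        }

        void setException(Throwable exception) {
            this.exception = exception;
        }

        Throwable getException() {
            return exception;
        }
    }

    /** Hypothetical processor that reports failures on the exchange. */
    interface Processor {
        void process(Exchange exchange);
    }

    /** Hands a copy to the processor; the original never sees the exception. */
    static Throwable dispatchWithCopy(Processor processor, Exchange original) {
        processor.process(original.copy());
        return original.getException();
    }

    /** Hands the original to the processor; the exception stays visible. */
    static Throwable dispatchDirect(Processor processor, Exchange original) {
        processor.process(original);
        return original.getException();
    }

    public static void main(String[] args) {
        Processor failing = exchange ->
            exchange.setException(new IllegalStateException("simulated failure"));
        System.out.println("with copy: " + dispatchWithCopy(failing, new Exchange("msg")));
        System.out.println("direct:    " + dispatchDirect(failing, new Exchange("msg")));
    }
}
```

With the copying dispatcher the caller reads no exception from the original exchange, which matches what was observed in onMessage().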

               

              By the way, is there any way to format code segments properly? I can't seem to find one, which makes the code hard to read.

               

              /Bengt

              • 5. Re: Serious flaw in camel-fix
                arhan

                Bengt,

                 

                I have added your findings to camel-quickfix.

                 

                Also, my guess is that instead of Camel load balancing you could use QuickFix/J's ThreadedSocketAcceptor to improve the performance...

                 

                 

                BR,

                Anton