18 Replies Latest reply on Feb 28, 2010 6:17 AM by Tim Fox

    StompProtocolManager: Unable to send frame StompFrame[command=MESSAGE ...

    Michael Justin Novice

      Hello,

       

      The current trunk version logs this error for every message, whenever the server receives a message and tries to send it to the consumer.

      I will check if this is also caused by messages coming in too fast from the producer. I will come back later with results.

       

      My other unit tests on the Delphi / Free Pascal side for text messages passed, including Unicode (UTF-8).

       

      Regards,

      Michael

       

       

      [New I/O server worker #2-2] 15:49:58,138 SEVERE [org.hornetq.core.protocol.stomp.StompProtocolManager]  Unable to send frame StompFrame[command=MESSAGE, headers={timestamp=1266936598135, redelivered=false, expires=0, subscription={72B75D72-36FC-4830-8E7F-BEB9404A2C09}, priority=0, content-length=255, message-id=23622320154, destination=jms.queue.ExampleQueue}, content-length=255]
      java.lang.IllegalStateException: await*() in I/O thread causes a dead lock or sudden performance drop. Use addListener() instead or call await*() from a different thread.
              at org.jboss.netty.channel.DefaultChannelFuture.checkDeadLock(DefaultChannelFuture.java:283)
              at org.jboss.netty.channel.DefaultChannelFuture.await0(DefaultChannelFuture.java:247)
              at org.jboss.netty.channel.DefaultChannelFuture.await(DefaultChannelFuture.java:188)
              at org.hornetq.integration.transports.netty.NettyConnection.write(NettyConnection.java:127)
              at org.hornetq.core.protocol.stomp.StompProtocolManager.doSend(StompProtocolManager.java:615)
              at org.hornetq.core.protocol.stomp.StompProtocolManager.access$200(StompProtocolManager.java:49)
              at org.hornetq.core.protocol.stomp.StompProtocolManager$1.done(StompProtocolManager.java:545)
              at org.hornetq.core.persistence.impl.journal.OperationContextImpl.executeOnCompletion(OperationContextImpl.java:158)
              at org.hornetq.core.persistence.impl.journal.JournalStorageManager.afterCompleteOperations(JournalStorageManager.java:389)
              at org.hornetq.core.protocol.stomp.StompProtocolManager.send(StompProtocolManager.java:533)
              at org.hornetq.core.protocol.stomp.StompSession.sendMessage(StompSession.java:103)
              at org.hornetq.core.server.impl.ServerConsumerImpl.deliverStandardMessage(ServerConsumerImpl.java:630)
              at org.hornetq.core.server.impl.ServerConsumerImpl.handle(ServerConsumerImpl.java:257)
              at org.hornetq.core.server.impl.QueueImpl.handle(QueueImpl.java:1347)
              at org.hornetq.core.server.impl.QueueImpl.directDeliver(QueueImpl.java:1250)
              at org.hornetq.core.server.impl.QueueImpl.add(QueueImpl.java:1307)
              at org.hornetq.core.server.impl.QueueImpl.addLast(QueueImpl.java:231)
              at org.hornetq.core.postoffice.impl.PostOfficeImpl.addReferences(PostOfficeImpl.java:967)
              at org.hornetq.core.postoffice.impl.PostOfficeImpl.access$200(PostOfficeImpl.java:78)
              at org.hornetq.core.postoffice.impl.PostOfficeImpl$1.done(PostOfficeImpl.java:954)
              at org.hornetq.core.persistence.impl.journal.OperationContextImpl.executeOnCompletion(OperationContextImpl.java:158)
              at org.hornetq.core.persistence.impl.journal.JournalStorageManager.afterCompleteOperations(JournalStorageManager.java:389)
              at org.hornetq.core.postoffice.impl.PostOfficeImpl.processRoute(PostOfficeImpl.java:943)
              at org.hornetq.core.postoffice.impl.PostOfficeImpl.route(PostOfficeImpl.java:665)
              at org.hornetq.core.server.impl.ServerSessionImpl.doSend(ServerSessionImpl.java:1252)
              at org.hornetq.core.server.impl.ServerSessionImpl.send(ServerSessionImpl.java:941)
              at org.hornetq.core.protocol.stomp.StompProtocolManager.onSend(StompProtocolManager.java:503)
              at org.hornetq.core.protocol.stomp.StompProtocolManager.doHandleBuffer(S

        • 1. Re: StompProtocolManager: Unable to send frame StompFrame[command=MESSAGE ...
          Jeff Mesnil Master

          No need to check: the Stomp protocol manager should send Stomp messages in a separate thread to avoid a deadlock on Netty's server worker thread.

          I accidentally removed this when I refactored the code but I've just added it back in the trunk.

          • 2. Re: StompProtocolManager: Unable to send frame StompFrame[command=MESSAGE ...
            Tim Fox Master

            Why do you need to send using a different thread? There should be no need for this. (It's not required for core, for example)

             

            Can you explain the deadlock in more detail?

            • 3. Re: StompProtocolManager: Unable to send frame StompFrame[command=MESSAGE ...
              Michael Justin Novice

              Ok, this looks good now. Performance has changed a lot, though: with the first version I tested last week there was a blazing speed of 2,400 msgs/s (one million send and receive frames in seven minutes). With the current version, speed has decreased to around 100 msgs/s.

               

              I have attached two command line tools to produce and consume messages over the Stomp acceptor.

               

              usage:

               

              ProducerTool --messagecount=10000

               

              (default messagecount is 10)

               

              and

               

              ConsumerTool --MaximumMessages=10000

               

              (for more parameters, see readme.txt).

               

              For high performance tests, output can be disabled using --verbose=false

               

              If you prefer a version for Linux, please let me know.

               

              Regards,

              Michael
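
As a quick sanity check on the figures quoted in this post, the following Java sketch redoes the arithmetic. It assumes that "one million send and receive frames in seven minutes" means one million frames in total over 420 seconds; the class and method names are made up for illustration.

```java
// Hypothetical helper for checking the quoted throughput figures.
class ThroughputCheck {

    // Average rate for a given number of messages over a duration in seconds.
    static double messagesPerSecond(long messages, double seconds) {
        return messages / seconds;
    }

    // Time in seconds needed to move a given number of messages at a fixed rate.
    static double secondsFor(long messages, double ratePerSecond) {
        return messages / ratePerSecond;
    }

    public static void main(String[] args) {
        // Assumption: one million frames in seven minutes (420 s).
        System.out.println(Math.round(messagesPerSecond(1_000_000, 7 * 60)) + " msgs/s");
        // The same volume at the slower rate of about 100 msgs/s.
        System.out.println(secondsFor(1_000_000, 100) / 3600 + " hours");
    }
}
```

Under that assumption the first rate comes out at about 2,381 msgs/s, which matches the rounded 2,400 msgs/s, while one million messages at 100 msgs/s would take 10,000 seconds, or roughly 2.8 hours.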

              • 4. Re: StompProtocolManager: Unable to send frame StompFrame[command=MESSAGE ...
                Jeff Mesnil Master

                mjustin wrote:

                 

                Ok, this looks good now. Performance now has changed a lot, with the first version I tested last week there was a blazing speed of 2,400 msgs/s (one million send and receive frames in seven minutes). With the current version, speed has decreased to around 100 msgs/s.

                This is not normal. I was expecting the new decoder to be a tad slower: the previous decoder just read the bytes until it found the first NULL byte, which delimits the Stomp frame.

                The new decoder reads the bytes, creates Strings, and interprets them to determine where the frame really ends.

                However, it should not be slower by such an order of magnitude.

                 

                I'll profile the code to see where the bottleneck is.
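
For readers unfamiliar with Stomp framing, here is a minimal Java sketch of the simpler approach described above: scan the buffer and cut a frame at each NULL byte. It is not the HornetQ decoder; `NulFrameSplitter` and `split` are hypothetical names, and the sketch deliberately ignores the `content-length` header. Stomp allows that header so that a body may itself contain NULL bytes, which a scan like this one cannot handle.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not taken from the HornetQ sources.
class NulFrameSplitter {

    // Returns every complete frame in the buffer, cut at NUL (0x00) bytes.
    // Bytes after the last NUL belong to an incomplete frame and are skipped.
    static List<String> split(byte[] buffer) {
        List<String> frames = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < buffer.length; i++) {
            if (buffer[i] == 0) {
                frames.add(new String(buffer, start, i - start, StandardCharsets.UTF_8));
                start = i + 1;
            }
        }
        return frames;
    }

    public static void main(String[] args) {
        // One complete SEND frame followed by the start of a second one.
        byte[] data = "SEND\ndestination:jms.queue.ExampleQueue\n\nhello\0SEND\ndest"
                .getBytes(StandardCharsets.UTF_8);
        List<String> frames = split(data);
        System.out.println(frames.size() + " complete frame(s)");
        System.out.println(frames.get(0));
    }
}
```

A header-aware decoder has to parse the command and header lines first and, when `content-length` is present, read exactly that many body bytes before expecting the terminating NULL byte, which is the extra work mentioned above.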

                • 5. Re: StompProtocolManager: Unable to send frame StompFrame[command=MESSAGE ...
                  Tim Fox Master

                  If you're sending messages non-blocking, you should be able to get up to around 100,000 non-persistent messages per second on reasonable hardware. This is what we get for the core protocol; STOMP will be a bit slower, but there shouldn't be a whole lot of difference.

                   

                  If you're sending messages blocking, or persistent, then you're up against hardware limits such as disk sync rate and network latency.

                  • 6. Re: StompProtocolManager: Unable to send frame StompFrame[command=MESSAGE ...
                    Michael Justin Novice

                    Hi,

                     

                    Thank you for the feedback. I have also seen many "java.lang.IllegalStateException: await*() in I/O thread causes a dead lock or sudden performance drop." messages in the ConcurrentStompTest when I run it in the IDE.

                     

                    Michael

                    • 7. Re: StompProtocolManager: Unable to send frame StompFrame[command=MESSAGE ...
                      Tim Fox Master

                      mjustin wrote:

                       

                      Hi,

                       

                      thank you for the feedback. I have also seen many "java.lang.IllegalStateException:  await*() in I/O thread causes a dead lock or sudden performance  drop." message appearing in the ConcurrentStompTest when I run it in the IDE.

                       

                      Michael

                      This means the Stomp protocol implementation is blocking somewhere after fielding a request. This would be a bug in the protocol code and would probably explain the slow-down.

                       

                      You can't block on remoting threads.
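
The exception text itself suggests the alternative: register a listener instead of calling await*() on the I/O thread. The Java sketch below imitates that situation with plain JDK classes rather than Netty, so it is only an analogy. A single-threaded executor stands in for the I/O worker, a `CompletableFuture` stands in for the channel future, and all class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for an I/O worker; uses JDK classes only, not Netty.
class IoThreadBlockingDemo {

    // Simulates a write whose completion is signalled by the same single I/O thread.
    static CompletableFuture<Void> write(ExecutorService ioThread) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        ioThread.execute(() -> done.complete(null));
        return done;
    }

    // Waits for the write inside the I/O thread. The completion task is queued
    // behind the waiting task, so the wait can only end by timing out.
    static boolean blockingWaitTimesOut() {
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> result = io.submit(() -> {
                CompletableFuture<Void> f = write(io);
                try {
                    f.get(200, TimeUnit.MILLISECONDS);
                    return false;
                } catch (TimeoutException e) {
                    return true;
                }
            });
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
    }

    // Registers a callback and returns from the I/O task at once, which frees
    // the I/O thread to run the completion and then the callback.
    static boolean listenerIsNotified() {
        ExecutorService io = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Boolean> notified = new CompletableFuture<>();
            io.execute(() -> write(io).thenRun(() -> notified.complete(true)));
            return notified.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return false;
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking wait timed out: " + blockingWaitTimesOut());
        System.out.println("listener notified: " + listenerIsNotified());
    }
}
```

Without the timeout, the blocking variant would wait forever, which appears to be the situation Netty's deadlock check is guarding against when it throws the IllegalStateException.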

                      • 8. Re: StompProtocolManager: Unable to send frame StompFrame[command=MESSAGE ...
                        Tim Fox Master

                        It's important in HornetQ that all server code is non blocking end-to-end.

                        • 9. Re: StompProtocolManager: Unable to send frame StompFrame[command=MESSAGE ...
                          Michael Justin Novice

                          I am doing tests with the ExampleQueue, which I have configured as follows:

                           

                              <queue name="ExampleQueue">
                                  <entry name="/queue/ExampleQueue"/>
                                  <durable>false</durable>
                              </queue>

                           

                          However, my main concern is that performance dropped from 2,400 to less than 100 msgs/second with the last changes. If time permits, I will do some tests in Eclipse this weekend.

                          • 10. Re: StompProtocolManager: Unable to send frame StompFrame[command=MESSAGE ...
                            Tim Fox Master

                            Also, looking at the current code, the session context stuff isn't implemented correctly.

                            This needs to be done the way it is done in the core protocol; otherwise things won't work.

                            • 11. Re: StompProtocolManager: Unable to send frame StompFrame[command=MESSAGE ...
                              Michael Justin Novice
                              ConcurrentStompTest is part of the HornetQ test sources; it is not in my client test code. So if I understand you correctly, there is something which needs to be checked in HornetQ's protocol implementation?
                              • 12. Re: StompProtocolManager: Unable to send frame StompFrame[command=MESSAGE ...
                                Tim Fox Master

                                Maybe this is the problem, in StompProtocolManager.doSend:

                                 

                                connection.getTransportConnection().write(buffer, true);

                                 

                                Flush should be false, not true, otherwise the entire netty write queue will get flushed on each response written!

                                 

                                This will be very slow, and also block, which also probably explains the warnings.
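
To give a feel for why flushing on every write is expensive, here is a small Java sketch using only JDK streams. It is an analogy and not HornetQ or Netty code: `FlushCostDemo`, `CountingSink`, and `sinkWrites` are hypothetical names, and the sink simply counts how often buffered data is pushed down to it. In the real code there is the additional cost that, according to the stack trace in the first post, `NettyConnection.write` ends up in `DefaultChannelFuture.await`.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical illustration with JDK streams; not HornetQ or Netty code.
class FlushCostDemo {

    // Stand-in for a socket: counts how many times data is pushed down to it.
    static class CountingSink extends OutputStream {
        int writes = 0;

        @Override
        public void write(int b) {
            writes++;
        }

        @Override
        public void write(byte[] b, int off, int len) {
            writes++;
        }
    }

    // Writes the given number of 16-byte frames through an 8 KB buffer and
    // returns how many write calls reached the sink.
    static int sinkWrites(int frames, boolean flushEachFrame) {
        CountingSink sink = new CountingSink();
        byte[] frame = new byte[16];
        try (BufferedOutputStream out = new BufferedOutputStream(sink, 8192)) {
            for (int i = 0; i < frames; i++) {
                out.write(frame);
                if (flushEachFrame) {
                    out.flush();
                }
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return sink.writes;
    }

    public static void main(String[] args) {
        System.out.println("flush per frame: " + sinkWrites(100, true) + " sink writes");
        System.out.println("flush at close:  " + sinkWrites(100, false) + " sink writes");
    }
}
```

With 100 small frames, flushing after each one reaches the sink 100 times, while letting the buffer drain when the stream is closed reaches it once.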

                                • 13. Re: StompProtocolManager: Unable to send frame StompFrame[command=MESSAGE ...
                                  Jeff Mesnil Master

                                  timfox wrote:

                                   

                                  Maybe this is the problem, in StompProtocolManager.doSend:

                                   

                                  connection.getTransportConnection().write(buffer, true);

                                   

                                  Flush should be false, not true, otherwise the entire netty write queue will get flushed on each response written!

                                   

                                  This will be very slow, and also block, which also probably explains the warnings.

                                  Who's the dumb guy who wrote that code? Err.... never mind...

                                  I've fixed it in the trunk and removed the unneeded executor when handling a buffer.

                                  • 14. Re: StompProtocolManager: Unable to send frame StompFrame[command=MESSAGE ...
                                    Michael Justin Novice

                                    Hello,

                                     

                                    Thanks, this looks very good now: performance is back to 2,000 or more messages per second. However, some of my tests (ConsumerTool and ProducerTool) still have performance problems, and they also put the broker into a state where all following tests show low performance too.

                                     

                                    The attached performance test GUI application runs very well performance-wise. But when more than one thread is sending or receiving, many messages are lost. The test app retries twice to catch messages which have been too slow and then gives up.

                                     

                                     

                                    Every send and receive thread has its own socket connection. I will test a little more over the weekend. Here is the code for the threads:

                                     

                                    procedure THabariSendThread.Execute;
                                    var
                                      Connection: IConnection;
                                      Session: ISession;
                                      Destination: IDestination;
                                      Producer: IMessageProducer;
                                      I: Integer;
                                    begin
                                      inherited;


                                      Log(Format('Send %d messages to queue %s', [NumMsgs, QueueName]));


                                      IncCount;
                                      Content := DupeString('A', MsgSize);


                                      Connection := TBTJMSConnection.MakeConnection;
                                      try
                                        // Start inside the try block so that Close also runs if Start raises
                                        Connection.Start;
                                        Session := Connection.CreateSession(False, amAutoAcknowledge);
                                        Destination := Session.CreateQueue(QueueName);
                                        Producer := Session.CreateProducer(Destination);


                                        for I := 0 to NumMsgs - 1 do
                                        begin
                                          Producer.Send(Session.CreateTextMessage(Content));
                                        end;
                                        Log(Format('%d messages sent to queue %s', [NumMsgs, QueueName]));
                                      finally
                                        Connection.Close;
                                      end;
                                    end;


                                    { THabariReceiveThread }


                                    procedure THabariReceiveThread.Execute;
                                    var
                                      Connection: IConnection;
                                      Session: ISession;
                                      Destination: IDestination;
                                      Consumer: IMessageConsumer;
                                      I: Integer;
                                      RemainingCount: Integer;
                                      Msg: IMessage;
                                    begin
                                      inherited;


                                      Connection := TBTJMSConnection.MakeConnection;
                                      try
                                        try
                                          Connection.Start;


                                          Log(Format('Receive %d messages from queue %s', [NumMsgs, QueueName]));


                                          Session := Connection.CreateSession(False, amAutoAcknowledge);
                                          Destination := Session.CreateQueue(QueueName);
                                          Consumer := Session.CreateConsumer(Destination);


                                          Msg := Consumer.Receive(3000);
                                          if not Assigned(Msg) then
                                          begin
                                            Log(Format('ERROR: no message received from queue %s', [QueueName]));
                                          end
                                          else
                                          begin
                                            Started := MilliSecondOfTheYear(Now);


                                            RemainingCount := NumMsgs - 1; // one already received


                                            for I := 0 to NumMsgs - 2 do
                                            begin
                                              Msg := Consumer.Receive(MILLISEC_PER_MESSAGE);
                                              if Assigned(Msg) then
                                              begin
                                                Assert(Length((Msg as ITextMessage).Text) = MsgSize);
                                                Dec(RemainingCount);
                                              end;
                                            end;


                                            // give a second chance
                                            if 0 <> RemainingCount then
                                            begin
                                              Log(Format('WARN: %d messages missing in queue %s, retrying...',
                                                [RemainingCount, QueueName]));
                                              for I := 1 to RemainingCount do
                                              begin
                                                Msg := Consumer.Receive(2 * MILLISEC_PER_MESSAGE);
                                                if Assigned(Msg) then
                                                begin
                                                  Assert(Length((Msg as ITextMessage).Text) = MsgSize);
                                                  Dec(RemainingCount);
                                                end;
                                              end;
                                            end;


                                            // give a third chance
                                            if 0 <> RemainingCount then
                                            begin
                                              Log(Format('WARN: %d messages missing in queue %s, retrying (again)...',
                                                [RemainingCount, QueueName]));
                                              for I := 1 to RemainingCount do
                                              begin
                                                Msg := Consumer.Receive(4 * MILLISEC_PER_MESSAGE);
                                                if Assigned(Msg) then
                                                begin
                                                  Assert(Length((Msg as ITextMessage).Text) = MsgSize);
                                                  Dec(RemainingCount);
                                                end;
                                              end;
                                            end;


                                            if RemainingCount = 0 then
                                            begin
                                              Log(Format('%d messages received from queue %s in %d msec', [NumMsgs, QueueName, Elapsed]));
                                            end
                                            else
                                            begin
                                              Log(Format('ERROR: %d messages missing in queue %s', [RemainingCount, QueueName]));
                                            end;
                                          end;
                                        except
                                          raise;
                                        end;


                                      finally
                                        Connection.Close;
                                      end;


                                      DecCount;
                                    end;
