0 Replies Latest reply on Nov 6, 2012 6:09 AM by Amol Walanj

    HornetQ with CORE API result in HTTP Thread Waiting

    Amol Walanj Newbie

      Title : HornetQ with CORE API result in HTTP Thread Waiting

       

      Product Details :

      JBoss 5.1 Native

      HornetQ 2.2.5 Final

       

      Issue :

      JBoss with the HornetQ core API does not scale: the HTTP threads go into the WAITING state, which degrades the performance of the system.

       

       

      Thread Dump :

      "http-0.0.0.0-8180-48" daemon prio=10 tid=0x00007fdd6800e000 nid=0x5ddf waiting on condition [0x00007fdd576f4000]

         java.lang.Thread.State: WAITING (parking)

      at sun.misc.Unsafe.park(Native Method)

      - parking to wait for  <0x00007fdfc10ab010> (a java.util.concurrent.Semaphore$NonfairSync)

      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)

      at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)

      at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)

      at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)

      at java.util.concurrent.Semaphore.acquire(Semaphore.java:441)

      at org.hornetq.core.client.impl.ClientProducerCreditsImpl.acquireCredits(ClientProducerCreditsImpl.java:74)

      at org.hornetq.core.client.impl.ClientProducerImpl.doSend(ClientProducerImpl.java:305)

      at org.hornetq.core.client.impl.ClientProducerImpl.send(ClientProducerImpl.java:135)

      at com.demo.services.MyServiceImpl.sendMessage( MyServiceImpl.java:174)

      at com.demo.services.MyServiceImpl.placementRequest( MyServiceImpl.java:103)

      at sun.reflect.GeneratedMethodAccessor285.invoke(Unknown Source)

      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

      at java.lang.reflect.Method.invoke(Method.java:597)

      at org.apache.cxf.service.invoker.AbstractInvoker.performInvocation(AbstractInvoker.java:180)

      at org.apache.cxf.service.invoker.AbstractInvoker.invoke(AbstractInvoker.java:96)

      at org.apache.cxf.jaxrs.JAXRSInvoker.invoke(JAXRSInvoker.java:167)

      at org.apache.cxf.jaxrs.JAXRSInvoker.invoke(JAXRSInvoker.java:94)

      at org.apache.cxf.interceptor.ServiceInvokerInterceptor$1.run(ServiceInvokerInterceptor.java:58)

      at org.apache.cxf.interceptor.ServiceInvokerInterceptor.handleMessage(ServiceInvokerInterceptor.java:94)

      at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:262)

      - locked <0x00007fe54ee74e80> (a org.apache.cxf.phase.PhaseInterceptorChain)

      at org.apache.cxf.transport.ChainInitiationObserver.onMessage(ChainInitiationObserver.java:122)

      at org.apache.cxf.transport.http.AbstractHTTPDestination.invoke(AbstractHTTPDestination.java:211)

      at org.apache.cxf.transport.servlet.ServletController.invokeDestination(ServletController.java:213)

      at org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:154)

      at org.apache.cxf.transport.servlet.CXFNonSpringServlet.invoke(CXFNonSpringServlet.java:129)

      at org.apache.cxf.transport.servlet.AbstractHTTPServlet.handleRequest(AbstractHTTPServlet.java:187)

      at org.apache.cxf.transport.servlet.AbstractHTTPServlet.doPost(AbstractHTTPServlet.java:110)

      at com.demo.services.MyCXFServlet.doPost( MyCXFServlet.java:33)

      at javax.servlet.http.HttpServlet.service(HttpServlet.java:637)

      at org.apache.cxf.transport.servlet.AbstractHTTPServlet.service(AbstractHTTPServlet.java:166)

      at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)

      at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)

      at org.jboss.web.tomcat.filters.ReplyHeaderFilter.doFilter(ReplyHeaderFilter.java:96)

      at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:235)

      at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)

      at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:235)

      at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191)

      at org.jboss.web.tomcat.security.SecurityAssociationValve.invoke(SecurityAssociationValve.java:190)

      at org.jboss.web.tomcat.security.JaccContextValve.invoke(JaccContextValve.java:92)

      at org.jboss.web.tomcat.security.SecurityContextEstablishmentValve.process(SecurityContextEstablishmentValve.java:126)

      at org.jboss.web.tomcat.security.SecurityContextEstablishmentValve.invoke(SecurityContextEstablishmentValve.java:70)

      at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)

      at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102)

      at org.jboss.web.tomcat.service.jca.CachedConnectionValve.invoke(CachedConnectionValve.java:158)

      at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)

      at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:330)

      at org.apache.coyote.http11.Http11AprProcessor.process(Http11AprProcessor.java:905)

      at org.apache.coyote.http11.Http11AprProtocol$Http11ConnectionHandler.process(Http11AprProtocol.java:592)

      at org.apache.tomcat.util.net.AprEndpoint$Worker.run(AprEndpoint.java:2036)

      at java.lang.Thread.run(Thread.java:662)
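
The dump shows the request thread parked in Semaphore.acquire, called from ClientProducerCreditsImpl.acquireCredits, which suggests the producer is waiting for send credits before it can send. As a rough illustration only, the toy model below shows how a semaphore-based credit scheme makes a sender wait once its credits are used up and lets it continue when more credits arrive. The class and method names are hypothetical, and this is not HornetQ's implementation; it uses a timed acquire so the demo cannot hang, whereas the dump shows an untimed acquire.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Toy model of credit-based producer flow control (hypothetical, not HornetQ code). */
class ProducerCreditsSketch {
    // Each permit stands for one unit of send credit.
    private final Semaphore credits;

    ProducerCreditsSketch(int initialCredits) {
        this.credits = new Semaphore(initialCredits);
    }

    /** Sender side: waits up to timeoutMillis for enough credits, returns false on timeout. */
    boolean trySend(int messageSize, long timeoutMillis) throws InterruptedException {
        return credits.tryAcquire(messageSize, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Called when the (simulated) server grants more credits. */
    void receiveCredits(int amount) {
        credits.release(amount);
    }

    int availableCredits() {
        return credits.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        ProducerCreditsSketch producer = new ProducerCreditsSketch(100);
        System.out.println(producer.trySend(60, 50)); // enough credits: send goes through
        System.out.println(producer.trySend(60, 50)); // only 40 credits left: sender waits, then gives up
        producer.receiveCredits(60);                  // server grants more credits
        System.out.println(producer.trySend(60, 50)); // send goes through again
    }
}
```

If the real producer behaves similarly, every HTTP thread that calls send() while credits are exhausted would wait in the same way, so it is probably worth checking how quickly the server grants credits and how large the producer window is. That reading of the dump is an assumption, not something confirmed here.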

       

       

      Details :

       

      Requirement :

      A REST (XML over HTTP) web service responds to each request. After responding, a notification is generated from the response that was sent, and that notification is sent to the billing system.

      A delay of 10-15 minutes in sending notifications is acceptable, but there must be a guarantee that every response sent produces a notification, because notifications are related to revenue and billing.

       

      Implementation and Observation :

       

      To send the notification after responding to the web service request, we used JMS. To make notification sending asynchronous, our serviceImpl class puts the response on a JMS queue before returning it; the consumer later prepares the notification from the response it receives.

       

      We started with JBoss Messaging, using the JMS API with persistence. With session caching we achieved around 1100 TPS.

      Here we found that the JMS send itself takes time, so the performance of the normal request flow is impacted.

      We then moved to HornetQ. Using the JMS API in the same JBoss instance, we got almost the same TPS. Later we tried standalone HornetQ and reached up to 1300 TPS.

      After that we learned that the HornetQ core API is faster than the JMS API, so we moved our code to the HornetQ core API. But with it our TPS degraded to 200-250.

       

      Code Snippet :

       

      Lifecycle class that initializes the producers and consumers

       

       

          public static void init() {
              clearCache();   // close anything left over from a previous init
              final MyConfiguration myCfg = MyCfgMgr.getAppCfg();

              try {
                  TransportConfiguration config = new TransportConfiguration(
                          NettyConnectorFactory.class.getName());
                  locator = HornetQClient.createServerLocatorWithoutHA(config);
                  // Do not block the caller on acknowledgements or durable sends
                  locator.setBlockOnAcknowledge(false);
                  locator.setBlockOnDurableSend(false);
                  myfactory = locator.createSessionFactory();
              } catch (HornetQException je) {
                  LOGGER.log("HornetQException ", je);
              } catch (Exception je) {
                  LOGGER.log("Exception  ", je);
              }

              // One session/producer pair per pool slot
              for (int i = 0; i < myCfg.getMDBPoolSize(); i++) {
                  try {
                      // autoCommitSends = true, autoCommitAcks = true, ackBatchSize = 1
                      ClientSession session = myfactory.createSession(true, true, 1);
                      session.start();
                      ClientProducer clientProducer = session.createProducer("myAddress");
                      // Add only pairs that were created successfully
                      mySessionList.add(new MySession(session, clientProducer));
                  } catch (HornetQException je) {
                      LOGGER.log("HornetQException ", je);
                  } catch (Exception je) {
                      LOGGER.log("Exception ", je);
                  }
              }
              initmyConsumer();
          }

         

          public static void initmyConsumer() {
              final MyConfiguration myCfg = MyCfgMgr.getAppCfg();
              try {
                  TransportConfiguration config = new TransportConfiguration(
                          NettyConnectorFactory.class.getName());
                  consumerLocator = HornetQClient.createServerLocatorWithoutHA(config);
                  consumerLocator.setBlockOnAcknowledge(false);
                  consumerLocator.setBlockOnDurableSend(false);
                  hFactory = consumerLocator.createSessionFactory();
                  // autoCommitSends = true, autoCommitAcks = true, ackBatchSize = 1
                  consumerSession = hFactory.createSession(true, true, 1);
                  consumerSession.start();
                  // All consumers share the single consumerSession.
                  // The loop starts at 0 so that getMDBPoolSize() consumers are created.
                  for (int i = 0; i < myCfg.getMDBPoolSize(); i++) {
                      ClientConsumer consumer = consumerSession.createConsumer("myQueue");
                      consumer.setMessageHandler(new MyMsgConsumer());
                      consumerList.add(consumer);
                  }
              } catch (HornetQException je) {
                  LOGGER.log("HornetQException  ", je);
              } catch (Exception je) {
                  LOGGER.log("Exception  ", je);
              }
          }

       

          public static void clearCache() {
              LOGGER.log("clearCache mySessionList : " + mySessionList.size());
              // Close every cached producer and its session
              for (MySession mySession : mySessionList) {
                  try {
                      mySession.getClientProducer().close();
                      mySession.getSession().close();
                  } catch (HornetQException e) {
                      LOGGER.log("HornetQException while closing Sessions from cache ", e);
                  }
              }
              mySessionList.clear();

              // Close the factory before the locator that created it
              if (myfactory != null) {
                  myfactory.close();
              }
              if (locator != null) {
                  locator.close();
              }

              try {
                  clearConsumer();
              } catch (HornetQException e) {
                  LOGGER.log("HornetQException while closing ClientSession ", e);
              }
          }

         

          public static void clearConsumer() throws HornetQException {
              // Close in reverse order of creation: consumers, session, factory, locator
              for (ClientConsumer c : consumerList) {
                  c.close();
              }
              consumerList.clear();
              if (consumer != null) {
                  consumer.close();
              }
              if (consumerSession != null) {
                  consumerSession.close();
              }
              if (hFactory != null) {
                  hFactory.close();
              }
              if (consumerLocator != null) {
                  consumerLocator.close();
              }
          }

       

       

       

      The MyMsgConsumer class is the message handler registered on each consumer; it is responsible for handling messages and sending notifications.

       

      Message Creation and sending :

       

      Class Name : MyServiceImpl.java

       

          private void sendMessage(final String res) {
              // MySession is a wrapper class holding a ClientSession and a ClientProducer as its members
              MySession mySession = null;
              try {
                  mySession = MyGlobals.getmyJmsSession();
                  // Second argument: durable = true
                  ClientMessage msg = mySession.getSession().createMessage(org.hornetq.api.core.Message.TEXT_TYPE, true);
                  msg.getBodyBuffer().writeString(res);
                  // The thread dump above is parked inside this send() call
                  mySession.getClientProducer().send(msg);
              } catch (HornetQException je) {
                  LOGGER.log("HornetQException ", je);
              } catch (Exception je) {
                  LOGGER.log("Exception  ", je);
              } finally {
                  // Return the session even when the send fails, so the pool is not drained
                  if (mySession != null) {
                      MyGlobals.returnmyJmsSession(mySession);
                  }
              }
          }
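
MyGlobals.getmyJmsSession() and returnmyJmsSession() are not shown in this post. As a minimal sketch of the borrow/return pattern that sendMessage relies on, the generic pool below hands out an idle item and always puts it back, even when the work throws. SessionPoolSketch and withSession are hypothetical names, and the type parameter T merely stands in for the MySession wrapper; this is an assumption about how such a pool could look, not the actual MyGlobals code.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;

/** Hypothetical fixed-size pool; T stands in for the MySession wrapper. */
class SessionPoolSketch<T> {
    private final BlockingQueue<T> idle;

    /** The collection must not be empty, because the queue capacity equals its size. */
    SessionPoolSketch(Collection<T> sessions) {
        this.idle = new ArrayBlockingQueue<>(sessions.size(), false, sessions);
    }

    /** Borrows a session, runs the work, and always returns the session to the pool. */
    <R> R withSession(Function<T, R> work) throws InterruptedException {
        T session = idle.take();   // waits if every session is currently in use
        try {
            return work.apply(session);
        } finally {
            idle.add(session);     // runs on success and on failure
        }
    }

    int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) throws InterruptedException {
        SessionPoolSketch<String> pool = new SessionPoolSketch<>(Arrays.asList("s1", "s2"));
        System.out.println(pool.withSession(s -> "sent via " + s));
        try {
            pool.<String>withSession(s -> {
                throw new IllegalStateException("send failed on " + s);
            });
        } catch (IllegalStateException expected) {
            // The failed send still returned its session to the pool.
        }
        System.out.println(pool.idleCount());
    }
}
```

A pool like this makes a caller wait in take() when all sessions are busy, so under load it is worth telling apart threads waiting for a pooled session from threads waiting inside send() itself, as in the dump above.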