8 Replies Latest reply on Dec 7, 2007 10:32 AM by timfox

    Failover failed

    chrismeadows

      JBoss 4.2.2 GA with JBM 1.4.0.SP1 configured as per documentation, deployed identically over a two node cluster (ServerPeers 0 and 1 respectively).

      I send 1000 messages to a queue. An MDB is listening to the queue, and it redispatches each message to one of 50 topics (randomly).

      Each topic has a durable subscription client that is already listening before the message sending begins.

      At some point after messages have started being sent, redispatched and received, I kill the receiving node in the cluster with a kill -9. Sometimes failover works, but in my experience this is not the norm. More often I get a 'Failover failed'. See stacktrace below. It's complaining about duplicate clientIDs, but it isn't my code that is setting them, so I don't see what I can do. Any ideas?

      638453 [Thread-259] ERROR org.jboss.jms.client.FailoverCommandCenter - Failover failed
      javax.jms.IllegalStateException: Cannot set clientID, already set as DurableSubscriberExample31
       at org.jboss.jms.server.endpoint.ServerConnectionEndpoint.setClientID(ServerConnectionEndpoint.java:309)
       at org.jboss.jms.server.endpoint.advised.ConnectionAdvised.org$jboss$jms$server$endpoint$advised$ConnectionAdvised$setClientID$aop(ConnectionAdvised.java:85)
       at org.jboss.jms.server.endpoint.advised.ConnectionAdvised$setClientID_N1479100880614063379.invokeNext(ConnectionAdvised$setClientID_N1479100880614063379.java)
       at org.jboss.jms.server.container.ServerLogInterceptor.invoke(ServerLogInterceptor.java:105)
       at org.jboss.jms.server.endpoint.advised.ConnectionAdvised$setClientID_N1479100880614063379.invokeNext(ConnectionAdvised$setClientID_N1479100880614063379.java)
       at org.jboss.jms.server.endpoint.advised.ConnectionAdvised.setClientID(ConnectionAdvised.java)
       at org.jboss.jms.wireformat.ConnectionSetClientIDRequest.serverInvoke(ConnectionSetClientIDRequest.java:73)
       at org.jboss.jms.server.remoting.JMSServerInvocationHandler.invoke(JMSServerInvocationHandler.java:143)
       at org.jboss.remoting.ServerInvoker.invoke(ServerInvoker.java:769)
       at org.jboss.remoting.transport.socket.ServerThread.processInvocation(ServerThread.java:573)
       at org.jboss.remoting.transport.socket.ServerThread.dorun(ServerThread.java:387)
       at org.jboss.remoting.transport.socket.ServerThread.run(ServerThread.java:166)
       at org.jboss.remoting.MicroRemoteClientInvoker.invoke(MicroRemoteClientInvoker.java:163)
       at org.jboss.remoting.Client.invoke(Client.java:1634)
       at org.jboss.remoting.Client.invoke(Client.java:548)
       at org.jboss.remoting.Client.invoke(Client.java:536)
       at org.jboss.jms.client.delegate.DelegateSupport.doInvoke(DelegateSupport.java:187)
       at org.jboss.jms.client.delegate.DelegateSupport.doInvoke(DelegateSupport.java:158)
       at org.jboss.jms.client.delegate.ClientConnectionDelegate.org$jboss$jms$client$delegate$ClientConnectionDelegate$setClientID$aop(ClientConnectionDelegate.java:228)
       at org.jboss.jms.client.delegate.ClientConnectionDelegate$setClientID_N1479100880614063379.invokeNext(ClientConnectionDelegate$setClientID_N1479100880614063379.java)
       at org.jboss.jms.client.container.ConnectionAspect.handleSetClientID(ConnectionAspect.java:105)
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
       at java.lang.reflect.Method.invoke(Method.java:585)
       at org.jboss.aop.advice.PerInstanceAdvice.invoke(PerInstanceAdvice.java:121)
       at org.jboss.jms.client.delegate.ClientConnectionDelegate$setClientID_N1479100880614063379.invokeNext(ClientConnectionDelegate$setClientID_N1479100880614063379.java)
       at org.jboss.jms.client.container.FailoverValveInterceptor.invoke(FailoverValveInterceptor.java:114)
       at org.jboss.aop.advice.PerInstanceInterceptor.invoke(PerInstanceInterceptor.java:105)
       at org.jboss.jms.client.delegate.ClientConnectionDelegate$setClientID_N1479100880614063379.invokeNext(ClientConnectionDelegate$setClientID_N1479100880614063379.java)
       at org.jboss.jms.client.container.ClosedInterceptor.invoke(ClosedInterceptor.java:170)
       at org.jboss.aop.advice.PerInstanceInterceptor.invoke(PerInstanceInterceptor.java:105)
       at org.jboss.jms.client.delegate.ClientConnectionDelegate$setClientID_N1479100880614063379.invokeNext(ClientConnectionDelegate$setClientID_N1479100880614063379.java)
       at org.jboss.jms.client.delegate.ClientConnectionDelegate.setClientID(ClientConnectionDelegate.java)
       at org.jboss.jms.client.delegate.ClientConnectionDelegate.synchronizeWith(ClientConnectionDelegate.java:120)
       at org.jboss.jms.client.FailoverCommandCenter.failureDetected(FailoverCommandCenter.java:145)
       at org.jboss.jms.client.container.ConnectionFailureListener.handleConnectionException(ConnectionFailureListener.java:62)
       at org.jboss.jms.client.remoting.ConsolidatedRemotingConnectionListener.handleConnectionException(ConsolidatedRemotingConnectionListener.java:81)
       at org.jboss.remoting.ConnectionValidator$1.run(ConnectionValidator.java:452)
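The trace shows the client-side failover code (ClientConnectionDelegate.synchronizeWith) replaying the old clientID onto the new connection. ServerConnectionEndpoint.setClientID then rejects it because that server-side connection already has a clientID. One plausible reading, consistent with the trace but not confirmed in this thread, is that the surviving node had already attached the user's preconfigured clientID (the CLIENTID column of JBM_USER) when it created the replacement connection, so the replayed call counts as a second assignment. The simplified plain-Java model below illustrates only that rule. ModelServerConnection and FailoverReplayModel are hypothetical names, not JBoss Messaging classes.

```java
import java.util.Map;

// Hypothetical, simplified model of the rule behind
// "Cannot set clientID, already set as ...". Not JBoss Messaging code.
class ModelServerConnection {
    private String clientID;

    // Assumption: a preconfigured clientID (e.g. from JBM_USER.CLIENTID)
    // is attached as soon as a connection is created for that user.
    ModelServerConnection(String user, Map<String, String> preconfiguredClientIDs) {
        this.clientID = preconfiguredClientIDs.get(user);
    }

    // A connection's clientID may only be set once.
    void setClientID(String id) {
        if (clientID != null) {
            throw new IllegalStateException("Cannot set clientID, already set as " + clientID);
        }
        clientID = id;
    }

    String getClientID() {
        return clientID;
    }
}

class FailoverReplayModel {
    // Assumption: on failover the client opens a new connection on the
    // surviving node, then replays the clientID it held before the crash.
    static ModelServerConnection failover(String user, String oldClientID,
                                          Map<String, String> preconfigured) {
        ModelServerConnection fresh = new ModelServerConnection(user, preconfigured);
        fresh.setClientID(oldClientID); // throws if 'fresh' already has a clientID
        return fresh;
    }
}
```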
      
      


        • 1. Re: Failover failed
          chrismeadows

          I'm wondering why durable subscription failover is, in my experience, so flaky. An obvious reason would be that I've got something wrong, although I don't believe I have, or otherwise I'd go and fix it (!). If I posted some zipped-up code, would anyone in the JBM team be able to give it a sanity check to see if I've got my code/config wrong?

          If, on the other hand, I've found a JBM bug, does anyone know if there is any way I can mitigate failover failures? I've tried various exception-handling and disconnect/reconnect strategies with some success, but nothing reliable.
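For disconnect/reconnect strategies like the ones mentioned above, bounding the number of attempts and backing off between them at least stops a receiver from spinning while the cluster recovers. The sketch below is generic Java, not a JBM feature. Retry.withBackoff is a hypothetical helper, and the action passed to it would be whatever closes the old connection and creates a new connection, session and subscriber. The attempt count and delays are placeholder values.

```java
import java.util.concurrent.Callable;

// Hypothetical helper: bounded retry with exponential backoff.
final class Retry {
    private Retry() {}

    // Runs the action until it succeeds or maxAttempts is reached, doubling
    // the pause after each failure. The last failure is rethrown.
    static <T> T withBackoff(Callable<T> action, int maxAttempts, long initialDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delay); // give the cluster time to settle
                    delay *= 2;
                }
            }
        }
        throw last;
    }
}
```

Backoff does not fix a failover bug; it only makes the client's behaviour predictable while the underlying problem is investigated.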


          Chris

          • 2. Re: Failover failed
            timfox

            I'm not sure I understand your topology either.

            Can you explain it again, just to avoid ambiguity. Not sure what you meant by "receiving node".

            Is the queue clustered? Is the MDB on each node? Are you using persistent or non-persistent messages, etc.?

            • 3. Re: Failover failed
              chrismeadows

              Hi Tim,

              First of all, I have a large portion of humble pie to eat - I hadn't clustered the SLSB that publishes. However, I'm still seeing unreliable results, so I'll elaborate on what I am doing...

              I have a POJO that looks up ClusteredConnectionFactory and queue/testQueue via JNDI and connects to the queue:

              private void startQueue() throws JMSException, NamingException {
                  conn = ((ConnectionFactory) ctx.lookup("ClusteredConnectionFactory"))
                          .createConnection("guest", "guest");
                  queue = (Queue) ctx.lookup("queue/testQueue");
                  session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

                  conn.start();
              }
              

              It sends (say) 1000 messages of 1 KB each. Each message has an integer property 'TopicID', a random number from 1 to 50 giving the index of the topic that the message will be forwarded to inside JBoss.
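The sender code isn't posted, so here is a minimal sketch of how the TopicID property described above could be chosen and mapped to a destination name. It uses java.util.Random and a plain Map in place of a JMS message's properties. TopicIdChooser is hypothetical, and the 1024-character Payload string is an assumption based on the 1 KB message size and the Payload property that PublisherImpl reads.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical stand-in for the (unposted) sender: chooses TopicID values and
// builds the properties that would be copied onto each JMS message.
final class TopicIdChooser {
    static final int NUMBER_OF_TOPICS = 50;
    static final int PAYLOAD_CHARS = 1024; // assumption: "1 KB" as a 1024-char string

    private final Random random;

    TopicIdChooser(Random random) {
        this.random = random;
    }

    // Random topic index from 1 to NUMBER_OF_TOPICS inclusive, as described.
    int nextTopicId() {
        return 1 + random.nextInt(NUMBER_OF_TOPICS);
    }

    // Properties for one message; PublisherImpl reads "TopicID" and "Payload".
    Map<String, Object> nextMessageProperties() {
        char[] chars = new char[PAYLOAD_CHARS];
        Arrays.fill(chars, 'x');
        Map<String, Object> props = new HashMap<>();
        props.put("TopicID", nextTopicId());
        props.put("Payload", new String(chars));
        return props;
    }

    // Same naming rule as sendToClients(): "topic/testDurableTopic" + TopicID.
    static String topicJndiName(int topicId) {
        return "topic/testDurableTopic" + topicId;
    }
}
```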

              In my two-node JBoss cluster, I have an EJB3 MDB listening to the queue, configured thus:

              @Clustered
              @MessageDriven(
                  messageListenerInterface = MessageListener.class,
                  activationConfig = {
                      @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue"),
                      @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
                  })
              public class OamServerPushImpl implements OamServerPush, MessageListener
              


              This MDB simply forwards each message to an SLSB, configured as follows, which publishes the message to the topic indicated by the TopicID property:

              @Clustered
              @Stateless
              public class PublisherImpl implements Publisher {
                  @Resource(mappedName = "ClusteredConnectionFactory")
                  ConnectionFactory topicConnectionFactory;

                  @Resource
                  private SessionContext sc;

                  Topic topic;

                  public void redispatch(Message m, String sReceiverAddress, int iMessageID) {
                      Connection connection = null;
                      Session session = null;
                      TextMessage message = null;

                      try {
                          int iTopicID = m.getIntProperty("TopicID");
                          String sPayload = m.getStringProperty("Payload");
                          connection = topicConnectionFactory.createConnection();
                          session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                          // connection.stop();
                          connection.start();

                          message = session.createTextMessage();

                          message.setText("Redispatching : " + sPayload);
                          message.setStringProperty("ReceivedBy", sReceiverAddress);
                          message.setStringProperty("PublishedBy", InetAddress.getLocalHost().getHostAddress());
                          message.setIntProperty("MessageID", iMessageID);
                          sendToClients(session, message, iTopicID);
                      } catch (Throwable t) {
                          // JMSException (or UnknownHostException) could be thrown
                          logger.warn("PublisherBean.redispatch: Exception: " + t.toString());
                          sc.setRollbackOnly();
                      } finally {
                          // Close in finally so the connection is not leaked when an exception is thrown
                          if (session != null) {
                              try {
                                  session.close();
                              } catch (JMSException e) {
                              }
                          }
                          if (connection != null) {
                              try {
                                  connection.close();
                              } catch (JMSException e) {
                              }
                          }
                      }
                  }

                  public void sendToClients(Session session, Message message, int iTopicID)
                          throws JMSException {
                      topic = (Topic) sc.lookup("topic/testDurableTopic" + iTopicID);
                      MessageProducer publisher = session.createProducer(topic);
                      publisher.send(message);
                      publisher.close();
                      logger.info("Publisher BEAN: Message published: ("
                              + ((TextMessage) message).getText().length() + ") bytes");
                  }
              }
              



              I then have another 50 POJOs outside the container, each listening to one topic via a durable subscription:

              public class MessageSubscriberClient implements Runnable {
                  private static final Logger log = Logger.getLogger(MessageSubscriberClient.class);

                  // volatile: written by the receiving thread / setFinished(), read by other threads
                  volatile int iCount = 0;
                  Connection conn;
                  Session session;
                  TopicSubscriber subscriber;
                  Topic topic;
                  Context ctx;
                  int i;
                  volatile boolean bFinished = false;

                  static int NUMBER_OF_MESSAGES = 1000;

                  static final int NUMBER_OF_TOPICS = 50;


                  public MessageSubscriberClient(Context ctx, int i) {
                      this.i = i;
                      this.ctx = ctx;

                      try {
                          startTopic();
                      }
                      catch (JMSException e1) {
                          log.error(e1.getMessage(), e1);
                      }
                      catch (NamingException e) {
                          log.error(e.getMessage(), e);
                      }
                  }


                  public void run() {
                      try {
                          receive();
                      } catch (JMSException e) {
                          log.error(e.getMessage(), e);
                      } catch (NamingException e) {
                          log.error(e.getMessage(), e);
                      }
                      log.warn("Stopping topic on " + i);
                      stopTopic();
                  }

                  public void receive() throws JMSException, NamingException {
                      while (!bFinished) {
                          try {
                              if (subscriber != null) {
                                  process();
                              }
                              else {
                                  // try to recover
                                  log.info(i + " not well - subscriber null");
                                  // stopTopic();
                                  // startTopic();
                              }
                          } catch (JMSException e) {
                              // try to recover
                              // stopTopic();
                              // startTopic();
                          }
                      }
                      log.info("Finished receiving on " + i);
                  }


                  public int getNumReceivedMessages() {
                      return iCount;
                  }

                  private void startTopic() throws JMSException, NamingException {
                      conn = ((ConnectionFactory) ctx.lookup("ClusteredConnectionFactory"))
                              .createConnection("john" + i, "needle");

                      topic = (Topic) ctx.lookup("topic/testDurableTopic" + i);
                      session = conn.createSession(false, TopicSession.AUTO_ACKNOWLEDGE);

                      conn.stop();

                      // subscription
                      subscriber = session.createDurableSubscriber(topic, "finbar" + i);

                      conn.start();
                  }

                  private void stopTopic() {
                      log.info("In Stopping topic on " + i);
                      try {
                          if (subscriber != null) {
                              subscriber.close();
                              log.info("finbar" + i + " closed");
                          }
                      } catch (JMSException e) {
                          log.error(e.getMessage(), e);
                      }
                      try {
                          if (session != null) {
                              session.unsubscribe("finbar" + i);
                              log.info("finbar" + i + " unsubscribed");
                          }
                      } catch (JMSException e) {
                          log.error(e.getMessage(), e);
                      }
                      try {
                          if (conn != null)
                              conn.close();
                      } catch (JMSException e) {
                          log.error(e.getMessage(), e);
                      }
                  }

                  public void setFinished(boolean b) {
                      bFinished = b;
                  }

                  public void process() throws JMSException {
                      Message m = subscriber.receive(1000);
                      TextMessage t = (TextMessage) m;
                      if (t != null) {
                          m.acknowledge();
                          iCount++;
                      }
                  }
              }
              


              In the receive method above, I've indicated (with the commented-out calls) where I tried to detect receiver problems and recover by stopping and starting the topic.

              I then run the receiver and sender code, wait until messages are happily being sent and received, and then kill (kill -9) the JBoss node that is processing the messages.

              With the receiver code as above (not trying to recover), the threads just sit in the while loop, reporting:

              34864 [Thread-94] ERROR org.jboss.jms.client.container.ClosedInterceptor - ClosedInterceptor.ClientConsumerDelegate[e9-cs3ijw9f-1-64tmiw9f-c5iuxj-r55ss4]: method receive() did not go through, the interceptor is CLOSED
              javax.jms.IllegalStateException: The object is closed
               at org.jboss.jms.client.container.ClosedInterceptor.invoke(ClosedInterceptor.java:157)
               at org.jboss.aop.advice.PerInstanceInterceptor.invoke(PerInstanceInterceptor.java:105)
               at org.jboss.jms.client.delegate.ClientConsumerDelegate$receive_N8299950230150603585.invokeNext(ClientConsumerDelegate$receive_N8299950230150603585.java)
               at org.jboss.jms.client.delegate.ClientConsumerDelegate.receive(ClientConsumerDelegate.java)
               at org.jboss.jms.client.JBossMessageConsumer.receive(JBossMessageConsumer.java:86)
               at com.ipaccess.MessageSubscriberClient.process(MessageSubscriberClient.java:154)
               at com.ipaccess.MessageSubscriberClient.receive(MessageSubscriberClient.java:78)
               at com.ipaccess.MessageSubscriberClient.run(MessageSubscriberClient.java:64)
               at java.lang.Thread.run(Thread.java:595)
              


              If, on the other hand, I try to recover the topic by stopping and starting it, I get much 'better' results (I sometimes receive all messages), but more likely I find that some messages get stuck in JBoss. The receivers sit there listening while the JBoss client-side code reports the failover (with a lot of JBoss Remoting logging that I realise is an outstanding JIRA task):

              294287 [Timer-0] WARN org.jboss.remoting.LeasePinger - LeasePinger[SocketClientInvoker[1c0f2e5, bisocket://172.28.5.208:4457](4ss15g-cvnwhk-f9wjxoka-1-f9wjxoll-3)] failed to ping to server: Can not get connection to server. Problem establishing socket connection for InvokerLocator [bisocket://172.28.5.208:4457/?clientLeasePeriod=10000&clientMaxPoolSize=200&clientSocketClass=org.jboss.jms.client.remoting.ClientSocketWrapper&dataType=jms&marshaller=org.jboss.jms.wireformat.JMSWireFormat&numberOfCallRetries=1&numberOfRetries=10&pingFrequency=214748364&pingWindowFactor=10&socket.check_connection=false&timeout=0&unmarshaller=org.jboss.jms.wireformat.JMSWireFormat]
              294584 [Thread-277] WARN org.jboss.remoting.LeasePinger - LeasePinger[SocketClientInvoker[1c0f2e5, bisocket://172.28.5.208:4457](4ss15g-cvnwhk-f9wjxoka-1-f9wjxoll-3)] failed sending disconnect for client lease for client with session ID 4ss15g-cvnwhk-f9wjxoka-1-f9wjxqil-1t
              


              and

              - unable to get secondary locator
              org.jboss.remoting.CannotConnectException: Can not get connection to server. Problem establishing socket connection for InvokerLocator [bisocket://172.28.5.208:4457/?clientLeasePeriod=10000&clientMaxPoolSize=200&clientSocketClass=org.jboss.jms.client.remoting.ClientSocketWrapper&dataType=jms&marshaller=org.jboss.jms.wireformat.JMSWireFormat&numberOfCallRetries=1&numberOfRetries=10&pingFrequency=214748364&pingWindowFactor=10&socket.check_connection=false&timeout=0&unmarshaller=org.jboss.jms.wireformat.JMSWireFormat]
               at org.jboss.remoting.transport.socket.MicroSocketClientInvoker.transport(MicroSocketClientInvoker.java:532)
               at org.jboss.remoting.transport.bisocket.BisocketClientInvoker.transport(BisocketClientInvoker.java:413)
               at org.jboss.remoting.MicroRemoteClientInvoker.invoke(MicroRemoteClientInvoker.java:122)
               at org.jboss.remoting.transport.bisocket.BisocketClientInvoker.getSecondaryLocator(BisocketClientInvoker.java:538)
               at org.jboss.remoting.transport.bisocket.BisocketServerInvoker.createControlConnection(BisocketServerInvoker.java:228)
               at org.jboss.remoting.transport.bisocket.BisocketClientInvoker.transport(BisocketClientInvoker.java:402)
               at org.jboss.remoting.MicroRemoteClientInvoker.invoke(MicroRemoteClientInvoker.java:122)
               at org.jboss.remoting.Client.invoke(Client.java:1634)
               at org.jboss.remoting.Client.addCallbackListener(Client.java:1703)
               at org.jboss.remoting.Client.addListener(Client.java:921)
               at org.jboss.jms.client.remoting.JMSRemotingConnection.addInvokerCallbackHandler(JMSRemotingConnection.java:237)
               at org.jboss.jms.client.remoting.JMSRemotingConnection.start(JMSRemotingConnection.java:312)
               at org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate.establishCallback(ClientClusteredConnectionFactoryDelegate.java:99)
               at org.jboss.jms.client.remoting.ConnectionFactoryCallbackHandler$CallbackConnectionListener.handleConnectionException(ConnectionFactoryCallbackHandler.java:105)
               at org.jboss.remoting.ConnectionValidator$1.run(ConnectionValidator.java:452)
              Caused by: java.net.ConnectException: Connection refused: connect
               at java.net.PlainSocketImpl.socketConnect(Native Method)
               at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:333)
               at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:195)
               at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:182)
               at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:366)
               at java.net.Socket.connect(Socket.java:519)
               at org.jboss.remoting.transport.socket.SocketClientInvoker.createSocket(SocketClientInvoker.java:187)
               at org.jboss.remoting.transport.bisocket.BisocketClientInvoker.createSocket(BisocketClientInvoker.java:420)
               at org.jboss.remoting.transport.socket.MicroSocketClientInvoker.getConnection(MicroSocketClientInvoker.java:815)
               at org.jboss.remoting.transport.socket.MicroSocketClientInvoker.transport(MicroSocketClientInvoker.java:525)
               ... 14 more
              


              Eventually (over an hour later), most receivers reconnect, but some never do. I see:

              1562035 [Thread-313] ERROR org.jboss.remoting.MicroRemoteClientInvoker - error shutting down lease pinger
              


              and then it all goes quiet: no more logging. Despite leaving the sender, the receivers and one JBoss node up for well over an hour, no more messages get through.

              I wasn't expecting to have to stop and start the topics on the receivers, but it looks like I have to. Even if I do, though, some receivers will probably not fail over.


              Other info...

              I run the sender and receivers from JUnit, but I don't think that code needs to be in this post.

              Inside JBoss, the destinations are configured like so:

              
               <mbean code="org.jboss.jms.server.destination.QueueService"
                      name="jboss.messaging.destination:service=Queue,name=testQueue"
                      xmbean-dd="xmdesc/Queue-xmbean.xml">
                  <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
                  <depends>jboss.messaging:service=PostOffice</depends>
                  <attribute name="Clustered">true</attribute>
                  <attribute name="SecurityConfig">
                     <security>
                        <role name="guest" read="true" write="true"/>
                        <role name="publisher" read="true" write="true" create="false"/>
                        <role name="noacc" read="false" write="false" create="false"/>
                     </security>
                  </attribute>
               </mbean>
              
               <!-- Repeat for testDurableTopic0 to testDurableTopic49 -->
               <mbean code="org.jboss.jms.server.destination.TopicService"
                      name="jboss.messaging.destination:service=Topic,name=testDurableTopic0"
                      xmbean-dd="xmdesc/Topic-xmbean.xml">
                  <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
                  <depends>jboss.messaging:service=PostOffice</depends>
                  <attribute name="Clustered">true</attribute>
                  <attribute name="SecurityConfig">
                     <security>
                        <role name="guest" read="true" write="true"/>
                        <role name="publisher" read="true" write="true" create="false"/>
                        <role name="durpublisher" read="true" write="true" create="true"/>
                     </security>
                  </attribute>
               </mbean>
              


              I have a mysql-persistence-service.xml with the appropriate Clustered attribute and SQL to set up the users, passwords, roles and clientIDs:

              <attribute name="Clustered">true</attribute>
              ...
              POPULATE.TABLES.69 = INSERT INTO JBM_USER (USER_ID,PASSWD,CLIENTID) VALUES ('john49', 'needle', 'DurableSubscriberExample49')
              POPULATE.TABLES.119 = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES ('durpublisher','john49')
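Because the same two statements are repeated for john0 to john49, the lines can be generated rather than typed. A hypothetical sketch follows. It assumes the entries are numbered contiguously, starting at 20 for the JBM_USER rows and 70 for the JBM_ROLE rows. That reproduces the john49 entries above (69 and 119), but the starting numbers should be checked against the other POPULATE.TABLES entries already in the file.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical generator for the repeated POPULATE.TABLES entries.
final class PopulateTablesGenerator {
    private PopulateTablesGenerator() {}

    // Assumption: user rows are numbered from userStart and role rows from
    // roleStart, one entry per user, with no gaps.
    static List<String> lines(int users, int userStart, int roleStart) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < users; i++) {
            out.add("POPULATE.TABLES." + (userStart + i)
                    + " = INSERT INTO JBM_USER (USER_ID,PASSWD,CLIENTID) VALUES ('john" + i
                    + "', 'needle', 'DurableSubscriberExample" + i + "')");
        }
        for (int i = 0; i < users; i++) {
            out.add("POPULATE.TABLES." + (roleStart + i)
                    + " = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES ('durpublisher','john" + i + "')");
        }
        return out;
    }
}
```

Printing PopulateTablesGenerator.lines(50, 20, 70) one entry per line gives 100 lines ready to paste into the persistence file.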
              




              and I set ServerPeerID to 0 and 1 respectively in messaging-service.xml for the two nodes.

              JBoss is running on RHEL 4 with JRE 1.5.0_12.
              JBoss 4.2.2 GA with JBM 1.4.0.SP1, configured as per the documentation.
              The POJO sender and receivers are running on Windows XP with JRE 1.5.0_12.



              • 4. Re: Failover failed
                chrismeadows

                Just noticed that SP2 has been released; I'll give it a whirl. In particular, the fix for
                http://jira.jboss.com/jira/browse/JBMESSAGING-1184 looks promising.

                • 5. Re: Failover failed
                  timfox

                  One observation:

                  You're creating a connection, session, and producer for every message that you forward!

                  This will be extremely slow...

                  Connections and sessions are heavyweight objects that you want to create once and then re-use. Really, you want to re-use the producer too.
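The create-once, re-use idea can be sketched without JMS types. A holder creates the expensive object on first use, returns the same instance to every caller, and closes it once at shutdown. Reusable is a hypothetical class; in the publisher bean its factory would create the Connection. Under the JMS spec a Connection may be shared between threads but a Session may not, so sessions and producers still need per-thread or pooled handling. In a managed environment, that pooling is what the JCA resource adapter mentioned in the next reply is for.

```java
import java.util.function.Supplier;

// Hypothetical holder: creates an expensive AutoCloseable once and re-uses it.
final class Reusable<T extends AutoCloseable> implements AutoCloseable {
    private final Supplier<T> factory;
    private T instance;
    private int created;

    Reusable(Supplier<T> factory) {
        this.factory = factory;
    }

    // Returns the cached instance, creating it on first use.
    synchronized T get() {
        if (instance == null) {
            instance = factory.get();
            created++;
        }
        return instance;
    }

    synchronized int timesCreated() {
        return created;
    }

    // Closes the cached instance once; a later get() would create a new one.
    @Override
    public synchronized void close() throws Exception {
        if (instance != null) {
            T toClose = instance;
            instance = null;
            toClose.close();
        }
    }
}
```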

                  • 6. Re: Failover failed
                    timfox

                    If you're in a managed environment you can use the JCA JMS resource adapter to cache connections:

                    See http://wiki.jboss.org/wiki/Wiki.jsp?page=JBossJMSRA

                    • 7. Re: Failover failed
                      chrismeadows

                      SP2 plus the patched JBoss Remoting gives much better, complete results. I receive duplicates, but I'm sure the clients can handle that if they're aware that it can happen. (For anyone else reading this, see the Clustering Notes at http://labs.jboss.com/file-access/default/members/jbossmessaging/freezone/docs/userguide-1.4.0.SP1/html/c_configuration.html.)
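One way for clients to handle duplicates is an idempotent receiver that remembers recently seen IDs and skips repeats. The sketch assumes that the int MessageID property set in PublisherImpl.redispatch() uniquely identifies a logical message. DuplicateFilter and its window size are hypothetical, and a bounded window only catches duplicates that arrive within the last `capacity` messages.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical idempotent-receiver helper that remembers the last 'capacity' IDs.
final class DuplicateFilter {
    private final Map<Integer, Boolean> seen;

    DuplicateFilter(final int capacity) {
        // Insertion-ordered map that evicts its oldest entry once over capacity.
        this.seen = new LinkedHashMap<Integer, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // Returns true the first time an ID is seen, false for a repeat.
    synchronized boolean firstDelivery(int messageId) {
        return seen.put(messageId, Boolean.TRUE) == null;
    }
}
```

In the subscriber's process() method, a message would then be counted only when `filter.firstDelivery(m.getIntProperty("MessageID"))` returns true.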

                      Thanks very much for your time.

                      Incidentally,


                      You're creating a connection, session, and producer for every message that you forward!

                      This will be extremely slow...

                      Yes, that was deliberate; I had copious logging going on that I stripped out of the code I posted.


                      If you're in a managed environment you can use the JCA JMS resource adapter to cache connections:

                      See http://wiki.jboss.org/wiki/Wiki.jsp?page=JBossJMSRA


                      I'll give that a good look.

                      • 8. Re: Failover failed
                        timfox

                        Glad that works for you :)

                        If you want to be sure you don't receive duplicates, just make sure your consumers use a transacted session to consume. Then you should get guaranteed once-and-only-once delivery, as per the JMS spec.

                        Currently you're using a non-transacted session, so losing messages or receiving duplicates on failover would be expected.