3 Replies Latest reply on Sep 5, 2008 2:18 PM by Clebert Suconic

    Problem with Asynchronous Communicatin

    kumar reddy Newbie

      Hi All,

      I have Jboss-4.2.1 GA and installed jboss-messaging-1.3.0.GA.
      I have QueueProducer which sends message to Queue and QueueConsumer which recieves message from the same Queue.

      When i run QueueProducer class it is sending the message successfully but when i run QueueConsumer class it is throwing exception on the server console.

      Below is the stackTrace:

      17:57:58,413 WARN [BisocketClientInvoker] Unable to send ping: shutting down Pi
      ngTimerTask
      java.net.SocketException: Connection reset by peer: socket write error
       at java.net.SocketOutputStream.socketWrite0(Native Method)
       at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
       at java.net.SocketOutputStream.write(SocketOutputStream.java:115)
       at org.jboss.remoting.transport.bisocket.BisocketClientInvoker$PingTimer
      Task.run(BisocketClientInvoker.java:636)
       at java.util.TimerThread.mainLoop(Timer.java:512)
       at java.util.TimerThread.run(Timer.java:462)
      17:58:33,279 WARN [SimpleConnectionManager] ConnectionManager[18a5d49] cannot l
      ook up remoting session ID a2g5c2o-skwq6f-fkpct151-1-fkpct1pw-4
      17:58:33,279 WARN [SimpleConnectionManager] A problem has been detected with th
      e connection to remote client a2g5c2o-skwq6f-fkpct151-1-fkpct1pw-4. It is possib
      le the client has exited without closing its connection(s) or there is a network
       problem. All connection resources corresponding to that client process will now
       be removed.


      Following is the producer class:
      public class QueueProducer {
      
       /**
       * @param args
       */
       public static void main(String[] args) {
       try {
      
      
       Properties prop=new Properties();
       prop.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
       prop.setProperty("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
       prop.setProperty("java.naming.provider.url","jnp://localhost:1100");
      
       InitialContext ic =new InitialContext(prop);
      
       ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
       Queue queue = (Queue)ic.lookup("queue/testQueue");
      
       Connection connection = cf.createConnection();
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       MessageProducer sender = session.createProducer(queue);
      
       TextMessage message = session.createTextMessage("pavan");
       sender.send(message);
       connection.close();
       System.out.println("message sent");
      
       } catch (Exception e) {
      
       e.printStackTrace();
      
       }
      
       }
      
      }


      Following is the QueueConsumer class
      public class QueueConsumer implements MessageListener{
      
       private static Connection conn=null;
      
       /**
       * @param args
       */
       public static void main(String[] args) {
       try{
       Properties prop=new Properties();
       prop.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
       prop.setProperty("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
       prop.setProperty("java.naming.provider.url","jnp://localhost:1100");
      
       InitialContext context =new InitialContext(prop);
      
       ConnectionFactory connectionFactory=(ConnectionFactory)context.lookup("ConnectionFactory");
       conn=connectionFactory.createConnection();
       Session session=conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
      
       Queue queue=(Queue)context.lookup("queue/testQueue");
       MessageConsumer consumer=session.createConsumer(queue);
      
       consumer.setMessageListener(new QueueConsumer());
      
       conn.start();
      
       }
       catch (Exception e) {
       e.printStackTrace();
       }finally{
       try{
       //conn.close();
       }catch(Exception e){
       e.printStackTrace();
       }
       }
      
       }
      
       public void onMessage(Message message)
       {
       try {
       System.out.println("inside onmessage");
       if(message instanceof TextMessage){
       TextMessage objmsg=(TextMessage)message;
       String msg=objmsg.getText();
       System.out.println("message by text : "+msg);
      
       }
       if(message instanceof ObjectMessage){
       ObjectMessage objmsg=(ObjectMessage)message;
       String msg=((Object)objmsg.getObject()).toString();
       System.out.println("message by Object : "+msg);
       }
      
       } catch (Exception e) {
       e.printStackTrace();
       }
      
      
       }
      
      }


      After trying out various ways i could resolve this problem by uncommenting the code which is there in QueueConsumer class.
      Whenever i comment the code it gives me the same exception.
      finally{
       try{
       conn.close();
       }catch(Exception e){
       e.printStackTrace();
       }
       }
      


      Then i feel that the messages are not recieving asynchronously.
      whenever i run QueueProducer i should always run QueueConsumer to get the messages.

      In JbossMQ it is not necessary to always run the consumer class because the connection is open to the Queue and we are able to recieve messages whenever we run Producer.

      Isn't the behaviour for asynchronous communication?

      Please help me.

        • 1. Re: Problem with Asynchronous Communicatin
          Clebert Suconic Master

           

          java.net.SocketException: Connection reset by peer: socket write error
           at java.net.SocketOutputStream.socketWrite0(Native Method)
           at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
           at java.net.SocketOutputStream.write(SocketOutputStream.java:115)
           at org.jboss.remoting.transport.bisocket.BisocketClientInvoker$PingTimer
          Task.run(BisocketClientInvoker.java:636)
           at java.util.TimerThread.mainLoop(Timer.java:512)
           at java.util.TimerThread.run(Timer.java:462)
          


          This is just the server trying to ping your client.

          When you have a connection opened, the server will ping the client from time to time. As you're not closing the connection you're getting this error.

          If you close the connection (which is the expected behavior) you won't have any ping failing under regular circumstances.

          • 2. Re: Problem with Asynchronous Communicatin
            kumar reddy Newbie

            Hi,

            If i want the connection to be open and listen continuously for messages without closing the connection.what needs to be done?

            Because the same program without closing the connection works for JbossMQ and listens continuously for messages.

            Please help me.

            • 3. Re: Problem with Asynchronous Communicatin
              Clebert Suconic Master

              You just keep your VM running as long as you have a connection opened.

              Your problem was keeping the connection opened, while the VM was gone.

              The server tried to hit your client back, but you killed your client.