2 Replies Latest reply on Jun 3, 2012 6:16 PM by Justin Bertram

    Jboss As 7 jms client send/receive message by queue

    Victor Chen Newbie

      Hello~

      I wrote a client code about jms by queue, when I used this code in Jboss-4.2.3.GA and it can run.

      But I used in Jboss As 7.1.1.final, then it has some problem.

      I want to create more thread to send and reveive.

      I set up Send[] tt = new Send[1] in run_thread1(), then it can run normal.

      But when I set up Send[] tt = new Send[2] or i more than 2, then it will show error message to me.

      I try to debug it and find any possible solutions, but I still have no idea how to do, so, maybe someone can help me or give some suggests, or someone maybe did it that can show me your simple code.

      The error message will after my simple code.

      Thank for your help.

       

       

       

      here is my client code:

       

      import java.util.Hashtable;

       

      import javax.jms.JMSException;

      import javax.jms.Message;

      import javax.jms.MessageListener;

      import javax.jms.Queue;

      import javax.jms.QueueConnection;

      import javax.jms.QueueConnectionFactory;

      import javax.jms.QueueReceiver;

      import javax.jms.QueueSender;

      import javax.jms.QueueSession;

      import javax.jms.TextMessage;

      import javax.jms.MapMessage;

      import javax.naming.InitialContext;

      import javax.naming.NamingException;

      import javax.naming.Context;

      import javax.jms.DeliveryMode;

      import java.util.Date;

       

      public class SendRecvClient

      {  

          private final static String JNDI_FACTORY = "org.jboss.naming.remote.client.InitialContextFactory";

          public final static String JMS_FACTORY="jms/RemoteConnectionFactory";

        

          QueueConnection qconn = null;

          QueueSession qsession = null;

          QueueSender qsend = null;

          QueueReceiver qrecv = null;

          Queue queue = null;

       

          TextMessage Sendmessage = null;

          String text = new String(" A text msg a;lsdoinyweoiuvtweryuvewvcygwelivrewv");

        

          public static class ExListener implements MessageListener

          {

              public void onMessage(Message msg)

              {

                  TextMessage Recvmessage = (TextMessage) msg;

                  try

                  {

                      System.out.println("onMessage, Recv text = " + Recvmessage.getText());

                  }

                  catch(Throwable t)

                  {

                      t.printStackTrace();

                  }

              }

          }

       

          public void setupPTP() throws JMSException, NamingException

          {

            

              Hashtable<String, String> env = new Hashtable();

              env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);

            

              env.put(Context.PROVIDER_URL, "remote://localhost:4447");

              env.put(Context.SECURITY_PRINCIPAL, "testuser");

              env.put(Context.SECURITY_CREDENTIALS, "testpassword");

            

              InitialContext iniCtx = new InitialContext(env);

            

              Object tmp = iniCtx.lookup("jms/RemoteConnectionFactory");

              QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;

       

              qconn = qcf.createQueueConnection();

              queue = (Queue) iniCtx.lookup("jms/queue/test");

        

              qsession = qconn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);

            

          }

       

          public void sendRecvAsync() throws JMSException, NamingException

          {

              System.out.println("Begin sendRecvAsync...");

            

              // Set up the PTP connection, session

              setupPTP();

            

              // Set the async listener

              qrecv = qsession.createReceiver(queue);

              recv.setMessageListener(new ExListener());

            

              // Send a text msg

              qsend = qsession.createSender(queue);

              Sendmessage = qsession.createTextMessage();

       

          }

        

          class Send extends Thread

          {

              public void run()

              {         

                  try

                  {

                      for(int i = 0; i < 10; i++)

                      {  

                          Sendmessage.setText( text + " # " + i);

                          qsend.send(queue, Sendmessage);  

                          System.out.println("Sent text = " + Sendmessage.getText());

                      }

        

                  }

                  catch(JMSException e)

                  {

                      System.out.print("Exception occurred:" + e.toString());

                  }

              }

          }

        

          class Recv extends Thread

          {

              public void run()

              {

                  try

                  {

                      qrecv.setMessageListener(new ExListener());

                      qconn.start();

                    

                  }

                  catch(JMSException e)

                  {

                    

                  }

              }

          }

         

          public void run_thread1()

          {

              Send[] tt = new Send[1];

            

              for(int i = 0; i < tt.length; i++)

              {

                  tt[i] = new Send();

                  tt[i].start();

                

              }

            

              for(int i = 0; i < tt.length; i++)

              {

                  try

                  {

                      tt[i].join();

                  }

                  catch(InterruptedException je)

                  {

                      je.printStackTrace();

                  }

              }

          }

        

          public void run_thread2()

          {

              Recv[] kk = new Recv[10000];

              for(int i = 0; i < kk.length; i++)

              {

                  kk[i] = new Recv();

                  kk[i].start();

              }

          }

       

          public void close() throws JMSException

          {

              qconn.close();

              qsession.close();

              qconn.close();

          }

        

          public static void main(String args[]) throws Exception, JMSException,NamingException

          {  

              System.out.printf("Begin SendRecvClient,now =" + " %tF %<tT\n", new Date());

              SendRecvClient client = new SendRecvClient();  

        

              client.sendRecvAsync();

              client.run_thread1();  

              client.run_thread2();

              client.close();

              System.out.println("End SendRecvClient...");  

              System.out.printf("End SendRecvClient,now =" + " %tF %<tT\n", new Date());

          } 

        

      }

       

      =====================================================================================================================================

       

      When I set up  Send[] tt = new Send[1] in run_thread1(), i = 2, then sometimes will show this error message.

       

      org.hornetq.core.logging.impl.JULLogDelegate error

      Failed to prepare message for receipt

      java.lang.IndexOutOfBoundsException

                at org.jboss.netty.buffer.AbstractChannelBuffer.checkReadableBytes(AbstractChannelBuffer.java:657)

                at org.jboss.netty.buffer.AbstractChannelBuffer.readBytes(AbstractChannelBuffer.java:337)

                at org.jboss.netty.buffer.AbstractChannelBuffer.readBytes(AbstractChannelBuffer.java:343)

                at org.hornetq.core.buffers.impl.ChannelBufferWrapper.readSimpleStringInternal(ChannelBufferWrapper.java:84)

                at org.hornetq.core.buffers.impl.ChannelBufferWrapper.readNullableSimpleString(ChannelBufferWrapper.java:58)

                at org.hornetq.jms.client.HornetQTextMessage.doBeforeReceive(HornetQTextMessage.java:143)

                at org.hornetq.jms.client.JMSMessageListenerWrapper.onMessage(JMSMessageListenerWrapper.java:68)

                at org.hornetq.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:983)

                at org.hornetq.core.client.impl.ClientConsumerImpl.access$400(ClientConsumerImpl.java:48)

                at org.hornetq.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1113)

                at org.hornetq.utils.OrderedExecutorFactory$OrderedExecutor$1.run(OrderedExecutorFactory.java:100)

                at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)

                at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

                at java.lang.Thread.run(Unknown Source)

       

      =========================================================================================================================================

       

      When I set up i = 3 or more than 3, then I got this message.

       

      You don't have an address at the Server's SessionException occurred:javax.jms.IllegalStateException:

       

      =========================================================================================================================================

       

      But when I set up i = 1, and it can run normal.

      So, I really don't know what something happen.

       

      Thank you.