1 Reply Latest reply on Jan 14, 2011 5:19 AM by chris_overseas

    Standard producer, STOMP consumer - IOOBE

    chris_overseas

      I'm just starting to play with HornetQ and have hit a problem when using a STOMP consumer. Note that I'm quite possibly doing something silly since I'm new to the API, but I can't figure out what. Also note that I'm using a trunk build from today, since I've heard it contains several STOMP bugfixes that aren't in 2.1.2.

       

      I have a producer writing messages that contain just a byte[] to a queue. If I have a normal consumer, it can pick up and read the messages just fine. If I try using a STOMP consumer instead though, HornetQ throws an IndexOutOfBoundsException, triggered because StompSession.java line 101 doesn't detect a BYTE_TYPE message, so tries to parse the byte[] as UTF-8.

       

      Here's some example code that demonstrates the problem. Any hints as to what I'm doing wrong would be appreciated!

       

      import java.io.InputStream;

      import java.io.OutputStream;

      import java.net.Socket;

      import java.util.HashMap;

      import java.util.Map;

       

      import org.hornetq.api.core.TransportConfiguration;

      import org.hornetq.api.core.client.ClientMessage;

      import org.hornetq.api.core.client.ClientProducer;

      import org.hornetq.api.core.client.ClientSession;

      import org.hornetq.api.core.client.ClientSessionFactory;

      import org.hornetq.api.core.client.HornetQClient;

      import org.hornetq.api.core.client.ServerLocator;

      import org.hornetq.core.config.Configuration;

      import org.hornetq.core.config.CoreQueueConfiguration;

      import org.hornetq.core.config.impl.ConfigurationImpl;

      import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;

      import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;

      import org.hornetq.core.remoting.impl.netty.TransportConstants;

      import org.hornetq.core.server.HornetQServer;

      import org.hornetq.core.server.HornetQServers;

      import org.hornetq.core.server.JournalType;

       

      public class StompTest

      {

        private static final String END_OF_FRAME = "\u0000" ;

       

        public static void main(String[] args) throws Exception

        {

          Configuration config = new ConfigurationImpl();

          config.setJournalType(JournalType.NIO);

          config.setSecurityEnabled(false);

          config.setPersistenceEnabled(false);

       

          Map<String, Object> params = new HashMap<String, Object>();

          config.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName(), params));

       

          params = new HashMap<String, Object>();

          params.put(TransportConstants.PORT_PROP_NAME, 61613);

          params.put(TransportConstants.PROTOCOL_PROP_NAME, "stomp");

          config.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName(), params));

       

          config.getQueueConfigurations().add(new CoreQueueConfiguration("example", "myQueue", null, true));

       

          HornetQServer server = HornetQServers.newHornetQServer(config);

          System.out.println("**** Starting server");

          server.start();

       

          TransportConfiguration transportConfig = new TransportConfiguration(NettyConnectorFactory.class.getName());

          ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(transportConfig);

          ClientSessionFactory factory = serverLocator.createSessionFactory(transportConfig);

          ClientSession session = factory.createSession();

       

          System.out.println("**** Connecting STOMP client");

          Socket socket = new Socket("localhost", 61613);

          String connectFrame = "CONNECT\nlogin:\npasscode:\nrequest-id: 1\n\n" + END_OF_FRAME;

          sendFrame(socket, connectFrame);

          receiveFrame(socket);

       

          ClientProducer producer = session.createProducer("example");

       

          ClientMessage msg = session.createMessage(true);

          byte[] data = new byte[] {1, 2, 3};

          msg.getBodyBuffer().writeBytes(data);

          System.out.println("**** Sending message");

          producer.send(msg);

       

          System.out.println("**** Starting session");

          session.start();

       

          System.out.println("**** Subscribing to myQueue");

          String message = "SUBSCRIBE\ndestination: myQueue\nreceipt:message-12345\nack:client\n\n" + END_OF_FRAME;

          sendFrame(socket, message);

          receiveFrame(socket);

       

          System.out.println("**** Sleeping");

          Thread.sleep(1000L);

          System.out.println("**** Listening for message");

          receiveFrame(socket);

       

          System.out.println("**** Disconnecting...");

          String disconnectFrame = "DISCONNECT\n\n" + END_OF_FRAME; sendFrame(socket, disconnectFrame);

          receiveFrame(socket);

        }

       

        private static void sendFrame(Socket socket, String data) throws Exception

        {

          byte[] bytes = data.getBytes("UTF-8");

          OutputStream outputStream = socket.getOutputStream();

          for (int i = 0; i < bytes.length; i++)

          {

            outputStream.write(bytes[i]);

          }

          outputStream.flush();

        }

       

        private static void receiveFrame(Socket socket) throws Exception

        {

          InputStream is = socket.getInputStream();

          byte[] buf = new byte[500];

          int length = is.read(buf);

          if (length < 0)

          {

            System.out.println("**** EOF");

          }

          else

          {

            System.out.println("**** Received:");

            String str = new String(buf, 0, length, "UTF-8");

            System.out.println(str);

          }

        }

      }

        • 1. Standard producer, STOMP consumer - IOOBE
          chris_overseas

          In answer to my own question, it looks like the trick I was missing was this:

           

          ClientMessage msg = session.createMessage(Message.BYTES_TYPE, true);

           

          ie, I wasn't setting the message type explicitly. What caught me out was that this didn't seem to be required for a standard producer/consumer - it wasn't until I brought STOMP into the mix that problems arose. There doesn't seem to be a lot of documentation in this area. Perhaps some documentation could be added explaining how the message type is used and under what circumstances it matters?