1 Reply Latest reply on Jan 14, 2011 5:19 AM by Chris Miller

    Standard producer, STOMP consumer - IOOBE

    Chris Miller Newbie

      I'm just starting to play with HornetQ and have hit a problem when using a STOMP consumer. Note that I'm quite possibly doing something silly since I'm new to the API, but I can't figure out what. Also note that I'm using a trunk build from today, since I've heard it contains several STOMP bugfixes that aren't in 2.1.2.


      I have a producer writing messages that contain just a byte[] to a queue. If I have a normal consumer, it can pick up and read the messages just fine. If I try using a STOMP consumer instead though, HornetQ throws an IndexOutOfBoundsException, triggered because StompSession.java line 101 doesn't detect a BYTE_TYPE message, so tries to parse the byte[] as UTF-8.


      Here's some example code that demonstrates the problem. Any hints as to what I'm doing wrong would be appreciated!


      import java.io.InputStream;

      import java.io.OutputStream;

      import java.net.Socket;

      import java.util.HashMap;

      import java.util.Map;


      import org.hornetq.api.core.TransportConfiguration;

      import org.hornetq.api.core.client.ClientMessage;

      import org.hornetq.api.core.client.ClientProducer;

      import org.hornetq.api.core.client.ClientSession;

      import org.hornetq.api.core.client.ClientSessionFactory;

      import org.hornetq.api.core.client.HornetQClient;

      import org.hornetq.api.core.client.ServerLocator;

      import org.hornetq.core.config.Configuration;

      import org.hornetq.core.config.CoreQueueConfiguration;

      import org.hornetq.core.config.impl.ConfigurationImpl;

      import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;

      import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;

      import org.hornetq.core.remoting.impl.netty.TransportConstants;

      import org.hornetq.core.server.HornetQServer;

      import org.hornetq.core.server.HornetQServers;

      import org.hornetq.core.server.JournalType;


      public class StompTest


        private static final String END_OF_FRAME = "\u0000" ;


        public static void main(String[] args) throws Exception


          Configuration config = new ConfigurationImpl();





          Map<String, Object> params = new HashMap<String, Object>();

          config.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName(), params));


          params = new HashMap<String, Object>();

          params.put(TransportConstants.PORT_PROP_NAME, 61613);

          params.put(TransportConstants.PROTOCOL_PROP_NAME, "stomp");

          config.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName(), params));


          config.getQueueConfigurations().add(new CoreQueueConfiguration("example", "myQueue", null, true));


          HornetQServer server = HornetQServers.newHornetQServer(config);

          System.out.println("**** Starting server");



          TransportConfiguration transportConfig = new TransportConfiguration(NettyConnectorFactory.class.getName());

          ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(transportConfig);

          ClientSessionFactory factory = serverLocator.createSessionFactory(transportConfig);

          ClientSession session = factory.createSession();


          System.out.println("**** Connecting STOMP client");

          Socket socket = new Socket("localhost", 61613);

          String connectFrame = "CONNECT\nlogin:\npasscode:\nrequest-id: 1\n\n" + END_OF_FRAME;

          sendFrame(socket, connectFrame);



          ClientProducer producer = session.createProducer("example");


          ClientMessage msg = session.createMessage(true);

          byte[] data = new byte[] {1, 2, 3};


          System.out.println("**** Sending message");



          System.out.println("**** Starting session");



          System.out.println("**** Subscribing to myQueue");

          String message = "SUBSCRIBE\ndestination: myQueue\nreceipt:message-12345\nack:client\n\n" + END_OF_FRAME;

          sendFrame(socket, message);



          System.out.println("**** Sleeping");


          System.out.println("**** Listening for message");



          System.out.println("**** Disconnecting...");

          String disconnectFrame = "DISCONNECT\n\n" + END_OF_FRAME; sendFrame(socket, disconnectFrame);




        private static void sendFrame(Socket socket, String data) throws Exception


          byte[] bytes = data.getBytes("UTF-8");

          OutputStream outputStream = socket.getOutputStream();

          for (int i = 0; i < bytes.length; i++)







        private static void receiveFrame(Socket socket) throws Exception


          InputStream is = socket.getInputStream();

          byte[] buf = new byte[500];

          int length = is.read(buf);

          if (length < 0)


            System.out.println("**** EOF");




            System.out.println("**** Received:");

            String str = new String(buf, 0, length, "UTF-8");





        • 1. Standard producer, STOMP consumer - IOOBE
          Chris Miller Newbie

          In answer to my own question, it looks like the trick I was missing was this:


          ClientMessage msg = session.createMessage(Message.BYTES_TYPE, true);


          ie, I wasn't setting the message type explicitly. What caught me out was that this didn't seem to be required for a standard producer/consumer - it wasn't until I brought STOMP into the mix that problems arose. There doesn't seem to be a lot of documentation in this area. Perhaps some documentation could be added explaining how the message type is used and under what circumstances it matters?