Standard producer, STOMP consumer - IOOBE
chris_overseas Jan 13, 2011 1:12 PM
I'm just starting to play with HornetQ and have hit a problem when using a STOMP consumer. Note that I'm quite possibly doing something silly since I'm new to the API, but I can't figure out what. Also note that I'm using a trunk build from today, since I've heard it contains several STOMP bugfixes that aren't in 2.1.2.
I have a producer writing messages that contain just a byte[] to a queue. If I have a normal consumer, it can pick up and read the messages just fine. If I try using a STOMP consumer instead though, HornetQ throws an IndexOutOfBoundsException, triggered because StompSession.java line 101 doesn't detect a BYTE_TYPE message, so tries to parse the byte[] as UTF-8.
Here's some example code that demonstrates the problem. Any hints as to what I'm doing wrong would be appreciated!
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.CoreQueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.server.JournalType;
/**
 * Stand-alone reproduction: starts an embedded HornetQ server, sends one
 * message whose body is a raw byte[] through the core client API, and then
 * tries to consume it over a plain-socket STOMP connection.
 *
 * All resources (socket, client session, session factory, server locator and
 * the embedded server) are released in finally blocks so the program cleans up
 * even when a step fails.
 */
public class StompTest
{
    /** NUL character that terminates every STOMP frame. */
    private static final String END_OF_FRAME = "\u0000";

    /**
     * Entry point: boots the embedded server, runs the producer/STOMP-consumer
     * scenario, and always stops the server afterwards.
     *
     * @param args unused
     * @throws Exception if the server, the core client or the socket I/O fails
     */
    public static void main(String[] args) throws Exception
    {
        HornetQServer server = HornetQServers.newHornetQServer(createConfiguration());
        System.out.println("**** Starting server");
        server.start();
        try
        {
            runScenario();
        }
        finally
        {
            // Without this the embedded server is never shut down.
            server.stop();
        }
    }

    /**
     * Builds the in-memory server configuration: security and persistence
     * disabled, one Netty acceptor with an empty parameter map, one Netty
     * acceptor on port 61613 speaking the "stomp" protocol, and a queue
     * configuration for "myQueue" on address "example".
     */
    private static Configuration createConfiguration()
    {
        Configuration config = new ConfigurationImpl();
        config.setJournalType(JournalType.NIO);
        config.setSecurityEnabled(false);
        config.setPersistenceEnabled(false);

        // Acceptor used by the core client (no explicit parameters).
        Map<String, Object> params = new HashMap<String, Object>();
        config.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName(), params));

        // Acceptor used by the STOMP socket client.
        params = new HashMap<String, Object>();
        params.put(TransportConstants.PORT_PROP_NAME, 61613);
        params.put(TransportConstants.PROTOCOL_PROP_NAME, "stomp");
        config.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName(), params));

        config.getQueueConfigurations().add(new CoreQueueConfiguration("example", "myQueue", null, true));
        return config;
    }

    /**
     * Opens the core client session and the STOMP socket, runs the frame
     * exchange, and closes everything in reverse order of creation.
     */
    private static void runScenario() throws Exception
    {
        TransportConfiguration transportConfig = new TransportConfiguration(NettyConnectorFactory.class.getName());
        ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(transportConfig);
        try
        {
            ClientSessionFactory factory = serverLocator.createSessionFactory(transportConfig);
            try
            {
                ClientSession session = factory.createSession();
                try
                {
                    System.out.println("**** Connecting STOMP client");
                    Socket socket = new Socket("localhost", 61613);
                    try
                    {
                        exchangeFrames(session, socket);
                    }
                    finally
                    {
                        socket.close();
                    }
                }
                finally
                {
                    session.close();
                }
            }
            finally
            {
                factory.close();
            }
        }
        finally
        {
            serverLocator.close();
        }
    }

    /**
     * Performs the actual scenario: STOMP CONNECT, core-API send of a
     * three-byte message to "example", STOMP SUBSCRIBE to myQueue, wait for the
     * delivered message, then STOMP DISCONNECT.
     */
    private static void exchangeFrames(ClientSession session, Socket socket) throws Exception
    {
        String connectFrame = "CONNECT\nlogin:\npasscode:\nrequest-id: 1\n\n" + END_OF_FRAME;
        sendFrame(socket, connectFrame);
        receiveFrame(socket);

        ClientProducer producer = session.createProducer("example");
        ClientMessage msg = session.createMessage(true);
        byte[] data = new byte[] {1, 2, 3};
        msg.getBodyBuffer().writeBytes(data);
        System.out.println("**** Sending message");
        producer.send(msg);

        System.out.println("**** Starting session");
        session.start();

        System.out.println("**** Subscribing to myQueue");
        String message = "SUBSCRIBE\ndestination: myQueue\nreceipt:message-12345\nack:client\n\n" + END_OF_FRAME;
        sendFrame(socket, message);
        receiveFrame(socket);

        System.out.println("**** Sleeping");
        Thread.sleep(1000L);

        System.out.println("**** Listening for message");
        receiveFrame(socket);

        System.out.println("**** Disconnecting...");
        String disconnectFrame = "DISCONNECT\n\n" + END_OF_FRAME;
        sendFrame(socket, disconnectFrame);
        receiveFrame(socket);
    }

    /**
     * Writes one STOMP frame to the socket as UTF-8 and flushes it.
     *
     * @param socket connected STOMP socket
     * @param data   complete frame text, including the trailing NUL terminator
     * @throws Exception if the write fails
     */
    private static void sendFrame(Socket socket, String data) throws Exception
    {
        OutputStream outputStream = socket.getOutputStream();
        // Bulk write instead of one write() call per byte.
        outputStream.write(data.getBytes("UTF-8"));
        outputStream.flush();
    }

    /**
     * Reads one frame from the socket and prints it, or prints an EOF marker if
     * the peer closed the connection before sending anything.
     *
     * Bytes are consumed up to the NUL frame terminator (or end of stream)
     * rather than with a single fixed-size read, so a frame split across TCP
     * segments is read completely and two frames arriving together are not
     * swallowed by one call. Limitation: a binary body that itself contains a
     * NUL byte would be cut at that byte, since content-length is not parsed.
     *
     * @param socket connected STOMP socket
     * @throws Exception if the read fails
     */
    private static void receiveFrame(Socket socket) throws Exception
    {
        InputStream is = socket.getInputStream();
        ByteArrayOutputStream frame = new ByteArrayOutputStream();
        int next;
        // Loop ends on 0 (NUL terminator) or -1 (end of stream).
        while ((next = is.read()) > 0)
        {
            frame.write(next);
        }
        if (next < 0 && frame.size() == 0)
        {
            System.out.println("**** EOF");
        }
        else
        {
            System.out.println("**** Received:");
            System.out.println(new String(frame.toByteArray(), "UTF-8"));
        }
    }
}