-
1. Re: Hornetq with core bridge delivers null when message over 45960 bytes
lorrymoller Dec 14, 2011 4:00 PM (in response to lorrymoller)ps - From my C++ app's perspective it is sending the data properly (I print it out just as it goes out the door), and can send subsequent (smaller) messages thereafter with no problems
-
2. Re: Hornetq with core bridge delivers null when message over 45960 bytes
gaohoward Dec 14, 2011 9:59 PM (in response to lorrymoller)Did you try receiving the message from server A (temporarily disable the bridge) and see if the message is null or not?
Howard
-
3. Re: Hornetq with core bridge delivers null when message over 45960 bytes
lorrymoller Dec 14, 2011 11:41 PM (in response to gaohoward)Thx, clever test!
Just tried it, if I add a stomp acceptor to the Hornetq B configuration, and have my C++ client send the stomp request directly to HornetQ B and its queue, then I get message in my jboss consumer properly (regardless of the message size).
I think this indicates a bug/misconfiguration in core bridge? I don't see any settings in the core bridge that would affect this so it seems like a bug to me; any thoughts on settings to try?
The only thing I can think of is to use a JMS bridge rather than a core bridge, but the docs in chapter 36 recommend to use core bridge, I'm not sure the reasons not a jms bridge.
Thanks for any advice you can give.
Lorry
ps - I'm not sure C++ to hornetB was what you meant, but I think (?) I need hornetq B embedded in my jboss as a jms provider in order to accept the message. if not, where do i configure settings so that hornetq A can talk to jboss on another machine?
-
4. Re: Hornetq with core bridge delivers null when message over 45960 bytes
gaohoward Dec 15, 2011 12:13 AM (in response to lorrymoller)That's what I meant. even though it still helps to find the problem. My understanding of your applications structure is like:
c++ stomp client ----> address X on HornetQ A standalone ----> (bridge) ---> address Y on HornetQ B in JBOSS --> your message listener
Now your message listener receives null. Now your test proves that HornetQ B may not be the problem. My guess is that the message the bridge sends to Y is already null. But I'm not sure whether it is that the bridge received a good message and (due to some bug) sent a null to Hornet B, or the bridge received a null message and sent to hornetQ B. So what I suggested is using a normal hornetq client to receive directly from HornetQ A server, to test whether HornetQ A delivers a good message or a null message. So we can be further sure about if the bridge or the hornetQ A server malfunctioned.
About 'core bridge' vs 'jms bridge', i think core bridge has more config options and better performance.
-
5. Re: Hornetq with core bridge delivers null when message over 45960 bytes
lorrymoller Dec 15, 2011 1:07 AM (in response to gaohoward)Okay, thanks, and yes, that is my topology. I think you are suggesting
c++ stomp client ----> address X on HornetQ A standalone ----> new hornetq consumer client
(That looks pretty similar to what I tested:
c++ stomp client ----> address Y on HornetQ B in JBOSS --> my message listener
but i guess that does give some more info)
I'm open to any suggestions for what to use for the hornetq consumer client, otherwise I'll create one by modifying the ConsumerRateLimit Example tomorrow.
Thx!
lorry
-
6. Re: Hornetq with core bridge delivers null when message over 45960 bytes
lorrymoller Dec 19, 2011 8:50 AM (in response to lorrymoller)Did not get to the test program yet, just adding the following in case it helps anyone else:
I tried adding a 'content-length' field to the stomp message so the core bridge would use ByteMessage rather than TextMessage, and that works fine for my 46K file, so that was encouraging.
However, when I try a 139 k file, it fails spectaularly - exception in hornetq that keeps repeating in a tight loop, before it even calls my onMessage handler. pretty scary, as it fills my jboss log pretty quickly and the only way I can stop it (even after a restart) is to kill my large message/journal.
Going to try jms bridge
C++ client, where 'jsonmsg' is the text file I read:
----------------------------
std::string jsonmsg = msg.GetJsonEncoding();
std::stringstream ss;
ss<<jsonmsg.length();
std::string sendcmd ="SEND\ndestination: ";
sendcmd += m_destination;
sendcmd += "\npersistent: true\ncontent-length: " + ss.str() + "\n\n" + jsonmsg + "\0"; //tried it with and without the terminating null character, same result
Repeatinng error in jboss:
08:35:02,677 ERROR [HornetQMessageHandler] Failed to deliver message
java.lang.NumberFormatException
: null
at java.lang.Long.parseLong(
Long.java:375
)
at java.lang.Long.valueOf(
Long.java:525
)
at org.hornetq.utils.TypedProperties.getLongProperty(
TypedProperties.java:304
)
at org.hornetq.core.message.impl.MessageImpl.getLongProperty(
MessageImpl.java:756
)
at org.hornetq.core.client.impl.ClientLargeMessageImpl.getBodySize(
ClientLargeMessageImpl.java:109
)
at org.hornetq.jms.client.HornetQBytesMessage.doBeforeReceive(
HornetQBytesMessage.java:409
)
at org.hornetq.ra.inflow.HornetQMessageHandler.onMessage(
HornetQMessageHandler.java:269
)
at org.hornetq.core.client.impl.ClientConsumerImpl.callOnMessage(
ClientConsumerImpl.java:866
)
at org.hornetq.core.client.impl.ClientConsumerImpl.access$100(
ClientConsumerImpl.java:44
)
at org.hornetq.core.client.impl.ClientConsumerImpl$Runner.run(
ClientConsumerImpl.java:983
)
at org.hornetq.utils.OrderedExecutorFactory$OrderedExecutor$1.run(
OrderedExecutorFactory.java:100
)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(
ThreadPoolExecutor.java:886
)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:908
)
at java.lang.Thread.run(
Thread.java:662
)
-
7. Re: Hornetq with core bridge delivers null when message over 45960 bytes
lorrymoller Dec 22, 2011 11:21 AM (in response to lorrymoller)The exception when sending ByteMessages above was with the 2.2.5 version of hornetq ; we actually have 2.1.2 in the field, and that version does not throw the exception, so it looks like we may go with that.
So, basically the only change from the original position was to add the content-length field to the stomp message and to read ByteMessages in the consumer rather than TextMessages. Hopefully, the 2.2.5 hornetq exception thrown on the consumer side will have been fixed by the time we need to upgrade to a newer version of hornetq.
We now see an exception on the producer side if the message is over 500kb (complains about a max buffer size, may be able to fix with configuration), but our messages shouldn't get that big, so for now it is okay and gets us past our production problem.
-
8. Re: Hornetq with core bridge delivers null when message over 45960 bytes
nick.pomfret Apr 18, 2012 11:56 AM (in response to lorrymoller)I'm having the exact same issues with large text messages not getting sent across a JMS bridge correctly with version 2.2.5final.
It happens in a clustered environemnt. Messages sent to a consumer on the same broker as the sender are not affected. Messages that get sent across a bridge to a consumer on another broker are affected - when calling TextMessage.getText() it returns null (for messages over a certian size ~50k).
We then tried using ByteMessage instead of TextMessage and got this error ...
java.lang.NumberFormatException: null
INFO | jvm 1 | 2012/04/18 14:01:55 | at java.lang.Long.parseLong(Long.java:404)
INFO | jvm 1 | 2012/04/18 14:01:55 | at java.lang.Long.valueOf(Long.java:540)
INFO | jvm 1 | 2012/04/18 14:01:55 | at org.hornetq.utils.TypedProperties.getLongProperty(TypedProperties.java:304)
INFO | jvm 1 | 2012/04/18 14:01:55 | at org.hornetq.core.message.impl.MessageImpl.getLongProperty(MessageImpl.java:756)
INFO | jvm 1 | 2012/04/18 14:01:55 | at org.hornetq.core.client.impl.ClientLargeMessageImpl.getBodySize(ClientLargeMessageImpl.java:109)
INFO | jvm 1 | 2012/04/18 14:01:55 | at org.hornetq.jms.client.HornetQBytesMessage.doBeforeReceive(HornetQBytesMessage.java:409)
INFO | jvm 1 | 2012/04/18 14:01:55 | at org.hornetq.jms.client.HornetQMessageConsumer.getMessage(HornetQMessageConsumer.java:236)
INFO | jvm 1 | 2012/04/18 14:01:55 | at org.hornetq.jms.client.HornetQMessageConsumer.receive(HornetQMessageConsumer.java:133)
As an interim measure, we're compressed messages over a certain size.
-
9. Re: Hornetq with core bridge delivers null when message over 45960 bytes
clebert.suconic Apr 18, 2012 2:53 PM (in response to nick.pomfret)I guess it's worth to try the next release (about to be uploaded)
-
10. Re: Hornetq with core bridge delivers null when message over 45960 bytes
nick.pomfret Apr 19, 2012 5:11 AM (in response to clebert.suconic)If i submit a unit test showing the error would someone be able to fix?
-
11. Re: Hornetq with core bridge delivers null when message over 45960 bytes
ataylor Apr 19, 2012 5:29 AM (in response to nick.pomfret)yes, but if you could try the latest version first to see if it is already fixed please
-
12. Re: Hornetq with core bridge delivers null when message over 45960 bytes
nick.pomfret Apr 19, 2012 8:24 AM (in response to ataylor)Will do, where do I get it from though?
Here's the test:
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.JournalType;
import org.hornetq.integration.logging.Log4jLogDelegateFactory;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.jms.server.config.JMSConfiguration;
import org.hornetq.jms.server.config.JMSQueueConfiguration;
import org.hornetq.jms.server.config.TopicConfiguration;
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
import org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static org.hornetq.api.core.client.HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD;
import static org.hornetq.api.core.client.HornetQClient.DEFAULT_RETRY_INTERVAL;
import static org.hornetq.core.remoting.impl.netty.TransportConstants.HOST_PROP_NAME;
import static org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME;
import static org.hornetq.core.remoting.impl.netty.TransportConstants.USE_NIO_PROP_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class ClusteredBrokerLargeMessageTest {
private static String queueName = "queue";
private EmbeddedJMS serverA;
private EmbeddedJMS serverB;
private Connection connectionOnBrokerA;
private Session sessionOnBrokerA;
private Connection connectionOnBrokerB;
private Session sessionOnBrokerB;
@Before
public void before() throws Exception {
int brokerAPort = 8888;
serverA = createServer("serverA", brokerAPort);
serverA.start();
int brokerBPort = 9999;
serverB = createServer("serverB", brokerBPort);
serverB.start();
HornetQConnectionFactory factoryForBorkerA = connectionFactory(brokerAPort);
connectionOnBrokerA = factoryForBorkerA.createConnection();
connectionOnBrokerA.start();
sessionOnBrokerA = connectionOnBrokerA.createSession(false, AUTO_ACKNOWLEDGE);
HornetQConnectionFactory factoryForBrokerB = connectionFactory(brokerBPort);
connectionOnBrokerB = factoryForBrokerB.createConnection();
connectionOnBrokerB.start();
sessionOnBrokerB = connectionOnBrokerB.createSession(false, AUTO_ACKNOWLEDGE);
Thread.sleep(10 * 1000);//wait for the cluster - don't know how to do this programatically
}
@After
public void after() throws Exception {
connectionOnBrokerA.close();
sessionOnBrokerA.close();
connectionOnBrokerB.close();
sessionOnBrokerB.close();
serverA.stop();
serverB.stop();
}
@Test
public void largeTextMessages() throws Exception {
MessageConsumer consumerOnBrokerB = sessionOnBrokerB.createConsumer(sessionOnBrokerB.createQueue(queueName));
int startSize = 1024;
for (int messageSizeInBytes = startSize; messageSizeInBytes < Integer.MAX_VALUE; messageSizeInBytes += 1024) {
System.out.println("messageSizeInBytes = " + messageSizeInBytes);
String text = someText(messageSizeInBytes);
sendTextMessageTo(text, sessionOnBrokerA);
TextMessage receive = (TextMessage) consumerOnBrokerB.receive(5 * 1000);
assertNotNull("Should have got a message with size " + messageSizeInBytes + "b", receive);
assertEquals(text, receive.getText());
}
}
@Test
public void largeByteMessages() throws Exception {
MessageConsumer consumerOnBrokerB = sessionOnBrokerB.createConsumer(sessionOnBrokerB.createQueue(queueName));
int startSize = 1024;
for (int messageSizeInBytes = startSize; messageSizeInBytes < Integer.MAX_VALUE; messageSizeInBytes += 1024) {
System.out.println("messageSizeInBytes = " + messageSizeInBytes);
String text = someText(messageSizeInBytes);
sendBytesMessageTo(text, sessionOnBrokerA);
BytesMessage receive = (BytesMessage) consumerOnBrokerB.receive(5 * 1000);
assertNotNull("Should have got a message with size " + messageSizeInBytes + "b", receive);
}
}
private static void sendBytesMessageTo(String messageBody, Session session) throws Exception {
MessageProducer queue = session.createProducer(session.createQueue(queueName));
BytesMessage message = session.createBytesMessage();
message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
message.writeBytes(messageBody.getBytes());
queue.send(message);
}
private static void sendTextMessageTo(String messageBody, Session session) throws Exception {
MessageProducer queue = session.createProducer(session.createQueue(queueName));
TextMessage message = session.createTextMessage(messageBody);
message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
queue.send(message);
}
private String someText(int bytes) {
char[] chars = new char[bytes];
Arrays.fill(chars, 'a');
return new String(chars);
}
private HornetQConnectionFactory connectionFactory(int port) throws Exception {
Map<String, Object> connectionParams = new HashMap<String, Object>();
connectionParams.put(TransportConstants.PORT_PROP_NAME, port);
connectionParams.put(TransportConstants.HOST_PROP_NAME, fqn());
connectionParams.put(TransportConstants.USE_NIO_PROP_NAME, false);
TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams);
HornetQConnectionFactory factory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration);
factory.setClientFailureCheckPeriod(DEFAULT_CLIENT_FAILURE_CHECK_PERIOD);
factory.setRetryInterval(DEFAULT_RETRY_INTERVAL);
factory.setRetryIntervalMultiplier(1.5);
factory.setMaxRetryInterval(10000);
factory.setReconnectAttempts(-1);
// factory.setCacheLargeMessagesClient(true);
// factory.setCompressLargeMessage(true);
// factory.setMinLargeMessageSize(1024 * 100);
factory.setMinLargeMessageSize(Integer.MAX_VALUE);
return factory;
}
private static String fqn() throws UnknownHostException {
return InetAddress.getLocalHost().getCanonicalHostName();
}
private EmbeddedJMS createServer(String name, final int port) throws Exception {
Configuration configuration = new ConfigurationImpl();
final String groupAddress = "224.0.0.1";
final int groupPort = 6660;
String clusterName = "clusterName";
String connectorName = "netty-connector";
String broadcastGroupName = "broadcast-group";
String discoveryGroupName = "discovery-group-name-" + name;
Map<String, Object> acceptorParams = new HashMap<String, Object>() {{
put(PORT_PROP_NAME, port);
put(HOST_PROP_NAME, fqn());
put(USE_NIO_PROP_NAME, false);
}};
TransportConfiguration acceptorConfig = new TransportConfiguration(NettyAcceptorFactory.class.getName(), acceptorParams);
TransportConfiguration connectorConfig = new TransportConfiguration(NettyConnectorFactory.class.getName(), acceptorParams, connectorName);
ClusterConnectionConfiguration clusterConnectionConfig = new ClusterConnectionConfiguration(
clusterName,
"jms",
connectorName,
1000,
false,
false,
1,
1024,
discoveryGroupName);
List<String> connectorInfos = new ArrayList<String>();
connectorInfos.add(connectorName);
BroadcastGroupConfiguration broadcastGroupConfig = new BroadcastGroupConfiguration(
broadcastGroupName,
null,
-1,
groupAddress,
groupPort,
2000,
connectorInfos);
DiscoveryGroupConfiguration discoveryGroupConfig = new DiscoveryGroupConfiguration(
discoveryGroupName,
null,
groupAddress,
groupPort,
10 * 1000,
10 * 1000);
configuration.setJMXManagementEnabled(false);
configuration.setClustered(true);
configuration.getClusterConfigurations().add(clusterConnectionConfig);
configuration.getAcceptorConfigurations().add(acceptorConfig);
configuration.getConnectorConfigurations().put(connectorConfig.getName(), connectorConfig);
configuration.getDiscoveryGroupConfigurations().put(discoveryGroupConfig.getName(), discoveryGroupConfig);
configuration.getBroadcastGroupConfigurations().add(broadcastGroupConfig);
configuration.setPersistenceEnabled(false);
configuration.setSecurityEnabled(false);
configuration.setThreadPoolMaxSize(ConfigurationImpl.DEFAULT_THREAD_POOL_MAX_SIZE * 2);
configuration.setLogDelegateFactoryClassName(Log4jLogDelegateFactory.class.getName());
configuration.setJournalBufferSize_NIO(501760 * 10);
int journalBufferSize_nio = configuration.getJournalBufferSize_NIO();
System.out.println("journalBufferSize_nio = " + journalBufferSize_nio);
File dir = new File("target/" + name + "/" + UUID.randomUUID().toString());
dir.mkdirs();
configuration.setBindingsDirectory(new File(dir, "bindings").getAbsolutePath());
configuration.setJournalDirectory(new File(dir, "journal").getAbsolutePath());
configuration.setLargeMessagesDirectory(new File(dir, "largemessages").getAbsolutePath());
configuration.setPagingDirectory(new File(dir, "paging").getAbsolutePath());
configuration.setSharedStore(false);
configuration.setPersistenceEnabled(true);
configuration.setJournalType(JournalType.NIO);
List<JMSQueueConfiguration> queueConfigurations = new ArrayList<JMSQueueConfiguration>();
queueConfigurations.add(new JMSQueueConfigurationImpl(queueName, null, false));
JMSConfiguration jmsConfig = new JMSConfigurationImpl(new ArrayList<ConnectionFactoryConfiguration>(), queueConfigurations, new ArrayList<TopicConfiguration>(), null);
EmbeddedJMS server = new EmbeddedJMS();
server.setConfiguration(configuration);
server.setJmsConfiguration(jmsConfig);
return server;
}
}
-
13. Re: Hornetq with core bridge delivers null when message over 45960 bytes
chris.j.prior May 21, 2012 1:04 PM (in response to nick.pomfret)I believe I have this issue on 2.1.14.Final. Are there any known fixes / work arounds for this?