Pub/sub setup results in leak of ServerMessageImpl instances
gkopff Sep 25, 2014 4:30 AMI am trying to use HornetQ to perform publish/subscribe style messaging. I am using HornetQ 2.4.1.Final. I use the native HornetQ APIs, not JMS.
I create an embedded HornetQ server with the following configuration:
final Configuration configuration = new ConfigurationImpl();
configuration.setPersistenceEnabled(false);
configuration.setJournalDirectory(System.getProperty("java.io.tmpdir"));
configuration.setCreateJournalDir(false);
configuration.setSecurityEnabled(false);
configuration.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName()));
configuration.setClusterUser("unused");
configuration.setClusterPassword("disabled");
final HornetQServer server = HornetQServers.newHornetQServer(configuration);
server.start();
I create a publisher like so:
final ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(
new TransportConfiguration(NettyConnectorFactory.class.getName()));
locator.setConfirmationWindowSize(5 * 1024 * 1024);
locator.setReconnectAttempts(RETRY_FOREVER);
final ClientSessionFactory factory = locator.createSessionFactory();
this.session = factory.createSession();
this.producer = this.session.createProducer("the_address");
I compose and send messages like this:
final ClientMessage msg = this.session.createMessage(false);
msg.getBodyBuffer().writeString("the body");
msg.putLongProperty("an_id", id);
this.producer.send(msg);
On the other end, I setup a subscriber like this:
final ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(
new TransportConfiguration(NettyConnectorFactory.class.getName()));
locator.setConfirmationWindowSize(5 * 1024 * 1024);
locator.setReconnectAttempts(RETRY_FOREVER);
final ClientSessionFactory factory = locator.createSessionFactory();
this.session = factory.createSession();
final String queueName = UUID.randomUUID().toString();
this.session.createQueue("the_address", queueName);
this.consumer = this.session.createConsumer(queueName);
this.consumer.setMessageHandler(this::receive);
However, when I run this code and inject a lot of messages though it, I find an ever increasing reachable reference count for org.hornetq.core.server.impl.ServerMessageImpl, eventually resulting in an out of memory error. The same applies to instances of .org.hornetq.core.server.impl.MessageReferenceImpl.
What have I misconfigured? Is my approach incorrect?