Send/receive message to/from HornetQ core queue
mattrk Oct 23, 2015 5:04 AMHello,
I am experiencing difficulties sending/receiving messages to/from a HornetQ core queue.
I get the following error message:
HornetQConnectionTimedOutException[errorType=CONNECTION_TIMEDOUT message=HQ119013: Timed out waiting to receive cluster topology. Group:null]
at org.hornetq.core.client.impl.ServerLocatorImpl.createSessionFactory(ServerLocatorImpl.java:950)
at hornetq_core_api.CoreAPIExample.run(CoreAPIExample.java:38)
at hornetq_core_api.CoreAPIExample.main(CoreAPIExample.java:22)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Does anyone know why this is occurring? (I get the error with/without clustering details in my config file.)
I've actually tried 4 different methods of sending/receiving messages now (qpid, honetq core api, mqlight, and a qpid python-based approach). All have been unsuccessful so far, so I am wondering if it is something in (or not in) my config file.
Thank you & regards,
Matthew
Code:
package hornetq_core_api;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import java.util.HashMap;
import java.util.Map;
public class CoreAPIExample {
public CoreAPIExample(){
}
public static void main(String[] args){
new CoreAPIExample().run();
}
public void run(){
try {
//Create connection config
Map<String,Object> connectionParams = new HashMap<>();
connectionParams.put(TransportConstants.HOST_PROP_NAME, "localhost");
connectionParams.put(TransportConstants.PORT_PROP_NAME, "5672");
TransportConfiguration transport = new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams);
//Create server locator
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(transport);
//Create session factory
ClientSessionFactory factory = locator.createSessionFactory();
//Create client session
ClientSession clientSession = factory.createSession();
//Create client producer
ClientProducer clientProducer = clientSession.createProducer("exampleQueue1");
//Create client consumer
ClientConsumer consumer = clientSession.createConsumer("exampleQueue1");
//Create and "load" message
ClientMessage message = clientSession.createMessage(false);
message.putStringProperty("myprop", "This is an interesting message");
//Start session
clientSession.start();
//Send message
clientProducer.send(message);
//Retrieve message
ClientMessage recdMessage = consumer.receive(1000);
System.out.println("Received message: " + recdMessage.getStringProperty("myprop"));
//Close session
clientSession.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Config:
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<paging-directory>${data.dir:../data}/paging</paging-directory>
<bindings-directory>${data.dir:../data}/bindings</bindings-directory>
<journal-directory>${data.dir:../data}/journal</journal-directory>
<journal-min-files>10</journal-min-files>
<large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
<connectors>
<connector name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</connector>
<connector name="netty-throughput">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
</connector>
<connector name="netty-proton">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5672}"/>
</connector>
</connectors>
<acceptors>
<acceptor name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</acceptor>
<acceptor name="netty-throughput">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
<param key="direct-deliver" value="false"/>
</acceptor>
<acceptor name="proton-acceptor">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="protocols" value="AMQP"/>
<param key="host" value="localhost"/>
<param key="port" value="5672"/>
</acceptor>
</acceptors>
<queues>
<queue name="exampleQueue1">
<address>underlying</address>
<durable>true</durable>
</queue>
<queue name="exampleQueue2">
<address>underlying</address>
<durable>true</durable>
</queue>
</queues>
<!--<cluster-connections>
<cluster-connection name="my-cluster">
<address>jms</address>
<connector-ref>netty-connector</connector-ref>
<use-duplicate-detection>true</use-duplicate-detection>
<forward-when-no-consumers>false</forward-when-no-consumers>
<max-hops>1</max-hops>
<discovery-group-ref discovery-group-name="my-discovery-group"/>
</cluster-connection>
</cluster-connections>
<discovery-groups>
<discovery-group name="my-discovery-group">
<group-address>localhost</group-address>
<group-port>9876</group-port>
</discovery-group>
</discovery-groups>-->
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
</address-setting>
</address-settings>
</configuration>