7 Replies Latest reply on Oct 23, 2015 10:05 AM by mattrk

    Send/receive message to/from HornetQ core queue

    mattrk

      Hello,

       

      I am experiencing difficulties sending/receiving messages to/from a HornetQ core queue.

       

      I get the following error message:

      HornetQConnectionTimedOutException[errorType=CONNECTION_TIMEDOUT message=HQ119013: Timed out waiting to receive cluster topology. Group:null]

        at org.hornetq.core.client.impl.ServerLocatorImpl.createSessionFactory(ServerLocatorImpl.java:950)

        at hornetq_core_api.CoreAPIExample.run(CoreAPIExample.java:38)

        at hornetq_core_api.CoreAPIExample.main(CoreAPIExample.java:22)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:497)

        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)


      Does anyone know why this is occurring? (I get the error with/without clustering details in my config file.)

       

      I've actually tried 4 different methods of sending/receiving messages now (qpid, honetq core api, mqlight, and a qpid python-based approach). All have been unsuccessful so far, so I am wondering if it is something in (or not in) my config file.

       

      Thank you & regards,

      Matthew

       

       

      Code:

       

      package hornetq_core_api;

       

      import org.hornetq.api.core.TransportConfiguration;

      import org.hornetq.api.core.client.*;

      import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;

      import org.hornetq.core.remoting.impl.netty.TransportConstants;

       

      import java.util.HashMap;

      import java.util.Map;

       

      public class CoreAPIExample {

       

          public CoreAPIExample(){

       

       

          }

       

       

          public static void main(String[] args){

              new CoreAPIExample().run();

          }

       

       

          public void run(){

       

       

              try {

                  //Create connection config

                  Map<String,Object> connectionParams = new HashMap<>();

                  connectionParams.put(TransportConstants.HOST_PROP_NAME, "localhost");

                  connectionParams.put(TransportConstants.PORT_PROP_NAME, "5672");

                  TransportConfiguration transport = new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams);

       

       

                  //Create server locator

                  ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(transport);

       

       

                  //Create session factory

                  ClientSessionFactory factory = locator.createSessionFactory();

       

       

                  //Create client session

                  ClientSession clientSession = factory.createSession();

       

       

                  //Create client producer

                  ClientProducer clientProducer = clientSession.createProducer("exampleQueue1");

       

       

                  //Create client consumer

                  ClientConsumer consumer = clientSession.createConsumer("exampleQueue1");

       

       

                  //Create and "load" message

                  ClientMessage message = clientSession.createMessage(false);

                  message.putStringProperty("myprop", "This is an interesting message");

       

       

                  //Start session

                  clientSession.start();

       

       

                  //Send message

                  clientProducer.send(message);

       

       

                  //Retrieve message

                  ClientMessage recdMessage = consumer.receive(1000);

                  System.out.println("Received message: " + recdMessage.getStringProperty("myprop"));

       

       

                  //Close session

                  clientSession.close();

       

       

              } catch (Exception e) {

                  e.printStackTrace();

              }

          }

      }

       

       

      Config:

       

      <configuration xmlns="urn:hornetq"

                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

                     xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

       

       

         <paging-directory>${data.dir:../data}/paging</paging-directory>

        

         <bindings-directory>${data.dir:../data}/bindings</bindings-directory>

        

         <journal-directory>${data.dir:../data}/journal</journal-directory>

        

         <journal-min-files>10</journal-min-files>

        

         <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>

        

         <connectors>

            <connector name="netty">

               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>

               <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>

               <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>

            </connector>

           

            <connector name="netty-throughput">

               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>

               <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>

               <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>

               <param key="batch-delay" value="50"/>

            </connector>

        

         <connector name="netty-proton">

               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>

               <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>

               <param key="port"  value="${hornetq.remoting.netty.batch.port:5672}"/>

            </connector>

         </connectors>

       

       

         <acceptors>

            <acceptor name="netty">

               <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>

               <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>

               <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>

            </acceptor>

           

            <acceptor name="netty-throughput">

               <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>

               <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>

               <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>

               <param key="batch-delay" value="50"/>

               <param key="direct-deliver" value="false"/>

            </acceptor>

        

         <acceptor name="proton-acceptor">

               <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>

               <param key="protocols" value="AMQP"/>

               <param key="host" value="localhost"/>

               <param key="port" value="5672"/>

             </acceptor>

         </acceptors>

        

         <queues>    

            <queue name="exampleQueue1"> 

                  <address>underlying</address> 

                  <durable>true</durable> 

            </queue>

            <queue name="exampleQueue2"> 

                  <address>underlying</address> 

                  <durable>true</durable> 

            </queue> 

         </queues>

        

         <!--<cluster-connections>

          <cluster-connection name="my-cluster">

        <address>jms</address>

        <connector-ref>netty-connector</connector-ref>

        <use-duplicate-detection>true</use-duplicate-detection>

        <forward-when-no-consumers>false</forward-when-no-consumers>

        <max-hops>1</max-hops>

        <discovery-group-ref discovery-group-name="my-discovery-group"/>

        </cluster-connection>

         </cluster-connections>

        

         <discovery-groups>

          <discovery-group name="my-discovery-group">

             <group-address>localhost</group-address>

             <group-port>9876</group-port>

          </discovery-group>

        </discovery-groups>-->

        

         <security-settings>

            <security-setting match="#">

               <permission type="createNonDurableQueue" roles="guest"/>

               <permission type="deleteNonDurableQueue" roles="guest"/>

               <permission type="consume" roles="guest"/>

               <permission type="send" roles="guest"/>

            </security-setting>

         </security-settings>

       

       

         <address-settings>

            <!--default for catch all-->

            <address-setting match="#">

               <dead-letter-address>jms.queue.DLQ</dead-letter-address>

               <expiry-address>jms.queue.ExpiryQueue</expiry-address>

               <redelivery-delay>0</redelivery-delay>

               <max-size-bytes>10485760</max-size-bytes>      

               <message-counter-history-day-limit>10</message-counter-history-day-limit>

               <address-full-policy>BLOCK</address-full-policy>

            </address-setting>

         </address-settings>

      </configuration>

        • 1. Re: Send/receive message to/from HornetQ core queue
          mattrk

          Update:

           

          I think I needed:

          <permission type="createDurableQueue" roles="guest"/>

          <permission type="deleteDurableQueue" roles="guest"/>

          in my permissions section.

           

          After turning logging up, I noticed the following message in the logs:

          11:01:03,880 DEBUG [org.hornetq.core.server] Couldn't find any bindings for address=exampleQueue1 on message=ServerMessage[messageID=2147488224,durable=false,userID=null,priority=0, bodySize=512,expiration=0, durable=false, address=exampleQueue1,properties=TypedProperties[{HORNETQ_PROTON_FORMAT=1, PROTON_MESSAGE_SIZE=71, HORNETQ_PROTON_MESSAGE_FORMAT=0, HORNETQ_PROTON_MESSAGE_TYPE=3, HORNETQ_PROTON_SUBJECT=subject}]]@1344913211

           

          Thanks & regards,

          Matthew

          • 2. Re: Send/receive message to/from HornetQ core queue
            jbertram

            Couple of things...

            1. Since you're using the core API you should simply connect to the core acceptor on port 5445.
            2. In HornetQ you send messages to an address and receive them from a queue. In your code you're attempting to send the message to a queue which won't work (hence the debug message about no bindings, "Couldn't find any bindings for address=exampleQueue1").  HornetQ thinks you're attempting to send the message to address named "exampleQueue1" which doesn't exist.
            1 of 1 people found this helpful
            • 3. Re: Send/receive message to/from HornetQ core queue
              mattrk

              Hello,

               

              Thank you very much for correcting my misconception about the difference between an queue name and an address! That was clearly causing a lot of problems.

               

              I have the example working now using the following config change:

              <queues>   

                  <queue name="exampleQueue1">

                      <address>exampleQueue1</address>

                      <durable>true</durable>

                  </queue>

                  <queue name="exampleQueue2">

                      <address>exampleQueue2</address>

                      <durable>true</durable>

                  </queue>

              </queues>

               

              I changed the port to 5445 in the code, made a few more small tweaks, and in worked. Thank you very much for your help!

               

              There is one issue remaining however -- as I understand it, I am not using AMQP and wanted to do so...?

               

              If I instert <param key="protocols" value="AMQP"/> in to the acceptor, I end up with the following error when I run the java code:

               

              HornetQConnectionTimedOutException[errorType=CONNECTION_TIMEDOUT message=HQ119013: Timed out waiting to receive cluster topology. Group:null]

                at org.hornetq.core.client.impl.ServerLocatorImpl.createSessionFactory(ServerLocatorImpl.java:950)

                at hornetq_core_api.CoreAPIExample.run(CoreAPIExample.java:51)

                at hornetq_core_api.CoreAPIExample.main(CoreAPIExample.java:35)

                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

                at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

                at java.lang.reflect.Method.invoke(Method.java:497)

                at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

               

              I didn't expect it to work (it was just a test), but I am unsure what the correct course of action is from reading the manual...

               

              My overall goal is to send/receive from a HornetQ core queue using AMQP (without using JMS)... is this actually possible or do I need to re-think?

               

              Thank you & regards,

              Matthew

              • 4. Re: Send/receive message to/from HornetQ core queue
                mattrk

                Working examples using QPID to talk to HornetQ, based off the examples supplied with QPID (hopefully these will help someone):

                 

                ### SEND ###

                 

                /*

                *

                * Licensed to the Apache Software Foundation (ASF) under one

                * or more contributor license agreements.  See the NOTICE file

                * distributed with this work for additional information

                * regarding copyright ownership.  The ASF licenses this file

                * to you under the Apache License, Version 2.0 (the

                * "License"); you may not use this file except in compliance

                * with the License.  You may obtain a copy of the License at

                *

                *   http://www.apache.org/licenses/LICENSE-2.0

                *

                * Unless required by applicable law or agreed to in writing,

                * software distributed under the License is distributed on an

                * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

                * KIND, either express or implied.  See the License for the

                * specific language governing permissions and limitations

                * under the License.

                *

                */

                package qpid;

                 

                 

                import org.apache.qpid.proton.TimeoutException;

                import org.apache.qpid.proton.amqp.messaging.AmqpValue;

                import org.apache.qpid.proton.amqp.messaging.Section;

                import org.apache.qpid.proton.message.Message;

                import org.apache.qpid.proton.messenger.Messenger;

                import java.util.logging.Level;

                import java.util.logging.Logger;

                 

                 

                /**

                * Example/test of the java Messenger/Message API.

                * Based closely qpid src/proton/examples/messenger/py/send.py

                * @author mberkowitz@sf.org

                * @since 8/4/2013

                */

                public class SendOld {

                 

                 

                    private static Logger tracer = Logger.getLogger("proton.example");

                    private String address = "amqp://guest:guest@localhost:5672/exampleQueue1";

                    private String subject = "subject";

                    private String[] bodies = new String[]{"Hello World!"};

                 

                 

                    private SendOld() {

                 

                 

                    }

                 

                 

                    public static void main(String args[]) {

                        SendOld o = new SendOld();

                        o.run();

                    }

                 

                 

                    private void run() {

                        try {

                            System.out.println("Create messenger 'guest'");

                            Messenger mng = org.apache.qpid.proton.messenger.impl.MessengerImpl.Factory.create("guest");

                 

                 

                            System.out.println("Set password");

                            mng.setPassword("guest");

                 

                 

                            System.out.println("Start messenger");

                            mng.start();

                 

                 

                            System.out.println("Create message");

                            Message msg = org.apache.qpid.proton.message.impl.MessageImpl.Factory.create();

                 

                 

                            System.out.println("Set message address");

                            msg.setAddress(address);

                 

                 

                            System.out.println("Set subject");

                            if (subject != null){

                                msg.setSubject(subject);

                            }

                 

                 

                            System.out.println("Set message bodies");

                            for (String body : bodies) {

                                Section s = new AmqpValue(body);

                                msg.setBody(s);

                                mng.put(msg);

                            }

                 

                 

                            System.out.println("Send message");

                            try {

                                mng.send();

                            } catch (TimeoutException e){

                                e.printStackTrace();

                            }

                 

                 

                            System.out.println("Stop messenger");

                            mng.stop();

                 

                 

                        } catch (Exception e) {

                            tracer.log(Level.SEVERE, "proton error", e);

                        }

                    }

                }

                 

                ### RECIEVE ###

                 

                /*

                *

                * Licensed to the Apache Software Foundation (ASF) under one

                * or more contributor license agreements.  See the NOTICE file

                * distributed with this work for additional information

                * regarding copyright ownership.  The ASF licenses this file

                * to you under the Apache License, Version 2.0 (the

                * "License"); you may not use this file except in compliance

                * with the License.  You may obtain a copy of the License at

                *

                *   http://www.apache.org/licenses/LICENSE-2.0

                *

                * Unless required by applicable law or agreed to in writing,

                * software distributed under the License is distributed on an

                * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY

                * KIND, either express or implied.  See the License for the

                * specific language governing permissions and limitations

                * under the License.

                *

                */

                package qpid;

                 

                 

                import org.apache.qpid.proton.TimeoutException;

                import org.apache.qpid.proton.message.Message;

                import org.apache.qpid.proton.messenger.Messenger;

                import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;

                 

                 

                import java.util.ArrayList;

                import java.util.List;

                import java.util.logging.Level;

                import java.util.logging.Logger;

                 

                 

                /**

                * Example/test of the java Messenger/Message API.

                * Based closely qpid src/proton/examples/messenger/py/recv.py

                * @author mberkowitz@sf.org

                * @since 8/4/2013

                */

                public class RecvOld {

                 

                 

                    /*

                    No error: just takes ages on mng.recv() then retrieves no messages

                     */

                 

                 

                    private static Logger tracer = Logger.getLogger("proton.example");

                    private boolean verbose = false;

                    private List<String> addrs = new ArrayList<>();

                 

                 

                    private RecvOld() {

                        addrs.add("amqp://guest:guest@localhost:5672/exampleQueue1");

                    }

                 

                 

                    private static String safe(Object o) {

                        return String.valueOf(o);

                    }

                 

                 

                    private void print(int i, Message msg) {

                        StringBuilder b = new StringBuilder("message: ");

                        b.append(i).append("\n");

                        b.append("Address: ").append(msg.getAddress()).append("\n");

                        b.append("Subject: ").append(msg.getSubject()).append("\n");

                        if (verbose) {

                            b.append("Props:     ").append(msg.getProperties()).append("\n");

                            b.append("App Props: ").append(msg.getApplicationProperties()).append("\n");

                            b.append("Msg Anno:  ").append(msg.getMessageAnnotations()).append("\n");

                            b.append("Del Anno:  ").append(msg.getDeliveryAnnotations()).append("\n");

                        } else {

                            ApplicationProperties p = msg.getApplicationProperties();

                            String s = (p == null) ? "null" : safe(p.getValue());

                            b.append("Headers: ").append(s).append("\n");

                        }

                        b.append(msg.getBody()).append("\n");

                        b.append("END").append("\n");

                        System.out.println(b.toString());

                    }

                 

                 

                    private void run() {

                        try {

                            System.out.println("Create messenger");

                            Messenger mng = org.apache.qpid.proton.messenger.impl.MessengerImpl.Factory.create("guest");

                 

                 

                            mng.setPassword("guest");

                 

                 

                            System.out.println("Start messenger");

                            mng.start();

                 

                 

                            System.out.println("Subscribe to each address");

                            for (String a : addrs) {

                                mng.subscribe(a);

                            }

                            int ct = 0;

                 

                 

                            System.out.println("mng.recv()");

                            try{

                                mng.recv(1);

                            } catch (TimeoutException e) {

                                e.printStackTrace();

                            }

                 

                 

                            System.out.println("Incomming: " + mng.incoming());

                            while (mng.incoming() > 0) {

                                    System.out.println("Getting message");

                                    Message msg = mng.get();

                                    ++ct;

                                    print(ct, msg);

                            }

                 

                 

                            System.out.println("Stop messenger");

                            mng.stop();

                        } catch (Exception e) {

                            tracer.log(Level.SEVERE, "proton error", e);

                        }

                    }

                 

                 

                    public static void main(String args[]) {

                        RecvOld o = new RecvOld();

                        o.run();

                    }

                }

                • 5. Re: Send/receive message to/from HornetQ core queue
                  jbertram

                  My overall goal is to send/receive from a HornetQ core queue using AMQP (without using JMS)... is this actually possible or do I need to re-think?

                  Yes, this is definitely possible.  However, you have to use a client that speaks AMQP if you want to communicate using AMQP.  A HornetQ core client doesn't speak AMQP, it speaks the HornetQ core protocol.

                  • 6. Re: Send/receive message to/from HornetQ core queue
                    jbertram

                    One more thing...I would recommend you look at the Apache ActiveMQ Artemis project.  The HornetQ code-base was donated to Apache ActiveMQ about a year ago and has developed into the Apache ActiveMQ Artemis broker.  No further development will be happening on HornetQ.

                    • 7. Re: Send/receive message to/from HornetQ core queue
                      mattrk

                      OK, thanks again for all the help. Switching to Artemis is definitely a future possibility.