1 2 Previous Next 16 Replies Latest reply on Oct 6, 2009 1:05 PM by clebert.suconic

    Embedded HornetQ server and client in peer to peer network

      This is my first post to this forum, so I would appreciate comments about the post style, etc., so I can be easy to understand.

      I am trying to implement a network of peer nodes, each of which can serve information and request information from each other, including very large file streams.

      I intend to use secure, persistent messaging, with transactions, all implemented in core classes, but I am just now trying
      to implement basic communications. I am {HornetQ, JBoss, JMS}-naive. HornetQ appears to be the perfect platform for me but...

      My question is this: is there more complete Javadoc for the whole project? Classes do not have any commentary about the class itself, and most methods have absolutely no doc at all.

      I am trying to configure and instantiate a HornetQ Server in each node (using what I can glean from the embedded example code and the users manual), and then set up a client session which can connect to any of the other node server instances using netty transport.

      I have a small set of basic messages (POJOs) for registering nodes with each other, etc. I believe that I need to implement a MessageConsumer
      on both server and client sides to receive and decode my messages and the responses.

      I am getting exceptions when I try to start up a session for either the server or the client.:

      Here is the server configuration and startup, which appears to be OK, but I am not sure.
      ====================================================
      TransportConfiguration nettyAcceptorConfig = null;
      TransportConfiguration nettyConnectorConfig = null;
      try {
      nettyAcceptorConfig = new TransportConfiguration(
      NettyAcceptorFactory.class.getName());
      Map<String, Object> connectionParams = nettyAcceptorConfig.getParams();
      connectionParams.put(TransportConstants.PORT_PROP_NAME, this.serverPort);
      /**
      * TODO write the code that figures out what the local host name is
      * and use it instead of localhost so remote nodes can connect!
      */
      //this.realHostName = "localhost";
      connectionParams.put(TransportConstants.HOST_PROP_NAME, this.realHostName);

      ConfigurationImpl serverConfig = new ConfigurationImpl();
      serverConfig.setPersistenceEnabled(false);
      serverConfig.setSecurityEnabled(false);
      serverConfig.setClustered(false);

      serverConfig.setManagementClusterPassword("clusterManagement");
      serverConfig.getAcceptorConfigurations().add(nettyAcceptorConfig);

      // Start up the server for this node.
      this.server = HornetQ.newHornetQServer(serverConfig);
      this.server.start();

      }
      catch (Exception e)
      {
      System.out.println("Exception from starting server on node " + this.nodeName);

      }

      Next, I try to set up the client session before I send out a registration packet.
      try {
      final SimpleString clientQueueName = new SimpleString("incoming");
      Queue incomingQueue = server.createQueue(clientQueueName, clientQueueName, null, false, false);

      This throws an exception immediately -
      logger says: INFO: HornetQ Server version 2.0.0.BETA5 (hornet-baby, 108) started
      HornetQException[errorCode=101 message=null]
      at org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage.decodeBody(HornetQExceptionMessage.java:79)
      at org.hornetq.core.remoting.impl.wireformat.PacketImpl.decode(PacketImpl.java:214)
      at org.hornetq.core.remoting.impl.PacketDecoder.decode(PacketDecoder.java:435)
      at org.hornetq.core.remoting.impl.RemotingConnectionImpl.bufferReceived(RemotingConnectionImpl.java:343)
      ...
      ...

      /**
      * Now create the structures needed for the client side so we can
      * make requests to other nodes by address.
      */
      nettyConnectorConfig = new TransportConfiguration(NettyConnectorFactory.class.getName());
      Map<String, Object> connectionParams = nettyConnectorConfig.getParams();
      connectionParams.put(TransportConstants.PORT_PROP_NAME, Node.MASTER_PORT);
      connectionParams.put(TransportConstants.HOST_PROP_NAME, this.realHostName);
      this.clientSessionFactory = new ClientSessionFactoryImpl(nettyConnectorConfig);
      this.coreSession = this.clientSessionFactory.createSession(false, false, false);

      coreSession.createQueue(incomingQueueName, incomingQueueName, false);

      This throws the same exception - clearly there is a message being sent underneath the covers, but the decoders cannot figure it out. I haven't sent out my own custom message yet.

      coreSession.close();

      } catch (Exception e1) {
      System.out.println ("Exception caught in " + this.nodeName + " while configuring the coreSession!!!");
      e1.printStackTrace();
      }

      What steps I am missing, or obvious errors am I committing?

      Frank Sherwood

        • 1. Re: Embedded HornetQ server and client in peer to peer netwo
          clebert.suconic

           

          My question is this: is there more complete Javadoc for the whole project? Classes do not have any commentary about the class itself, and most methods have absolutely no doc at all.


          Well.. javadoc is something we could improve. However we have done an excelent job on the examples and docs, and we actually concentrated on those. We will add a little bit more on the javadocs.


          You example seems fine to me. 101 (as you can see on HornetQException) means the Queue already exists.

          So, you are creating a queue that was already created before.


          You could ignore that exception where you are calling session.createQueue();

          • 2. Re: Embedded HornetQ server and client in peer to peer netwo

            Clebert,

            thanks for the help on the error message. I did find the error code in the doc just as you said. I am no longer creating duplicate queues.

            I need the server side of a node to handle several packets for registering peer nodes, etc. Is the best way to do this to implement an interceptor on the server queue where the clients post and use it to decode my packets from the contents of the SessionSendMessage packets intercepted?

            Thanks,

            Frank Sherwood

            • 3. Re: Embedded HornetQ server and client in peer to peer netwo
              clebert.suconic

               

              I need the server side of a node to handle several packets for registering peer nodes, etc. Is the best way to do this to implement an interceptor on the server queue where the clients post and use it to decode my packets from the contents of the SessionSendMessage packets intercepted?


              I'm sorry, I didn't understand what you're trying to do here.

              We already have clustering doing something like that. Have you seen the clustering documentation? Perhaps that's something more related to your use case?

              Perhaps a concrete example would help me understand what you're trying to accomplish.

              • 4. Re: Embedded HornetQ server and client in peer to peer netwo

                Clebert,

                I am building a network of nodes which must keep in contact, and will send data streams back and forth from time to time. The streams can be very large at times, small at other times. HornetQ's reliable messaging, handling of very large streams and transaction control are ideal for my purposes.

                I am trying to build a single Java class that handles this streaming. If I understand your messaging model, I need to embed a HornetQ server instance in each node for receiving stream requests to accept or retrieve streams, initiated by a peer node, and each node needs a client side for initiating them.

                I have coded enough of the class to send messages from the client side of one node, and see that they are received by the server side of the remote node. I verified this by implementing an interceptor on the remote node server side.

                My protocol requires that requests be acknowledged above the transport layer. I have not yet figured out what I need to do in order to
                1. receive the packets as a consumer of the server queue to which they are sent by the client side of the requesting node, and
                2. return the correct response packets to the client side of the node that sent them.

                To test, I create two nodes named "node1" and "node2"
                My code performs the following steps:
                For the server side:
                1. configure the server for netty socket transport (on a different fixed port for each node).
                2. start the server
                3. server.createQueue("server", "clientRequests", null, false, true);

                For the client side:
                1. create a new netty transport configuration using the port and host of node1 in node2
                2. get a client session factory using the transport configuration
                3. create a "coreSession" as in the embedded example code
                4. perform coreSession.createQueue("server", "node2", false);

                only in node2:
                5. create clientSession = clientSessionFactory.createSession();
                6. clientProducer = clientSession.createProducer("node1");
                7. ClientMessage message = clientSession.createClientMessage(false);
                8. encode my packet into a byte[] and put it in the message body.
                9. set a property on the message to tell what type packet it contains
                10. clientProducer.send(message);
                11. clientSession.start();
                12. clientConsumer = clientSession.createConsumer("node1");
                13. ClientMessage messageReceived = clientConsumer.receive(10000);

                On the server side in node1:
                1. the message comes through
                2. I intercept it on the queue I expected it to arrive on
                3. I decode my packet successfully
                4. I create ClientProducer prod = coreSession.createProducer("server");
                5. Message msg = coreSession.createClientMessage(false);
                6. I encode my response and put it in the body of the message
                7. I set a property on msg
                8. send it using the producer above

                I receive a null message in node2 in step 13 above.

                I think I am not correctly addressing the queue, since the producer does not know about the queue name, only the address.

                This seems unlikely to be the correct way to do this. I could not piece together a better way to get the messages on the server node. Can you tell me the correct way to do this?

                Thanks in advance for your help.

                Frank Sherwood

                • 5. Re: Embedded HornetQ server and client in peer to peer netwo
                  clebert.suconic

                  I don't understand your architecture...


                  If you just need to receive messages on the client, why you just don't create a consumer on the client side using regular JMS or core API?

                  Consumer cons = session.createConsumer()
                  session.start();
                  cons.receive(timetou); // or stablish a message listener here?


                  if you need a peer of server, you probably need them configured as clusters or bridges.

                  IMO you should simplify your architecture. or maybe I' m missing something here?

                  • 6. Re: Embedded HornetQ server and client in peer to peer netwo

                    Clebert,

                    I have taken your suggestion and recast my architecture as follows:

                    I instantiate a HornetQ server called RelayServer and start it.
                    I added an interceptor to be sure the packets were getting there,
                    copied from the interceptor example code.


                    its intercept method looks like this:
                    ...

                    if (packet instanceof SessionSendMessage) {
                     SessionSendMessage realPacket = (SessionSendMessage) packet;
                     Message msg = realPacket.getServerMessage();
                     System.out.println("Packet destination = " + msg.getDestination().toString());
                     System.out.print("Packet source = \"" + msg.getProperty("src").toString());
                     System.out.println("\": target = \"" + msg.getProperty("target").toString() + "\"");
                    }
                    return true;
                    


                    I create a client node named masterNode:
                    - ------ - ------ ---- ----- -----------
                    clientSessionFactory = new ClientSessionFactoryImpl(nettyConnectorConfig);
                    clientSession = clientSessionFactory.createSession(false, true, true);
                    clientSession.createQueue("nodes", "masterNode", "target = 'masterNode'", true);
                    clientConsumer = clientSession.createConsumer("masterNode", "target = 'masterNode'");
                    clientSession.start();
                    

                    I start a while loop in a processor thread:
                    while (true) {
                     ClientMessage messageReceived = this.clientConsumer.receive(1000);
                     print if message is not null;
                    }
                    

                    I create a client named node1:
                    - ------ - ------ ----- ------
                    clientSessionFactory = new ClientSessionFactoryImpl(nettyConnectorConfig);
                    clientSession = clientSessionFactory.createSession(false, true, true);
                    clientSession.createQueue("nodes", "node1", target = 'node1'", true);
                    clientConsumer = clientSession.createConsumer("node1", "target = 'node1'");
                    
                    message = this.clientSession.createClientMessage(false);
                    message.putStringProperty(new SimpleString("target"), new SimpleString("masterNode"));
                    ... set a couple of other properties ...
                    ... add the body content ...
                    
                    this.clientSession.start();
                    this.clientProducer = this.clientSession.createProducer("nodes");
                    this.clientProducer.send("nodes", message);
                    

                    The client message gets to the HornetQ server with the properties specified above.

                    Server output:
                    Server got SessionSendMessage from /10.192.168.217:4689
                    Packet destination = nodes
                    Packet source = "node1": target = "masterNode"

                    The target node "masterNode" never receives the message.


                    I must be doing something wrong with the addressing, but I have tried
                    many permutations, reading and rereading the User's Manual.

                    Can you help? Do you have an example like this using the core API, where there is more than one client trying to talk to each other via a remote HornetQ server?

                    Frank Sherwood


                    • 7. Re: Embedded HornetQ server and client in peer to peer netwo
                      clebert.suconic

                      On this case you have a single server, where both clients are supposed to talk to the server, and exchange messages through a single queue, right?

                      Are you trying to have a client communication to another client directly?

                      Maybe I'm starting to understand what you were trying to accomplish before.

                      Do you need each client to act as a server? The examples before were a bit confusing to me and I couldn't figure out what you were trying to do.

                      • 8. Re: Embedded HornetQ server and client in peer to peer netwo

                        Clebert,
                        that is right, two clients (eventually many) trying to exchange messages through the server.

                        Previously, I was trying to have two clients communicate directly with each other via embedded servers, but decided that it would be better through a central server.

                        I am having trouble mapping the JMS examples to the core API with respect to message and queue addressing.

                        Frank

                        • 9. Re: Embedded HornetQ server and client in peer to peer netwo
                          clebert.suconic

                          There is some explanation about that on chapter 9:

                          http://hornetq.sourceforge.net/docs/hornetq-2.0.0.BETA5/user-manual/en/html_single/index.html#jms-core-mapping


                          Also there are a few core examples on the examples list.


                          Please, Let me know if you still have questions.

                          • 10. Re: Embedded HornetQ server and client in peer to peer netwo

                            Clebert,

                            I have read the section you suggested. I do not want to incur the extra overhead of the JMS "facade" shell mentioned in the UserManual section 5.1, so I am trying to keep purely to Core API.

                            Am I correct to create queues on the address "nodes", as follows?

                            clientSession.createQueue("nodes", "masterNode", filter, true);
                            clientSession.createQueue("nodes", "node1", filter, true);
                            ...
                            clientProducer = clientSession.createProducer("nodes.node1");
                            ...
                            clientProducer.send("nodes", message);
                            ...
                            
                            clientConsumer = clientSession.createConsumer("masterNode", filter);
                            message = clientConsumer.receive(5000);
                            
                            


                            This is the essence of what I am doing, and I never get the message to the consumer, even though it arrives on the server with the destination address "nodes".

                            Do I have to prepend "jms.queue" to my "nodes" address?

                            Frank


                            • 11. Re: Embedded HornetQ server and client in peer to peer netwo
                              clebert.suconic

                              On the core, there is the concept of Address, and the concept of Queue.

                              The address is the key where messages are produced. The queue is the key where messages are consumed.

                              A single address could have multiple queues. (JMS will use that as to create topics, based on multiple queues to a single address).


                              In your case, you created the address "node", and you are binding two queues to "node".

                              In your case, you don't have any address called "nodes.node1", so the messages are just being ignored. If you had createProducer("nodes"), you would receive the messages on both masterNode and node1 queues. (like a JMS Topic)


                              I suspect you actually wanted to do this:

                              clientSession.createQueue("masterNode", "masterNode", filter, true);
                              clientSession.createQueue("node1", "node1", filter, true);


                              And then:

                              clientProducer = clientSession.createProducer("node1");


                              clientConsumer = clientSession.createConsumer("node1", filter);
                              Message msg = clientConsumer.receive(1000);


                              Let me know how it goes.

                              • 12. Re: Embedded HornetQ server and client in peer to peer netwo
                                clebert.suconic

                                Just a FYI, I post edited my previous post due to a typo.

                                • 13. Re: Embedded HornetQ server and client in peer to peer netwo

                                  Clebert,

                                  the only way a message is getting delivered is if I remove the filters from the queues, created the way you suggested.
                                  
                                  In receiver client:
                                  clientSession.createQueue("masterNode", "masterNode", null/*this.filterExpr*/, true);
                                  clientConsumer = clientSession.createConsumer("masterNode", null/*this.filterExpr*/);
                                  
                                  In producer client
                                  clientSession.createQueue("node1", "node1", null/*this.filterExpr*/, true);
                                  clientConsumer = clientSession.createConsumer("node1", null/*this.filterExpr*/);
                                  ...
                                  clientProducer = clientSession.createProducer("node1");
                                  ...
                                  clientProducer.send("node1", message);
                                  

                                  when I post the message to the queue "node1", it is received by the producer client, and not by the target client.

                                  If I change the address to
                                  clientProducer.send("masterNode", message);
                                  


                                  nothing comes through.

                                  I am really stuck here.

                                  Frank


                                  • 14. Re: Embedded HornetQ server and client in peer to peer netwo
                                    clebert.suconic


                                    Before you were confusing the address with the queue. At least you understand the queue/address concept now.. right?

                                    You didn't mention filters before. So.. I' m not sure what you are doing there. You are probably confusing something very basic there.


                                    If you provide me an easy to run code showing where you're stuck (you could send me by email), I could take a look.

                                    Base your code in one of the examples.. so I could just place your code there and take a look.

                                    1 2 Previous Next