2 Replies Latest reply on May 9, 2012 7:54 AM by adryen31200

    Queue ClientConsumer management

    adryen31200

      Hello all I have a good question ! I'm trying to set up a publish/subscription with core api but I have a question, if I subscription like this:

       

      clientConsumer = clientSession.createConsumer(queueName, filter);

       

      What's happend is the queue is already create ? And If isn't already created ? It's automatically create the linked queue ? without calling myself:

       

      clientSession.createQueue(address, queueName);

       

       

      And for unsubscription aspect ? I have to manage the queue myself ? If no client consumer are no longer bound to a queue, I need to remove it myselft

      or it's automatically remove it ? And during unsubscription I need to only close client session ?

       

      Can you clear my ideas plz ? I have the following methods, what do you think ?

       

       

       

       

       

                private final ClientSessionFactory clientSessionFactory;

       

       

                // Map where key is queueName and its bounded consumer map

                private final Map<String, Map<String, ClientConsumer>> queueConsumerMap;

       

       

                /**

                 * Allow to subscribe consumer to provided address

                 * This function create queue if isn't already

                 * create and bound to it a <ClientConsumer>.

                 * It's return the consumer subscription reference

                 * as a <String>.

                 * @param address

                 * @param filter

                 */

      public String subscribe(final String address, final String filter) throws FrontlineException {

       

       

                          LOGGER.info("Subscription request called for [address|filter]: [" + address + "|" + filter + "]");

                          ClientSession clientSession = null;

                          ClientConsumer clientConsumer = null;

                          final String queueName = address + "#Queue";

       

       

                          try {

                                    clientSession = this.clientSessionFactory.createSession();

                                    LOGGER.debug("Client session correclty created");

                          } catch (final HornetQException e) {

                                    LOGGER.error("Client session creation error occurs during subscription for [address|filter]: [" + address + "|" + filter + "]", e);

                          }

       

       

                          // Queue not already bounded

                          if (!this.queueConsumerMap.containsKey(queueName)) {

                                    try {

                                              LOGGER.debug(queueName + " queue not already created");

                                              clientSession.createQueue(address, queueName);

                                              this.queueConsumerMap.put(queueName, new HashMap<String, ClientConsumer>());

                                              LOGGER.info("Queue correctly create for [address|queueName]: [" + address + "|" + queueName + "]");

                                    } catch (final HornetQException e) {

                                              LOGGER.error("Queue creation error occurs during subscription for [address|filter]: [" + address + "|" + filter + "]", e);

                                    }

                          }

       

       

                          try {

                                    // Create a ClientConsumer to consume messages matching the filter from the queue with the given name.

                                    clientConsumer = clientSession.createConsumer(queueName, filter);

                                    LOGGER.info("Client consumer correctly create for [queueName|filter]: [" + queueName + "|" + filter + "]");

                          } catch (final HornetQException e) {

                                    LOGGER.error("Client consumer creation error occurs during subscription for [address|filter]: [" + address + "|" + filter + "]", e);

                          }

       

       

                          try {

                                    clientSession.start();

                                    LOGGER.debug("Client session correctly started");

                          } catch (final HornetQException e) {

                                    LOGGER.error("Client session starting error occurs during subscription for [address|filter]: [" + address + "|" + filter + "]", e);            }

       

       

                          try {

                                    clientConsumer.setMessageHandler(new MessageListener(clientSession));

                                    LOGGER.debug("Message listener correctly set to client consumer");

                          } catch (final HornetQException e) {

                                    LOGGER.error("Message handler binding error occurs during subscription for [address|filter]: [" + address + "|" + filter + "]", e);

                          }

                          final String consumerUUID = UUID.randomUUID().toString();

                          final String consumerSubscriptionRef = queueName + ":" + consumerUUID;

                          LOGGER.debug("Consumer subscription reference correctly generated: " + consumerSubscriptionRef);

                          this.queueConsumerMap.get(queueName).put(consumerUUID, clientConsumer);

                          LOGGER.info(consumerUUID + " client consumer correctly bound");

                          return consumerSubscriptionRef;

                }

       

       

                /**

                 * Allow to unsubscribe consumer by provided this

                 * consumer subscription reference. This function

                 * remove the bounded <ClientConsumer> from the queue

                 * and finally remove the queue is no longer

                 * <ClientConsumer> are bound.

                 * @param consumerSubscriptionRef

                 */

                public void unSubscribe(final String consumerSubscriptionRef) {

       

       

                          ClientConsumer clientConsumer = null;

       

       

                          LOGGER.info("Unsubscription request called for consumer subscription reference: " + consumerSubscriptionRef);

                          // Index 0 is queueName and index 1 is consumer UUID

                          final String[] subscriptionParts = consumerSubscriptionRef.split(":");

       

       

                          // Queue exist to provided address

                          final Map<String, ClientConsumer> clientConsumerMap = this.queueConsumerMap.get(subscriptionParts[0]);

                          LOGGER.debug("Queue found");

                          if (clientConsumerMap != null) {

                                    clientConsumer = clientConsumerMap.get(subscriptionParts[1]);

       

       

                                    /*

                                     * If client consumer found close its session

                                     * and remove it from client consumer map

                                     */

                                    if (clientConsumer != null) {

                                              LOGGER.debug("Client consumer found");

                                              MessageListener messageListener = null;

                                              try {

                                                        messageListener = (MessageListener) clientConsumer.getMessageHandler();

                                              } catch (final HornetQException e) {

                                                        LOGGER.error("Message handler getting error occurs during unsubscription for consumer subscription reference: " + consumerSubscriptionRef, e);

                                              }

                                              if (messageListener != null) {

       

       

                                                        LOGGER.debug("Message listener found");

                                                        final ClientSession clientSession = messageListener.getClientSession();

                                                        if (clientSession != null) {

                                                                  LOGGER.debug("Client session recovered");

                                                                  try {

                                                                            clientSession.close();

                                                                            LOGGER.debug("Client session correclty closed");

                                                                  } catch (final HornetQException e) {

                                                                            LOGGER.error("Client session closing error occurs during unsubscription for consumer subscription reference: " + consumerSubscriptionRef, e);

                                                                  }

                                                        } else {

                                                                  LOGGER.debug("No client session recovered in the message listener of client consumer");

                                                        }

                                              }

                                              clientConsumerMap.remove(subscriptionParts[1]);

       

       

                                              LOGGER.debug("Consumer client " + consumerSubscriptionRef + " found was removed");

       

       

                                              LOGGER.info("Unsubscription request correctly processed for consumer subscription reference: " + consumerSubscriptionRef);

                                    } else {

                                              LOGGER.debug("No client consumer found during unsubcription for consumer subscription reference: " + consumerSubscriptionRef);

                                    }

       

       

                                    // If no longer client consumer bounded to queue, delete it

                                    if (clientConsumerMap.isEmpty()) {

                                              this.queueConsumerMap.remove(subscriptionParts[0]);

                                              LOGGER.debug("No longer consumer client found, therefore " + subscriptionParts[0] + " queue was removed");

                                    }

                          } else {

                                    LOGGER.debug("No queue found during unsubcription for consumer subscription reference: " + consumerSubscriptionRef);

                          }

                }

       

      Do you have a publish/subscribe example for API core ? Because I haven't found on

       

      Thank you

        • 1. Re: Queue ClientConsumer management
          ataylor

          What's happend is the queue is already create ? And If isn't already created ? It's automatically create the linked queue ? without calling myself:

           

          clientSession.createQueue(address, queueName);

           

           

          And for unsubscription aspect ? I have to manage the queue myself ? If no client consumer are no longer bound to a queue, I need to remove it myselft

          or it's automatically remove it ? And during unsubscription I need to only close client session ?

          I'm not sure what you are saying here but if you create a queue then you also need to delete it unless its temporary where it will be delelted on session close

           

          Do you have a publish/subscribe example for API core ? Because I haven't found on

          With the core api there are no topics just address and queues so a subscription would be a queue on an address with a single consumer.

          • 2. Re: Queue ClientConsumer management
            adryen31200

            Thank Andy for your response.

             

            Ok isn't a address --> queue 1 --> consumer 1, consumer 2, consumer n

                                            queue 2 --> consumer 3, consumer 2, consumer m

                                            queue n --> consumer 1, consumer 4, consumer 3

            for publish/subcribe pattern

            but address --> queue1 --> consumer1

                              --> queue2 ---> consumer2

                              --> queue n --> consumer n

             

            1 subscription isn't represented by one consumer on one queue, but one queue on one address ?

             

             

            for your response to "I'm not sure what you are saying here but if you create a queue then you also need to delete it unless its temporary where it will be delelted on session close"

             

            Can I only do clientConsumer = clientSession.createConsumer(queueName, filter);

            without calling before: clientSession.createQueue(address, queueName);

             

            createConsumer() also create Queue if queueName isn't created ?

             

             

             

            Other problem here I have one ClientSession for one Consumer but If I have 100000 consumers ... it means I will have 100000 openned ClientSession ... do you have a better idea for this problem ?