6 Replies Latest reply on Mar 8, 2017 4:18 AM by francesca.herpertz

    Artemis Queue Purge Messages

    francesca.herpertz

      Hi people,

       

      I am having trouble deleting messages from an Artemis message queue configured in WildFly 10.1.

       

      My application works the following way:

       

      I have a WAR deployment with the main application running in it. From this application I can start batch processes, each in its own JVM. A newly spawned JVM communicates via a JMS queue and a topic on the WildFly server with other components that are registered as consumers on the queue. Those consumers run outside the application server and are not necessarily on the same machine. When an error occurs during the communication, execution, or consumption of these messages, an error is thrown and the JVM spawned by the main application terminates. The main application, which administers the running JVMs/batch processes, is then tasked with cleaning up the message queue and deleting all remaining messages of the failed job. Every message put on the queue carries an ID that identifies the batch job; it is set as a header property. The following code works fine only if no consumer is registered yet or if no consumer has started consuming messages.

       

      This is the code with which I purge the queue:

       

      // Needs javax.jms.* and java.util.Enumeration
      Connection tmpConnection = null;
      Session tmpSession = null;
      QueueBrowser browser = null;
      try {
          tmpConnection = connectionFactory.createConnection(System.getProperty("jee.username", DEFAULT_USERNAME), DEFAULT_PASSWORD);
          tmpSession = tmpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Queue queue = (Queue) initialContext.lookup(System.getProperty("destination", DEFAULT_DESTINATION));
          browser = tmpSession.createBrowser(queue);
          Enumeration<?> enum1 = browser.getEnumeration();
          tmpConnection.start();
          while (enum1.hasMoreElements()) {
              // Only the properties are read, so no cast to TextMessage is needed
              Message msg = (Message) enum1.nextElement();
              // jobId.equals(...) avoids a NullPointerException if the property is missing
              if (jobId.equals(msg.getStringProperty(JOBID_PROPERTY))) {
                  // One consumer per message, selected by its JMS message ID
                  MessageConsumer consumer = tmpSession.createConsumer(queue, "JMSMessageID='" + msg.getJMSMessageID() + "'");
                  try {
                      // noWait does not necessarily work: the message might not be deliverable at that
                      // moment, receive returns null, and the message could remain on the queue forever
                      // due to bad timing.
                      Message message = consumer.receive(1000);
                      if (message != null) {
                          // log.info("message not null");
                      }
                  } finally {
                      consumer.close();
                  }
              }
          }
      } catch (Exception e) {
          e.printStackTrace();
      } finally {
          // tmpSession.commit();
          // Any of these may still be null if the setup above failed
          try {
              if (browser != null) browser.close();
              if (tmpSession != null) tmpSession.close();
              if (tmpConnection != null) tmpConnection.close();
          } catch (JMSException e) {
              e.printStackTrace();
          }
      }

      }

       

       

      I know that opening a consumer for every message is a lot of overhead. I just wanted to see whether I could purge the queue completely this way. At the moment it does not matter how long the purge takes after the abort.

       

       

      The queue is configured the following way:

       

       

              <subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">

                  <server name="default" persistence-enabled="false">

                      <security-setting name="#">

                          <role name="guest" send="true" consume="true" create-non-durable-queue="true" delete-non-durable-queue="true"/>

                          <role name="ejb" send="true" consume="true" create-durable-queue="true" delete-durable-queue="true" create-non-durable-queue="true" delete-non-durable-queue="true" manage="true"/>

                      </security-setting>

                      <address-setting name="#" dead-letter-address="jms.queue.DLQ" expiry-address="jms.queue.ExpiryQueue" max-size-bytes="300000000" page-size-bytes="10000000" message-counter-history-day-limit="10"/>

                      <http-connector name="http-connector" socket-binding="http" endpoint="http-acceptor"/>

                      <http-connector name="http-connector-throughput" socket-binding="http" endpoint="http-acceptor-throughput">

                          <param name="direct-deliver" value="true"/>

                      </http-connector>

                      <in-vm-connector name="in-vm" server-id="0"/>

                      <http-acceptor name="http-acceptor" http-listener="default"/>

                      <http-acceptor name="http-acceptor-throughput" http-listener="default">

                          <param name="direct-deliver" value="true"/>

                      </http-acceptor>

                      <remote-acceptor name="tcp-acceptor" socket-binding="tcp"/>

                      <in-vm-acceptor name="in-vm" server-id="0"/>

                      <jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>

                      <jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>

                      <jms-queue name="myTrace" entries="java:jboss/exported/jms/myTrace" durable="false"/>

                      <jms-queue name="myDataPackage" entries="java:jboss/exported/jms/myDataPackage" durable="false"/>

                      <jms-topic name="myWorkflow" entries="java:jboss/exported/jms/myWorkflow"/>

                      <jms-topic name="myTopic" entries="java:jboss/exported/jms/myTopic"/>

                      <connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm" client-failure-check-period="-1" connection-ttl="-1" min-large-message-size="600000000"/>

                      <connection-factory name="RemoteConnectionFactory" entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector" client-failure-check-period="-1" connection-ttl="-1" min-large-message-size="600000000"/>

                      <pooled-connection-factory name="activemq-ra" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm" client-failure-check-period="-1" connection-ttl="-1" min-large-message-size="600000000" transaction="xa"/>

                  </server>

              </subsystem>

       

       

       

      Initially I thought it had something to do with the consumers having already received some messages, and that when the JVM is aborted those partially consumed messages are rolled back onto the queue. Could that be a valid reason for this?

       

      I hope you can help me out here, as I have already read the Artemis documentation and have not found a clue. The above code works fine when no consumer is attached to the queue yet or when message consumption has not yet started. (FYI, there is a slight delay before the consumers are initialized because they need more context information, which they get from somewhere else.)

       

       

      Thanks in advance!

       

      Regards,

      Francesca

       

        • 1. Re: Artemis Queue Purge Messages
          jbertram

          Couple of questions...

          1. You explain that you're trying to purge the queue, but I don't see where you explain what actually happens.  So...what's actually going wrong?
          2. Do all messages on the queue have the job ID property?  If so, why are you consuming them one by one?  Why not simply attach a consumer to the queue and consume messages until there are none left or (even better) use the management operation "removeMessages" with an empty filter to delete all the messages in one shot?
          • 2. Re: Artemis Queue Purge Messages
            francesca.herpertz

            Hi,

             

            Thanks for the quick reply.

             

            1. Some messages always stay on the queue. Even the delete queue button in the WildFly admin interface does not delete them; they are still displayed in the console view.

            2. Every message has a job ID, and I know it is bad to open a consumer every time. The job ID can differ per message, so I cannot delete all messages on the queue; I have to leave some of them there because another process/JVM may still be running.
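
Because the job ID travels as a message property, one alternative to browsing and opening a consumer per message would be a single consumer whose selector matches that property. The following is only a minimal sketch of building such a selector. It assumes the property is called `jobId` (the real value of JOBID_PROPERTY is not shown in this thread), and `JobSelector` is a hypothetical helper name, not part of JMS or Artemis.

```java
import java.util.Objects;

/** Hypothetical helper: builds a JMS message selector matching one batch job's messages. */
final class JobSelector {

    private JobSelector() {}

    /**
     * Returns a selector such as: jobId = 'batch-17'
     * Single quotes inside the value are doubled, which is how the
     * selector syntax escapes a quote in a string literal.
     */
    static String forJob(String propertyName, String jobId) {
        Objects.requireNonNull(propertyName, "propertyName");
        Objects.requireNonNull(jobId, "jobId");
        // Conservative check: accept only simple identifier-style property names.
        if (!propertyName.matches("[A-Za-z_$][A-Za-z0-9_$]*")) {
            throw new IllegalArgumentException("Not a valid selector identifier: " + propertyName);
        }
        return propertyName + " = '" + jobId.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        // "jobId" is an assumed property name standing in for JOBID_PROPERTY.
        String selector = forJob("jobId", "batch-17");
        System.out.println(selector);
        // With an open JMS session the selector would be used roughly like this:
        //   MessageConsumer c = session.createConsumer(queue, selector);
        //   while (c.receive(1000) != null) { /* discard the message */ }
        //   c.close();
    }
}
```

Messages of other jobs never match the selector, so they stay on the queue for the processes that are still running. The same string could presumably also serve as the filter for a management operation such as "removeMessages", but that would need checking against the broker's filter syntax.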

             

            Hope that clarifies it a bit.

            • 3. Re: Artemis Queue Purge Messages
              jbertram

              What are the message-count, consumer-count, and delivering-count of the queue before and after you attempt to delete the messages?
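
To show why these three counters matter, here is a toy model in plain Java. It is not Artemis code, and `ToyQueue` is a made-up class. It rests on two assumptions that would need to be confirmed against the broker: that message-count includes messages currently in delivery to a consumer, and that a purge only touches messages still waiting on the queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a queue's counters; an illustration only, not broker code. */
final class ToyQueue {
    // Job IDs of messages waiting on the queue
    private final Deque<String> queued = new ArrayDeque<>();
    // Job IDs of messages handed to a consumer but not yet acknowledged
    private final List<String> delivering = new ArrayList<>();

    void send(String jobId) {
        queued.addLast(jobId);
    }

    /** Hands the head message to a consumer; it stays counted until acknowledged. */
    boolean deliverOne() {
        String head = queued.pollFirst();
        if (head == null) {
            return false;
        }
        delivering.add(head);
        return true;
    }

    /** Removes waiting messages of one job. In-delivery messages are left alone (assumption). */
    int purge(String jobId) {
        int before = queued.size();
        queued.removeIf(jobId::equals);
        return before - queued.size();
    }

    /** Models consumers closing without acknowledging: messages return to the head of the queue. */
    void rollbackDeliveries() {
        for (int i = delivering.size() - 1; i >= 0; i--) {
            queued.addFirst(delivering.get(i));
        }
        delivering.clear();
    }

    int messageCount() {
        return queued.size() + delivering.size();
    }

    int deliveringCount() {
        return delivering.size();
    }

    public static void main(String[] args) {
        ToyQueue q = new ToyQueue();
        for (int i = 0; i < 40; i++) {
            q.send("A");
        }
        for (int i = 0; i < 8; i++) {
            q.deliverOne(); // eight consumers each hold one message
        }
        System.out.println("removed=" + q.purge("A"));              // removed=32
        System.out.println("message-count=" + q.messageCount());     // message-count=8
        System.out.println("delivering-count=" + q.deliveringCount()); // delivering-count=8
    }
}
```

In this model, a delivering-count that equals the number of leftover messages would suggest that the leftovers are held by consumers rather than missed by the purge.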

              • 4. Re: Artemis Queue Purge Messages
                francesca.herpertz

                That differs depending on when I abort the running JVM or when an error is raised.

                 

                In general, though, we put about 40 messages on the queue and have 8 consumers. I try to abort when about 10 to 20 messages have already been consumed, and usually some messages remain after the delete. I think the number of remaining messages tends to be lower than the number of consumers.

                • 5. Re: Artemis Queue Purge Messages
                  jbertram

                  I think you're going to need to work up a test-case I can use to reproduce this.  Ideally this test-case would be automated and as simple as possible only requiring what is absolutely necessary to reproduce the problem.

                  • 6. Re: Artemis Queue Purge Messages
                    francesca.herpertz

                    I will try to provide you with a sufficient example which will show the behaviour.