6 Replies Latest reply on Mar 8, 2017 4:18 AM by Francesca Herpertz

    Artemis Queue Purge Messages

    Francesca Herpertz Newbie

      Hi people,

       

      I am having trouble deleting messages from an Artemis message queue configured in WildFly 10.1.

       

      My application works the following way:

       

      I have a WAR deployment with the main application running in it. From this application I can start batch processes, each in its own JVM. Each newly spawned JVM communicates via the JMS queue and a topic on WildFly with other components that are registered as consumers on the queue. Those consumers run outside the application server and are not necessarily on the same machine.

      When an error occurs during the communication, execution, or consumption of these messages, an exception is thrown and the JVM spawned by the main application terminates. The main application, which administers the running JVMs/batch processes, is then tasked with cleaning up the message queue and deleting all remaining messages of the failed job. Every message put on the queue carries an ID that identifies the batch job; it is set as a header property.

      The code below works fine if no consumer is registered on the queue, or if the consumers have not yet started to consume messages.

       

      This is the code with which I purge the queue:

       

      // Imports assumed: javax.jms.*, java.util.Enumeration
      Connection tmpConnection = null;
      Session tmpSession = null;
      QueueBrowser browser = null;
      try {
          tmpConnection = connectionFactory.createConnection(System.getProperty("jee.username", DEFAULT_USERNAME), DEFAULT_PASSWORD);
          tmpSession = tmpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Queue queue = (Queue) initialContext.lookup(System.getProperty("destination", DEFAULT_DESTINATION));
          browser = tmpSession.createBrowser(queue);
          Enumeration<?> enum1 = browser.getEnumeration();
          tmpConnection.start();
          while (enum1.hasMoreElements()) {
              // Cast to Message rather than TextMessage so other message types do not cause a ClassCastException.
              Message msg = (Message) enum1.nextElement();
              // jobId.equals(...) avoids a NullPointerException for messages without the job ID property.
              if (jobId.equals(msg.getStringProperty(JOBID_PROPERTY))) {
                  // Consume exactly this message by selecting on its message ID.
                  MessageConsumer consumer = tmpSession.createConsumer(queue, "JMSMessageID='" + msg.getJMSMessageID() + "'");
                  try {
                      // receiveNoWait() is not reliable here: if the message cannot be delivered at that
                      // moment, null is returned and the message could remain on the queue forever.
                      Message message = consumer.receive(1000);
                      if (message != null) {
                          // log.info("message not null");
                      }
                  } finally {
                      // Close the per-message consumer even if receive() throws.
                      consumer.close();
                  }
              }
          }
      } catch (Exception e) {
          e.printStackTrace();
      } finally {
          // tmpSession.commit();
          // Null checks: any of these may still be unset if setup failed early.
          if (browser != null) {
              browser.close();
          }
          if (tmpSession != null) {
              tmpSession.close();
          }
          if (tmpConnection != null) {
              tmpConnection.close();
          }
      }

      }

       

       

      I know that opening a new consumer for every message is overhead. I just wanted to see whether I can purge the queue completely this way. At the moment it is irrelevant how long the purge takes after the abort.
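
A possible alternative to one consumer per message is a single consumer whose selector matches the job ID property directly. The sketch below only builds such a selector string, with quote escaping as defined by the JMS selector syntax. The class name `JobSelector` and the property name `jobId` are hypothetical placeholders (the real name is whatever `JOBID_PROPERTY` holds), and nothing here establishes whether this approach behaves differently when other consumers are already active on the queue.

```java
// Sketch only: JobSelector and the property name "jobId" are hypothetical,
// standing in for JOBID_PROPERTY from the purge code above.
final class JobSelector {

    private JobSelector() {
    }

    /** Builds a JMS message selector that matches one string property value. */
    static String forJob(String propertyName, String jobId) {
        // Selector identifiers follow the rules for Java identifiers.
        if (propertyName == null || propertyName.isEmpty()
                || !Character.isJavaIdentifierStart(propertyName.charAt(0))) {
            throw new IllegalArgumentException("invalid property name: " + propertyName);
        }
        for (int i = 1; i < propertyName.length(); i++) {
            if (!Character.isJavaIdentifierPart(propertyName.charAt(i))) {
                throw new IllegalArgumentException("invalid property name: " + propertyName);
            }
        }
        if (jobId == null) {
            throw new IllegalArgumentException("jobId must not be null");
        }
        // String literals in selectors are single-quoted; an embedded quote is doubled.
        return propertyName + " = '" + jobId.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        // Prints: jobId = 'batch-42'
        System.out.println(forJob("jobId", "batch-42"));
    }
}
```

The resulting string could be passed to `tmpSession.createConsumer(queue, selector)`, followed by a loop that calls `consumer.receive(1000)` until it returns null, so that one consumer drains all messages of the job.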

       

       

      The queue is configured the following way:

       

       

              <subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
                  <server name="default" persistence-enabled="false">
                      <security-setting name="#">
                          <role name="guest" send="true" consume="true" create-non-durable-queue="true" delete-non-durable-queue="true"/>
                          <role name="ejb" send="true" consume="true" create-durable-queue="true" delete-durable-queue="true" create-non-durable-queue="true" delete-non-durable-queue="true" manage="true"/>
                      </security-setting>
                      <address-setting name="#" dead-letter-address="jms.queue.DLQ" expiry-address="jms.queue.ExpiryQueue" max-size-bytes="300000000" page-size-bytes="10000000" message-counter-history-day-limit="10"/>
                      <http-connector name="http-connector" socket-binding="http" endpoint="http-acceptor"/>
                      <http-connector name="http-connector-throughput" socket-binding="http" endpoint="http-acceptor-throughput">
                          <param name="direct-deliver" value="true"/>
                      </http-connector>
                      <in-vm-connector name="in-vm" server-id="0"/>
                      <http-acceptor name="http-acceptor" http-listener="default"/>
                      <http-acceptor name="http-acceptor-throughput" http-listener="default">
                          <param name="direct-deliver" value="true"/>
                      </http-acceptor>
                      <remote-acceptor name="tcp-acceptor" socket-binding="tcp"/>
                      <in-vm-acceptor name="in-vm" server-id="0"/>
                      <jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>
                      <jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>
                      <jms-queue name="myTrace" entries="java:jboss/exported/jms/myTrace" durable="false"/>
                      <jms-queue name="myDataPackage" entries="java:jboss/exported/jms/myDataPackage" durable="false"/>
                      <jms-topic name="myWorkflow" entries="java:jboss/exported/jms/myWorkflow"/>
                      <jms-topic name="myTopic" entries="java:jboss/exported/jms/myTopic"/>
                      <connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm" client-failure-check-period="-1" connection-ttl="-1" min-large-message-size="600000000"/>
                      <connection-factory name="RemoteConnectionFactory" entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector" client-failure-check-period="-1" connection-ttl="-1" min-large-message-size="600000000"/>
                      <pooled-connection-factory name="activemq-ra" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm" client-failure-check-period="-1" connection-ttl="-1" min-large-message-size="600000000" transaction="xa"/>
                  </server>
              </subsystem>

       

       

       

      Initially I thought it had something to do with the consumers having already received some messages: when the JVM aborts, those partially consumed messages might be rolled back onto the queue. Could that be a valid reason for this?

       

      I hope you can help me out here, as I have already read the Artemis documentation and found no clue. The above code works fine when there is no consumer on the queue yet or when message consumption has not yet started. (FYI: there is a slight delay before the consumers are initialized, because they need more context information, which they get from somewhere else.)

       

       

      Thanks in advance!

       

      Regards,

      Francesca