6 Replies Latest reply on Feb 24, 2014 5:43 AM by lauradp

    Asynchronous task handler crashes when completing the work item

    dragos.snk

      Hello.

       

      Conducting a PoC on jBPM 6.0 (using 6.0.0), we need to integrate an asynchronous REST invocation; the web service will place a message on an Apache ActiveMQ when finished.

      I implemented an asynchronous Receive Task handler following the provided jBPM examples, which follows the REST invocation in the workflow - the code is below.

       

      What happens is: the message is successfully received, but then kieSession.getWorkItemManager().completeWorkItem(workItemId, null); fails with the following stack trace:

      10:40:09,866 ERROR [stderr] (Thread-109) java.lang.NullPointerException

      10:40:09,868 ERROR [stderr] (Thread-109) at org.drools.persistence.jpa.JpaPersistenceContext.findWorkItemInfo(JpaPersistenceContext.java:85)

      10:40:09,872 ERROR [stderr] (Thread-109) at org.drools.persistence.jpa.processinstance.JPAWorkItemManager.completeWorkItem(JPAWorkItemManager.java:123)

      10:40:09,876 ERROR [stderr] (Thread-109) at com.elster.bpm.ReceiveTaskHandler$1.run(ReceiveTaskHandler.java:48)

      ...

       

      JpaPersistenceContext.java:85 - return em.find( WorkItemInfo.class, id ); - which means for some reason em is null

       

      Am i missing something? How can i make this work?

       

      The handler is properly registered in CustomWorkItemHandlers.conf as follows:

      "Receive Task" : new com.elster.bpm.ReceiveTaskHandler(ksession)

       

      Also tried to create a new work item definition (not use "Receive Task", but a new task type) - had the same result...

       

      Actual code:

       

      public class ReceiveTaskHandler implements WorkItemHandler {

          private final KieSession kieSession;


          // URL of the JMS server

          private String url = ActiveMQConnection.DEFAULT_BROKER_URL;


          // Name of the queue we will receive messages from

          private String subject = "TESTQUEUE";

       

          private static Connection connection;

          private static Session session;

          private static MessageConsumer consumer;

       

          public ReceiveTaskHandler(KieSession session){

              kieSession = session;

          }

       

          @Override

          public void executeWorkItem(WorkItem workItem, WorkItemManager workItemManager) {

              final long workItemId = workItem.getId();

       

              new Thread(new Runnable() {

                  public void run() {

                      try {

                          receive();

       

                          kieSession.getWorkItemManager().completeWorkItem(workItemId, null);

                      } catch (Exception e) {

                          e.printStackTrace();

                          throw new RuntimeException("Error encountered while invoking ws operation asynchronously", e);

                      }

                  }

              }).start();

          }

       

          @Override

          public void abortWorkItem(WorkItem workItem, WorkItemManager workItemManager) {

          }

       

          private void receive() {

              try {

                  if(consumer == null){

                      ConnectionFactory connectionFactory  = new ActiveMQConnectionFactory(url);

                      connection = connectionFactory.createConnection();

                      connection.start();

       

                      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

       

                      Destination destination = session.createQueue(subject);

       

                      consumer = session.createConsumer(destination);

                  }

       

                  Message message = consumer.receive();

       

              } catch (JMSException e) {

                  e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.

              }

          }

      }

        • 1. Re: Asynchronous task handler crashes when completing the work item
          dragos.snk

          Downloaded 6.0.1 and tried to run my process - gives a different exception stack:

           

          12:41:37,076 WARN  [org.drools.persistence.TransactionSynchronizationRegistryHelper] (Thread-112) Unable to put resource due to No transaction is running

          12:41:37,082 WARN  [org.hibernate.ejb.AbstractEntityManagerImpl] (Thread-112) HHH000326: Cannot join transaction: do not override hibernate.transaction.factory_class

           

          12:41:37,089 ERROR [stderr] (Thread-112) javax.persistence.TransactionRequiredException: No active JTA transaction on joinTransaction call

          12:41:37,095 ERROR [stderr] (Thread-112) at org.hibernate.ejb.AbstractEntityManagerImpl.joinTransaction(AbstractEntityManagerImpl.java:1210)

          12:41:37,102 ERROR [stderr] (Thread-112) at org.hibernate.ejb.AbstractEntityManagerImpl.joinTransaction(AbstractEntityManagerImpl.java:1160)

          12:41:37,113 ERROR [stderr] (Thread-112) at org.drools.persistence.jpa.AbstractPersistenceContextManager.getCommandScopedEntityManager(AbstractPersistenceContextManager.java:110)

          12:41:37,119 ERROR [stderr] (Thread-112) at org.drools.persistence.jpa.JpaPersistenceContextManager.getCommandScopedPersistenceContext(JpaPersistenceContextManager.java:65)

          12:41:37,123 ERROR [stderr] (Thread-112) at org.drools.persistence.jpa.processinstance.JPAWorkItemManager.completeWorkItem(JPAWorkItemManager.java:112)

          12:41:37,126 ERROR [stderr] (Thread-112) at com.elster.bpm.ReceiveTaskHandler$1.run(ReceiveTaskHandler.java:48)


          Process definition, maybe it helps:

          Test.proc5.png

          • 2. Re: Asynchronous task handler crashes when completing the work item
            jkranes

            I believe the problem is that you are starting a new thread which runs outside of the current transaction.  Try eliminating the new thread and see if that helps.

             

            Jon

            • 3. Re: Asynchronous task handler crashes when completing the work item
              dragos.snk

              The documentation and async examples specify you need to create a new thread.

               

              I tried waiting for the queue message to arrive in executeWorkItem (that is, not creating a new thread) - but that will lock the process instance and the timer boundary event will never get fire, as it will wait for the process instance to become available.... or do I misunderstood this?

               

              Asynchronous web service invocation example can be found here (and it creates a new thread to complete the work item too...): jbpm/jbpm-workitems/src/main/java/org/jbpm/process/workitem/webservice/WebServiceWorkItemHandler.java at master · drools…

               

              Dragos

              • 4. Re: Re: Asynchronous task handler crashes when completing the work item
                swiderski.maciej

                the problem actually is because ksession you get in there (by using conf file) is not the command based and thus lacking transaction support. You should use RuntimeManager and RuntimeEngine within the thread to ensure all is properly configured. Moreover with more advanced strategies like per process instance this will not work at all. So either use work item producers to register work item handlers or rely on RuntimeManager within the thread like this:

                  @Override
                  public void executeWorkItem(WorkItem workItem, WorkItemManager manager) {
                  final long workItemId = workItem.getId();
                  final String depId = ((WorkItemImpl) workItem).getDeploymentId();
                   
                        new Thread(new Runnable() {
                            public void run() {
                                try {
                                       // do you work here
                                    RuntimeManager manager = RuntimeManagerRegistry.get().getManager(depId);
                                    RuntimeEngine engine = manager.getRuntimeEngine(EmptyContext.get());
                                    engine.getKieSession().getWorkItemManager().completeWorkItem(workItemId, null);
                                    manager.disposeRuntimeEngine(engine);
                                } catch (Exception e) {
                                    e.printStackTrace();
                                    throw new RuntimeException("Error encountered while invoking ws operation asynchronously", e);
                                }
                            }
                        }).start();
                  }
                

                HTH

                • 5. Re: Asynchronous task handler crashes when completing the work item
                  dragos.snk

                  Great, thank you Maciej, it works perfectly!

                  • 6. Re: Asynchronous task handler crashes when completing the work item
                    lauradp

                    Hi,

                    How can I perform such a getWorkItemManager() in jBPM 5.4?