Asynchronous task handler crashes when completing the work item
dragos.snk Feb 10, 2014 5:42 AMHello.
Conducting a PoC on jBPM 6.0 (using 6.0.0), we need to integrate an asynchronous REST invocation; the web service will place a message on an Apache ActiveMQ when finished.
I implemented an asynchronous Receive Task handler following the provided jBPM examples, which follows the REST invocation in the workflow - the code is below.
What happens is: the message is successfully received, but then kieSession.getWorkItemManager().completeWorkItem(workItemId, null); fails with the following stack trace:
10:40:09,866 ERROR [stderr] (Thread-109) java.lang.NullPointerException
10:40:09,868 ERROR [stderr] (Thread-109) at org.drools.persistence.jpa.JpaPersistenceContext.findWorkItemInfo(JpaPersistenceContext.java:85)
10:40:09,872 ERROR [stderr] (Thread-109) at org.drools.persistence.jpa.processinstance.JPAWorkItemManager.completeWorkItem(JPAWorkItemManager.java:123)
10:40:09,876 ERROR [stderr] (Thread-109) at com.elster.bpm.ReceiveTaskHandler$1.run(ReceiveTaskHandler.java:48)
...
JpaPersistenceContext.java:85 - return em.find( WorkItemInfo.class, id ); - which means for some reason em is null
Am i missing something? How can i make this work?
The handler is properly registered in CustomWorkItemHandlers.conf as follows:
"Receive Task" : new com.elster.bpm.ReceiveTaskHandler(ksession)
Also tried to create a new work item definition (not use "Receive Task", but a new task type) - had the same result...
Actual code:
public class ReceiveTaskHandler implements WorkItemHandler {
private final KieSession kieSession;
// URL of the JMS server
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
// Name of the queue we will receive messages from
private String subject = "TESTQUEUE";
private static Connection connection;
private static Session session;
private static MessageConsumer consumer;
public ReceiveTaskHandler(KieSession session){
kieSession = session;
}
@Override
public void executeWorkItem(WorkItem workItem, WorkItemManager workItemManager) {
final long workItemId = workItem.getId();
new Thread(new Runnable() {
public void run() {
try {
receive();
kieSession.getWorkItemManager().completeWorkItem(workItemId, null);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("Error encountered while invoking ws operation asynchronously", e);
}
}
}).start();
}
@Override
public void abortWorkItem(WorkItem workItem, WorkItemManager workItemManager) {
}
private void receive() {
try {
if(consumer == null){
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
Message message = consumer.receive();
} catch (JMSException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
}
}