5 Replies Latest reply on Feb 19, 2015 8:02 AM by swiderski.maciej

    Problem with dispatcher process

    toja

      Hi,

      In our project we have two processes: procA and a dispatcher process. The dispatcher process creates new instances of procA and sends signals to other processes. procA is a simple process: it gets some data via REST and creates a human task.

      We run the dispatcher and try to start about 5000 new process instances. It takes about 2 hours for the dispatcher process to complete.

       

      During this time we have problems with REST requests. When we try to get tasks or send a signal via REST, we receive a 'Read Timeout exception'. This is probably because the jBPM server is busy and spends all its resources on creating new instances of procA (handling the dispatcher process). Once the dispatcher process has completed, everything is fine. We have already added some timers to make the process sleep, but that is still not enough.

       

      Is it possible to fix this problem, e.g. by having isolated threads for processing REST queries and for handling jBPM processes? We would like to use the REST interface while the dispatcher process runs in the background.

        • 1. Re: Problem with dispatcher process
          swiderski.maciej

          How does the dispatcher process create instances? Using a reusable subprocess/call activity, or something else? I'd go for async work to perform this operation, meaning use the jBPM executor with a dedicated command that creates the process instances. By default a single thread would perform this work, so only one thread would be locked. If the work is heavy (the new instances call a REST endpoint directly), I'd split it up across multiple jobs for improved performance.

           

          Keep in mind that each request on the app server is handled by a single thread, so it might be that you are simply taking all the threads from the web container, leaving no resources to process your other requests. Moving that work into the background (jBPM executor) can free up your web container threads.
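
A minimal sketch of the idea above, using plain java.util.concurrent rather than the jBPM executor API: the work is split into batches and handed to a dedicated background pool, so the caller's (web container) threads are not the ones doing the heavy lifting. The class name, the method names, the batch size of 100 and the "start one procA instance" placeholder are all assumptions made for illustration; in a real setup each batch would roughly correspond to one executor job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustration only: plain java.util.concurrent, not the jBPM executor API. */
class BackgroundDispatchSketch {

    /** Splits the items into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            int end = Math.min(i + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(i, end)));
        }
        return batches;
    }

    /** Runs every batch on a dedicated pool and returns the number of items handled. */
    static int dispatch(List<Integer> ids, int batchSize, int workerThreads) {
        ExecutorService background = Executors.newFixedThreadPool(workerThreads);
        AtomicInteger started = new AtomicInteger();
        try {
            List<Future<?>> jobs = new ArrayList<>();
            for (List<Integer> batch : partition(ids, batchSize)) {
                jobs.add(background.submit(() -> {
                    for (Integer id : batch) {
                        // Hypothetical placeholder for "start one procA instance".
                        started.incrementAndGet();
                    }
                }));
            }
            for (Future<?> job : jobs) {
                job.get(); // wait until this batch has finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for batches", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A batch failed", e.getCause());
        } finally {
            background.shutdown();
        }
        return started.get();
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            ids.add(i);
        }
        // One worker thread mirrors the single-thread default mentioned above.
        System.out.println("handled: " + dispatch(ids, 100, 1));
    }
}
```

Raising workerThreads processes batches in parallel, at the cost of more threads competing for the same database and engine resources.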

           

          HTH

          • 2. Re: Problem with dispatcher process
            toja
            • 3. Re: Problem with dispatcher process
              swiderski.maciej

              Make sure that the command is available on the class path, either the global one of the application or that of the kjar. See an example command here and a process that makes use of that command here, where upon start you should specify the fully qualified class name (org.jbpm.examples.cmd.UserCommand).
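
A small diagnostic sketch for the class path check above, using only the JDK: it tests whether a fully qualified class name resolves through a given class loader. The class and method names are made up for illustration, and which loader to pass is an assumption; classes packaged in a kjar are loaded by the deployment's own class loader, so a check against the application class loader only covers the "global" case.

```java
/** Diagnostic sketch: checks whether a class name resolves through a given class loader. */
class CommandClasspathCheck {

    /** Returns true if className can be loaded by loader, without initializing the class. */
    static boolean isLoadable(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        // Class name taken from the post; the result depends on what is on this class path.
        System.out.println(isLoadable("org.jbpm.examples.cmd.UserCommand", loader));
    }
}
```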

               

              HTH

              • 4. Re: Problem with dispatcher process
                toja

                Maciej, we successfully implemented executors, but we still have one problem:

                While the executors are working, if someone tries to claim/complete any task, we sometimes get this exception:

                 

                java.lang.RuntimeException: Unexpected exception executing action org.jbpm.process.instance.event.DefaultSignalManager$SignalAction@139b2a08

                  at org.drools.core.impl.StatefulKnowledgeSessionImpl.executeQueuedActions(StatefulKnowledgeSessionImpl.java:1517) ~[drools-core-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.process.instance.event.DefaultSignalManager.signalEvent(DefaultSignalManager.java:73) ~[jbpm-flow-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.persistence.processinstance.JPASignalManager.signalEvent(JPASignalManager.java:37) ~[jbpm-persistence-jpa-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.process.instance.ProcessRuntimeImpl.signalEvent(ProcessRuntimeImpl.java:441) ~[jbpm-flow-6.1.0.Final.jar:6.1.0.Final]

                  at org.drools.core.impl.StatefulKnowledgeSessionImpl.signalEvent(StatefulKnowledgeSessionImpl.java:483) ~[drools-core-6.1.0.Final.jar:6.1.0.Final]

                  at org.drools.core.command.runtime.process.SignalEventCommand.execute(SignalEventCommand.java:91) ~[drools-core-6.1.0.Final.jar:6.1.0.Final]

                  at org.drools.core.command.runtime.process.SignalEventCommand.execute(SignalEventCommand.java:32) ~[drools-core-6.1.0.Final.jar:6.1.0.Final]

                  at org.drools.core.command.impl.DefaultCommandService.execute(DefaultCommandService.java:36) ~[drools-core-6.1.0.Final.jar:6.1.0.Final]

                  at org.drools.core.command.impl.AbstractInterceptor.executeNext(AbstractInterceptor.java:41) ~[drools-core-6.1.0.Final.jar:6.1.0.Final]

                  at org.drools.persistence.SingleSessionCommandService$TransactionInterceptor.execute(SingleSessionCommandService.java:509) ~[drools-persistence-jpa-6.1.0.Final.jar:6.1.0.Final]

                  at org.drools.persistence.SingleSessionCommandService.execute(SingleSessionCommandService.java:353) ~[drools-persistence-jpa-6.1.0.Final.jar:6.1.0.Final]

                  at org.drools.core.command.impl.CommandBasedStatefulKnowledgeSession.signalEvent(CommandBasedStatefulKnowledgeSession.java:208) ~[drools-core-6.1.0.Final.jar:6.1.0.Final]

                  at pl.groupama.processes.statistics.StatisticsReceiverCommand.execute(StatisticsReceiverCommand.java:26) ~[na:]

                  at pl.groupama.processes.statistics.AbstractStatisticCommand.execute(AbstractStatisticCommand.java:31) [na:]

                  at org.jbpm.executor.impl.AvailableJobsExecutor.executeJob(AvailableJobsExecutor.java:122) [jbpm-executor-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.executor.impl.ExecutorRunnable.run(ExecutorRunnable.java:42) [jbpm-executor-6.1.0.Final.jar:6.1.0.Final]

                  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_25]

                  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_25]

                  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_25]

                  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_25]

                  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_25]

                  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_25]

                  at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]

                Caused by: java.lang.RuntimeException: Process instance 21641[kosiarka-processes.GenericStatisticProcess] is disconnected.

                  at org.jbpm.process.instance.impl.ProcessInstanceImpl.getProcess(ProcessInstanceImpl.java:90) ~[jbpm-flow-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.process.instance.impl.ProcessInstanceImpl.getContextInstance(ProcessInstanceImpl.java:161) ~[jbpm-flow-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.workflow.instance.WorkflowRuntimeException.initialize(WorkflowRuntimeException.java:56) ~[jbpm-flow-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.workflow.instance.WorkflowRuntimeException.<init>(WorkflowRuntimeException.java:42) ~[jbpm-flow-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.workflow.instance.impl.NodeInstanceImpl.trigger(NodeInstanceImpl.java:168) ~[jbpm-flow-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.ruleflow.instance.RuleFlowProcessInstance.internalStart(RuleFlowProcessInstance.java:35) ~[jbpm-flow-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.process.instance.impl.ProcessInstanceImpl.start(ProcessInstanceImpl.java:226) ~[jbpm-flow-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.workflow.instance.impl.WorkflowProcessInstanceImpl.start(WorkflowProcessInstanceImpl.java:363) ~[jbpm-flow-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.process.instance.ProcessRuntimeImpl.startProcessInstance(ProcessRuntimeImpl.java:187) ~[jbpm-flow-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.process.instance.ProcessRuntimeImpl.startProcess(ProcessRuntimeImpl.java:169) ~[jbpm-flow-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.process.instance.ProcessRuntimeImpl$StartProcessEventListener.signalEvent(ProcessRuntimeImpl.java:386) ~[jbpm-flow-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.process.instance.event.DefaultSignalManager.internalSignalEvent(DefaultSignalManager.java:81) ~[jbpm-flow-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.process.instance.event.DefaultSignalManager$SignalAction.execute(DefaultSignalManager.java:181) ~[jbpm-flow-6.1.0.Final.jar:6.1.0.Final]

                  at org.drools.core.impl.StatefulKnowledgeSessionImpl.executeQueuedActions(StatefulKnowledgeSessionImpl.java:1515) ~[drools-core-6.1.0.Final.jar:6.1.0.Final]

                  ... 22 common frames omitted

                2015-02-18 12:09:39,110 [pool-5-thread-1] ERROR Error when handling callback from executor

                java.lang.RuntimeException: Unable to commit transaction

                  at org.drools.persistence.jta.JtaTransactionManager.commit(JtaTransactionManager.java:212) ~[drools-persistence-jpa-6.1.0.Final.jar:6.1.0.Final]

                  at org.drools.persistence.SingleSessionCommandService$TransactionInterceptor.execute(SingleSessionCommandService.java:512) ~[drools-persistence-jpa-6.1.0.Final.jar:6.1.0.Final]

                  at org.drools.persistence.SingleSessionCommandService.execute(SingleSessionCommandService.java:353) ~[drools-persistence-jpa-6.1.0.Final.jar:6.1.0.Final]

                  at org.drools.core.command.impl.CommandBasedStatefulKnowledgeSession.execute(CommandBasedStatefulKnowledgeSession.java:478) ~[drools-core-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.executor.impl.wih.AsyncWorkItemHandlerCmdCallback.onCommandError(AsyncWorkItemHandlerCmdCallback.java:80) ~[jbpm-executor-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.executor.impl.AvailableJobsExecutor.executeJob(AvailableJobsExecutor.java:170) [jbpm-executor-6.1.0.Final.jar:6.1.0.Final]

                  at org.jbpm.executor.impl.ExecutorRunnable.run(ExecutorRunnable.java:42) [jbpm-executor-6.1.0.Final.jar:6.1.0.Final]

                  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_25]

                  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_25]

                  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_25]

                  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_25]

                  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_25]

                  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_25]

                  at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]

                Caused by: bitronix.tm.internal.BitronixRollbackException: RuntimeException thrown during beforeCompletion cycle caused transaction rollback

                  at bitronix.tm.BitronixTransaction.commit(BitronixTransaction.java:241) ~[btm-2.1.4.jar:2.1.4]

                  at bitronix.tm.BitronixTransactionManager.commit(BitronixTransactionManager.java:143) ~[btm-2.1.4.jar:2.1.4]

                  at org.drools.persistence.jta.JtaTransactionManager.commit(JtaTransactionManager.java:209) ~[drools-persistence-jpa-6.1.0.Final.jar:6.1.0.Final]

                  ... 13 common frames omitted

                Caused by: javax.persistence.PersistenceException: org.hibernate.HibernateException: Flush during cascade is dangerous

                  at org.hibernate.ejb.AbstractEntityManagerImpl.convert(AbstractEntityManagerImpl.java:1387) ~[hibernate-entitymanager-4.2.12.Final.jar:4.2.12.Final]

                  at org.hibernate.ejb.AbstractEntityManagerImpl.convert(AbstractEntityManagerImpl.java:1310) ~[hibernate-entitymanager-4.2.12.Final.jar:4.2.12.Final]

                  at org.hibernate.ejb.AbstractEntityManagerImpl.convert(AbstractEntityManagerImpl.java:1316) ~[hibernate-entitymanager-4.2.12.Final.jar:4.2.12.Final]

                  at org.hibernate.ejb.AbstractEntityManagerImpl$CallbackExceptionMapperImpl.mapManagedFlushFailure(AbstractEntityManagerImpl.java:1510) ~[hibernate-entitymanager-4.2.12.Final.jar:4.2.12.Final]

                  at org.hibernate.engine.transaction.synchronization.internal.SynchronizationCallbackCoordinatorNonTrackingImpl.beforeCompletion(SynchronizationCallbackCoordinatorNonTrackingImpl.java:110) ~[hibernate-core-4.2.12.Final.jar:4.2.12.Final]

                  at org.hibernate.engine.transaction.synchronization.internal.RegisteredSynchronization.beforeCompletion(RegisteredSynchronization.java:53) ~[hibernate-core-4.2.12.Final.jar:4.2.12.Final]

                  at bitronix.tm.BitronixTransaction.fireBeforeCompletionEvent(BitronixTransaction.java:532) ~[btm-2.1.4.jar:2.1.4]

                  at bitronix.tm.BitronixTransaction.commit(BitronixTransaction.java:235) ~[btm-2.1.4.jar:2.1.4]

                  ... 15 common frames omitted

                Caused by: org.hibernate.HibernateException: Flush during cascade is dangerous

                  at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1155) ~[hibernate-core-4.2.12.Final.jar:4.2.12.Final]

                  at org.hibernate.internal.SessionImpl.managedFlush(SessionImpl.java:404) ~[hibernate-core-4.2.12.Final.jar:4.2.12.Final]

                  at org.hibernate.engine.transaction.synchronization.internal.SynchronizationCallbackCoordinatorNonTrackingImpl.beforeCompletion(SynchronizationCallbackCoordinatorNonTrackingImpl.java:105) ~[hibernate-core-4.2.12.Final.jar:4.2.12.Final]

                  ... 18 common frames omitted

                 

                After that, all jobs queued in the executors finished with errors, and we had to restart our server before tasks could be claimed again.

                 

                Looking for a solution, we tried to change the runtime strategy from SINGLETON to PER_PROCESS_INSTANCE, but we got the error: KieSession with id XX is already used by another context

                We also tried adding the parameter org.kie.tx.lock.enabled=true, but when we start a process by signalling an event we get a read timeout:

                org.kie.services.client.api.command.exception.RemoteCommunicationException: Unable to post request: Read timed out

                  at org.kie.services.client.api.command.AbstractRemoteCommandObject.executeRestCommand(AbstractRemoteCommandObject.java:400)

                  at org.kie.services.client.api.command.AbstractRemoteCommandObject.execute(AbstractRemoteCommandObject.java:120)

                  at org.drools.core.command.impl.CommandBasedStatefulKnowledgeSession.signalEvent(CommandBasedStatefulKnowledgeSession.java:208)

                  at pl.test.JbpmSignalTester.main(JbpmSignalTester.java:114)

                  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

                  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

                  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

                  at java.lang.reflect.Method.invoke(Method.java:606)

                  at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)

                 

                My command implementations are attached below:

                 

                public abstract class AbstractStatisticCommand implements Command {

                    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

                    @Override
                    public ExecutionResults execute(CommandContext commandContext) throws Exception {
                        final WorkItem workItem = (WorkItem) commandContext.getData("workItem");

                        RuntimeManager manager = getRuntimeManager(commandContext);
                        RuntimeEngine engine = manager.getRuntimeEngine(ProcessInstanceIdContext.get((Long) commandContext.getData("processInstanceId")));

                        try {
                            KieSession kieSession = engine.getKieSession();
                            execute(workItem, kieSession);
                        } finally {
                            manager.disposeRuntimeEngine(engine);
                        }
                        return new ExecutionResults();
                    }

                    protected abstract void execute(WorkItem workItem, KieSession kieSession);

                    private RuntimeManager getRuntimeManager(CommandContext ctx) {
                        String deploymentId = (String) ctx.getData("deploymentId");
                        RuntimeManager runtimeManager = RuntimeManagerRegistry.get().getManager(deploymentId);

                        if (runtimeManager == null) {
                            throw new IllegalStateException("There is no runtime manager for deployment " + deploymentId);
                        }

                        return runtimeManager;
                    }
                }

                 

                protected void execute(WorkItem workItem, KieSession kieSession) {
                    final List batch = (List) workItem.getParameter(INVALID_STATISTICS_BATCH_PARAM_NAME);
                    for (Object event : batch) {
                        if (findProcessId(kieSession, (AgentStatisticDTO) event) == null) {
                            try {
                                kieSession.signalEvent(STATISTICS_NEW_INVALID_STATISTIC_EVENT, event);
                            } catch (Exception e) {
                                logger.error("error:", e);
                            }
                        }
                    }
                }

                public ProcessInstance findProcessId(KieSession kieSession, AgentStatisticDTO invalidStatistic) {
                    CorrelationKeyInfo correlationKeyInfo = new CorrelationKeyInfo();
                    correlationKeyInfo.setName("processKey");

                    correlationKeyInfo.addProperty(new CorrelationPropertyInfo("agentId", Long.toString(invalidStatistic.getAgentId())));
                    correlationKeyInfo.addProperty(new CorrelationPropertyInfo("statisticType", invalidStatistic.getStatisticType().name()));

                    return ((CorrelationAwareProcessRuntime) kieSession).getProcessInstance(correlationKeyInfo);
                }

                 

                 

                public class StatisticsUpdaterCommand extends AbstractStatisticCommand {

                    public static final String STATISTICS_AGENT_STATISTICS_UPDATED_EVENT = "statistics.agentStatisticsUpdatedEvent";
                    public static final String WAITING_PROCESS_IDS_BATCH_PARAM_NAME = "waitingProcessIdsBatch";

                    @Override
                    protected void execute(WorkItem workItem, KieSession kieSession) {
                        final List batch = (List) workItem.getParameter(WAITING_PROCESS_IDS_BATCH_PARAM_NAME);
                        logger.debug("StatisticsUpdaterCommand for processes: {}", batch);
                        for (Object id : batch) {
                            try {
                                kieSession.signalEvent(STATISTICS_AGENT_STATISTICS_UPDATED_EVENT, null, (Long) id);
                            } catch (Exception e) {
                                logger.error("error:", e);
                            }
                        }
                    }
                }

                 

                 

                Any ideas how to solve it?

                • 5. Re: Problem with dispatcher process
                  swiderski.maciej

                  Could you provide a bit more detail about the environment you run this in? Is it jbpm console/kie-wb or a standalone app? If it is jbpm console, which app server is it deployed to?

                   

                  The problem with:

                  PER_PROCESS_INSTANCE but we got error: KieSession with id XX is already used by another context

                  should already be solved in 6.2.

                   

                  This does not make much sense to me:

                      if (findProcessId(kieSession, (AgentStatisticDTO) event) == null) {
                          try {
                              kieSession.signalEvent(STATISTICS_NEW_INVALID_STATISTIC_EVENT, event);

                   

                  Why load the process instance and then signal anyway as a broadcast? Especially since it is done in a loop, so it will signal all instances regardless of whether they are loaded. Note that everything done in an executor command is done without a transaction, so if you want to load an instance and make modifications to it, do it either via a command on the ksession or enclose it in a UserTransaction.
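
To make the broadcast point concrete, here is a toy in-memory model (not the jBPM API) that counts deliveries. It mirrors the two call shapes already used in the commands in this thread: signalEvent(type, event), which is a broadcast, and signalEvent(type, event, processInstanceId), which addresses one instance. All names are made up for illustration, and the model assumes every instance listens for the signal.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model, not the jBPM API: counts how many instances receive each signal. */
class SignalScopeSketch {

    /** Process instance id mapped to the events delivered to that instance. */
    private final Map<Long, List<String>> inbox = new LinkedHashMap<>();

    void addInstance(long id) {
        inbox.put(id, new ArrayList<>());
    }

    /** Broadcast: every registered instance receives the event. Returns deliveries made. */
    int signalAll(String event) {
        for (List<String> events : inbox.values()) {
            events.add(event);
        }
        return inbox.size();
    }

    /** Targeted: only the instance with the given id receives the event. */
    int signalOne(String event, long id) {
        List<String> events = inbox.get(id);
        if (events == null) {
            return 0;
        }
        events.add(event);
        return 1;
    }

    /** Total deliveries when each event of the batch is broadcast inside the loop. */
    static int deliveriesWithBroadcast(int instances, int batchSize) {
        SignalScopeSketch session = new SignalScopeSketch();
        for (long id = 1; id <= instances; id++) {
            session.addInstance(id);
        }
        int deliveries = 0;
        for (int i = 0; i < batchSize; i++) {
            deliveries += session.signalAll("event-" + i);
        }
        return deliveries;
    }

    /** Total deliveries when each event of the batch goes to exactly one instance. */
    static int deliveriesWithTarget(int instances, int batchSize) {
        SignalScopeSketch session = new SignalScopeSketch();
        for (long id = 1; id <= instances; id++) {
            session.addInstance(id);
        }
        int deliveries = 0;
        for (int i = 0; i < batchSize; i++) {
            deliveries += session.signalOne("event-" + i, (i % instances) + 1);
        }
        return deliveries;
    }

    public static void main(String[] args) {
        System.out.println("broadcast: " + deliveriesWithBroadcast(100, 10)); // prints "broadcast: 1000"
        System.out.println("targeted:  " + deliveriesWithTarget(100, 10));    // prints "targeted:  10"
    }
}
```

In this model the broadcast loop does instances × batchSize deliveries while the targeted loop does batchSize, which is why a broadcast inside a loop gets expensive as the number of active instances grows.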

                   

                  HTH