11 Replies Latest reply on Oct 6, 2007 5:51 AM by Fady Matar

    JobExecutor enhancement proposal to allow 3rd framework i

    Pierre-André PASQUALINI Newbie

      Hi all,

      I am currently working on integrating the jBPM solution into our software, which is based on the Spring Framework: we use Spring beans for configuration and AOP interception for transactional and logging concerns.

      A custom ObjectFactory implementation lets us use the bean configuration
      defined in the Spring beans when it exists, and fall back to the default ObjectFactoryImpl when it does not.

      JobExecutor's fields are all package-private and have no corresponding public setters, so it is impossible to set their values from a Spring bean context. One of the simplest ways to solve this is to add setters for all the fields that are present in the jbpm.cfg.xml file.

      Another point is the possibility of using Spring AOP transaction interception
      during asynchronous job execution (i.e. using the transaction context of the hosting platform, if one is provided). For this I propose to decouple the technical JbpmContext handling (acquire/close/exception handling) from the functional part (job execution) using a service/callback pattern.

      Enhancement summary:
      1/ Add setters to JobExecutor
      2/ Run the JobExecutorThread execution actions under a transactional context provided by the host.

      To give you an idea, I am sending you a "draft" patch corresponding to my implementation. I wrote it so as to keep the impact on the code base to a minimum. As a first approach I only refactored JobExecutorThread.executeJob() to use the callback, but I think it is possible to extend this approach to every other place where a JbpmContext is created.

      I am waiting for your feedback

      best regards

      
      Index: src/jpdl/org/jbpm/job/executor/JobExecutor.java
      ===================================================================
      --- src/jpdl/org/jbpm/job/executor/JobExecutor.java (revision 20)
      +++ src/jpdl/org/jbpm/job/executor/JobExecutor.java (working copy)
      @@ -15,11 +15,14 @@
       import org.apache.commons.logging.LogFactory;
       import org.jbpm.JbpmConfiguration;
      
      +
       public class JobExecutor implements Serializable {
      
       private static final long serialVersionUID = 1L;
      
       JbpmConfiguration jbpmConfiguration;
      + JobExecutorService jobExecutorService;
      +
       String name;
       int nbrOfThreads;
       int idleInterval;
      @@ -164,6 +167,57 @@
       public int getNbrOfThreads() {
       return nbrOfThreads;
       }
      +
      + public void setLockMonitorThread(LockMonitorThread in_lockMonitorThread) { // this and the following public setters store the argument as given, without validation
      + lockMonitorThread = in_lockMonitorThread;
      + }
      +
      + public void setJbpmConfiguration(JbpmConfiguration in_jbpmConfiguration) {
      + jbpmConfiguration = in_jbpmConfiguration;
      + }
      +
      + public void setName(String in_name) {
      + name = in_name;
      + }
      +
      + public void setIdleInterval(int in_idleInterval) {
      + idleInterval = in_idleInterval;
      + }
      +
      + public void setMaxIdleInterval(int in_maxIdleInterval) {
      + maxIdleInterval = in_maxIdleInterval;
      + }
      +
      + public void setHistoryMaxSize(int in_historyMaxSize) {
      + historyMaxSize = in_historyMaxSize;
      + }
      +
      + public void setMaxLockTime(int in_maxLockTime) {
      + maxLockTime = in_maxLockTime;
      + }
      +
      + public void setLockMonitorInterval(int in_lockMonitorInterval) {
      + lockMonitorInterval = in_lockMonitorInterval;
      + }
      +
      + public void setLockBufferTime(int in_lockBufferTime) {
      + lockBufferTime = in_lockBufferTime;
      + }
      +
      + public void setNbrOfThreads(int in_nbrOfThreads) { // NOTE(review): whether a change made after the executor threads are started has any effect is not visible here - confirm
      + nbrOfThreads = in_nbrOfThreads;
      + }
      +
      + public JobExecutorService getJobExecutorService() { // returns the configured service, creating the default one on first use
      + if (jobExecutorService == null){ // nothing set through setJobExecutorService: fall back to JobExecutorServiceImpl; NOTE(review): unsynchronised lazy creation - confirm whether several JobExecutorThreads can get here concurrently
      + jobExecutorService = new JobExecutorServiceImpl(this);
      + }
      + return jobExecutorService;
      + }
      +
      + public void setJobExecutorService(JobExecutorService in_jobExecutorService) { // replaces the service handed out by getJobExecutorService()
      + jobExecutorService = in_jobExecutorService;
      + }
      
       private static Log log = LogFactory.getLog(JobExecutor.class);
       }
      Index: src/jpdl/org/jbpm/job/executor/JobExecutorCallback.java
      ===================================================================
      --- src/jpdl/org/jbpm/job/executor/JobExecutorCallback.java (revision 0)
      +++ src/jpdl/org/jbpm/job/executor/JobExecutorCallback.java (revision 0)
      @@ -0,0 +1,18 @@
      +package org.jbpm.job.executor;
      +
      +import org.jbpm.JbpmContext;
      +
      +/**
      + * Callback holding the work to be run with a {@link JbpmContext} that the caller supplies.
      + *
      + * @author p.pasqual
      + */
      +public interface JobExecutorCallback {
      +
      + /**
      + * Performs the callback's work using the given context.
      + * @param in_jbpmContext the context to work with
      + * @return a result object; the callback created in JobExecutorThread.executeJob returns null
      + */
      + public Object doInJobExecutor(JbpmContext in_jbpmContext);
      +}
      Index: src/jpdl/org/jbpm/job/executor/JobExecutorService.java
      ===================================================================
      --- src/jpdl/org/jbpm/job/executor/JobExecutorService.java (revision 0)
      +++ src/jpdl/org/jbpm/job/executor/JobExecutorService.java (revision 0)
      @@ -0,0 +1,15 @@
      +package org.jbpm.job.executor;
      +
      +/**
      + * Runs a {@link JobExecutorCallback}; obtained through JobExecutor.getJobExecutorService().
      + *
      + * @author p.pasqual
      + */
      +public interface JobExecutorService {
      +
      + /**
      + * Executes the given callback.
      + * @param in_callback the callback to execute
      + */
      + public void execute(JobExecutorCallback in_callback);
      +}
      Index: src/jpdl/org/jbpm/job/executor/JobExecutorServiceImpl.java
      ===================================================================
      --- src/jpdl/org/jbpm/job/executor/JobExecutorServiceImpl.java (revision 0)
      +++ src/jpdl/org/jbpm/job/executor/JobExecutorServiceImpl.java (revision 0)
      @@ -0,0 +1,75 @@
      +package org.jbpm.job.executor;
      +
      +import org.apache.commons.logging.Log;
      +import org.apache.commons.logging.LogFactory;
      +import org.jbpm.JbpmContext;
      +
      +import org.jbpm.persistence.JbpmPersistenceException;
      +import org.jbpm.persistence.db.StaleObjectLogConfigurer;
      +
      +/**
      + * Default {@link JobExecutorService}: creates a JbpmContext from the job
      + * executor's JbpmConfiguration, runs the callback with it and closes it.
      + */
      +public class JobExecutorServiceImpl implements JobExecutorService {
      +
      + private final JobExecutor jobExecutor;
      +
      + /**
      + * @param in_jobExecutor executor whose JbpmConfiguration supplies the contexts
      + */
      + public JobExecutorServiceImpl(JobExecutor in_jobExecutor) {
      + super();
      + jobExecutor = in_jobExecutor;
      + }
      +
      + /**
      + * Runs the callback with a newly acquired context and always closes the context.
      + * A JbpmPersistenceException thrown by close() is logged and not rethrown; any
      + * other RuntimeException thrown by close() is logged and rethrown.
      + *
      + * @param callback the work to run; its return value is ignored
      + */
      + public void execute(JobExecutorCallback callback) {
      + JbpmContext jbpmContext = aquireContext();
      + try {
      + callback.doInJobExecutor(jbpmContext);
      + } finally {
      + try {
      + jbpmContext.close();
      + } catch (JbpmPersistenceException e) {
      + // if this is a stale object exception, the jbpm configuration
      + // has control over the logging
      + // (the cause can be null, so check it before asking for its class)
      + Throwable cause = e.getCause();
      + if (cause != null && "org.hibernate.StaleObjectStateException".equals(cause.getClass().getName())) {
      + log.info("problem committing execution transaction: optimistic locking failed");
      + StaleObjectLogConfigurer.staleObjectExceptionsLog.error("problem committing job execution transaction: optimistic locking failed", e);
      + } else {
      + log.error("problem committing execution transaction", e);
      + }
      + } catch (RuntimeException e) {
      + log.error("problem committing execution transaction", e);
      +
      + throw e;
      + }
      + }
      + }
      +
      + /**
      + * Creates the context the callback runs with; subclasses may override this.
      + */
      + protected JbpmContext aquireContext(){
      + return getJobExecutor().getJbpmConfiguration().createJbpmContext();
      + }
      +
      + /**
      + * @return the job executor this service was created for
      + */
      + protected JobExecutor getJobExecutor() {
      + return jobExecutor;
      + }
      +
      + private static Log log = LogFactory.getLog(JobExecutorServiceImpl.class);
      +
      +}
      Index: src/jpdl/org/jbpm/job/executor/JobExecutorThread.java
      ===================================================================
      --- src/jpdl/org/jbpm/job/executor/JobExecutorThread.java (revision 20)
      +++ src/jpdl/org/jbpm/job/executor/JobExecutorThread.java (working copy)
      @@ -17,8 +17,6 @@
       import org.jbpm.db.JobSession;
       import org.jbpm.job.Job;
       import org.jbpm.job.Timer;
      -import org.jbpm.persistence.JbpmPersistenceException;
      -import org.jbpm.persistence.db.StaleObjectLogConfigurer;
      
       public class JobExecutorThread extends Thread {
      
      @@ -153,50 +151,38 @@
       return acquiredJobs;
       }
      
      - protected void executeJob(Job job) {
      - JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
      - try {
      - JobSession jobSession = jbpmContext.getJobSession();
      - job = jobSession.loadJob(job.getId());
      + protected void executeJob(final Job todo) { // the JbpmContext is now supplied by the JobExecutorService; only the job logic stays in the callback
      + jobExecutor.getJobExecutorService().execute(new JobExecutorCallback() {
      + public Object doInJobExecutor(JbpmContext jbpmContext) {
      + JobSession jobSession = jbpmContext.getJobSession();
      + Job job = jobSession.loadJob(todo.getId()); // load the job by id through this context's JobSession
       
      - try {
      - log.debug("executing job "+job);
      - if (job.execute(jbpmContext)) {
      - jobSession.deleteJob(job);
      - }
      + try {
      + log.debug("executing job " + job);
      + if (job.execute(jbpmContext)) {
      + jobSession.deleteJob(job);
      + }
       
      - } catch (Exception e) {
      - log.debug("exception while executing '"+job+"'", e);
      - StringWriter sw = new StringWriter();
      - e.printStackTrace(new PrintWriter(sw));
      - job.setException(sw.toString());
      - job.setRetries(job.getRetries()-1);
      - }
      -
      - // if this job is locked too long
      - long totalLockTimeInMillis = System.currentTimeMillis() - job.getLockTime().getTime();
      - if (totalLockTimeInMillis>maxLockTime) {
      - jbpmContext.setRollbackOnly();
      - }
      + } catch (Exception e) { // a failing job is not rethrown: its stack trace is stored on the job and its retry count is decremented
      + log.debug("exception while executing '" + job + "'", e);
      + StringWriter sw = new StringWriter();
      + e.printStackTrace(new PrintWriter(sw));
      + job.setException(sw.toString());
      + job.setRetries(job.getRetries() - 1);
      + }
       
      - } finally {
      - try {
      - jbpmContext.close();
      - } catch (JbpmPersistenceException e) {
      - // if this is a stale object exception, the jbpm configuration has control over the logging
      - if ("org.hibernate.StaleObjectStateException".equals(e.getCause().getClass().getName())) {
      - log.info("problem committing job execution transaction: optimistic locking failed");
      - StaleObjectLogConfigurer.staleObjectExceptionsLog.error("problem committing job execution transaction: optimistic locking failed", e);
      - } else {
      - log.error("problem committing job execution transaction", e);
      - }
      - } catch (RuntimeException e) {
      - log.error("problem committing job execution transaction", e);
      + // if this job is locked too long
      + long totalLockTimeInMillis = System.currentTimeMillis() - job.getLockTime().getTime();
      + if (totalLockTimeInMillis > maxLockTime) {
      + jbpmContext.setRollbackOnly();
      + }
       
      - throw e;
      - }
      - }
      - }
      + return null; // this callback has no result to hand back
      + }
      + });
      +
      + }
      +
       protected Date getNextDueDate() {
       Date nextDueDate = null;
       JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();