1 2 Previous Next 16 Replies Latest reply on Nov 24, 2006 1:12 AM by s0d0 Go to original post
      • 15. Re: Could not enlist in transaction on entering meta-aware o
        weston.price

        Looking at your code, I am not sure what you are trying to accomplish. Since your EJB is annotated with the REQUIRED transaction attribute, a transaction (Txn) will already exist before the createConnection() method is called. The underlying ConnectionManager (XA or Local, depending upon your *-ds.xml file) will ensure that the underlying resources are enlisted in the transaction automatically. Typically there is no need to acquire an XASession or do anything outside of the standard JMS API.


        Could you explain what you are trying to accomplish?

        • 16. Re: Could not enlist in transaction on entering meta-aware o

          Here is the real "code" :)

          PollingBean.poll is called from another session bean's ejbTimeout.

          First, PollingBean connects to the ftp/sftp server and fetches the list of files
          in the directory. Then it calls processMessage, which fetches the file content
          from the ftp/sftp server and sends it to a JMS queue, writes tracking information
          about fetching the file from the server, and then performs an action on the file (for example, deletes the file). If deleting the file fails, the sending of the JMS message must be rolled back. The same goes for the tracking information.

          So we want every processMessage call to be done in its own transaction.


          Weston: Could you come to irc #jboss now?

          -Juha-







          package fi.logiasoftware.messageserver.services.backend;
          
          import java.io.UnsupportedEncodingException;
          import java.util.Collection;
          import java.util.HashMap;
          import java.util.Iterator;
          import java.util.Set;
          import java.util.Vector;
          
          import javax.annotation.Resource;
          import javax.ejb.EJB;
          import javax.ejb.EJBException;
          import javax.ejb.Local;
          import javax.ejb.Remote;
          import javax.ejb.SessionContext;
          import javax.ejb.Stateless;
          import javax.ejb.TransactionAttribute;
          import javax.ejb.TransactionAttributeType;
          import javax.jms.DeliveryMode;
          import javax.jms.JMSException;
          import javax.jms.MessageProducer;
          import javax.jms.Queue;
          import javax.jms.Session;
          import javax.jms.TextMessage;
          import javax.jms.XAConnection;
          import javax.jms.XASession;
          import javax.naming.InitialContext;
          import javax.naming.NamingException;
          import javax.persistence.EntityManager;
          import javax.persistence.PersistenceContext;
          import javax.persistence.Query;
          import javax.xml.bind.JAXBException;
          import org.apache.log4j.Logger;
          import org.jboss.annotation.ejb.LocalBinding;
          import org.jboss.annotation.ejb.RemoteBinding;
          
          import com.sonicsw.sonicmq.j2ee.jmsra.sonicra.SonicConnection;
          import com.sonicsw.sonicmq.j2ee.jmsra.sonicra.SonicConnectionFactory;
          
          import fi.logiasoftware.messageserver.config.Chain;
          import fi.logiasoftware.messageserver.config.Chainstep;
          import fi.logiasoftware.messageserver.config.Customer;
          import fi.logiasoftware.messageserver.config.Destination;
          import fi.logiasoftware.messageserver.config.Endpoint;
          import fi.logiasoftware.messageserver.services.common.MessageEnvelope;
          import fi.logiasoftware.messageserver.services.common.Util;
          import fi.logiasoftware.messageserver.services.handlers.DirectoryInfoImpl;
          import fi.logiasoftware.resourceadapters.common.AbstractProtocolHandler;
          import fi.logiasoftware.resourceadapters.common.DirectoryInfo;
          import fi.logiasoftware.resourceadapters.common.MessageContainer;
          import fi.logiasoftware.resourceadapters.common.ProtocolHandlerException;
          import fi.logiasoftware.resourceadapters.generic.cci.GenericConnection;
          import fi.logiasoftware.resourceadapters.generic.cci.GenericConnectionFactory;
          import fi.logiasoftware.resourceadapters.generic.cci.GenericConnectionSpec;
          
          /**
           * Copyright 2006 Logia Software. All rights reserved.
           *
           * Stateless session bean that polls a remote endpoint for messages. It is
           * bound remotely as "Polling" and locally as "PollingLocal", both through
           * the {@link Polling} interface, and declares the REQUIRED transaction
           * attribute at class level.
           *
           * For one chain step, {@link #poll(Chainstep)} opens a connection through
           * the generic resource adapter, lists the files of every destination
           * directory of the step, forwards each file to a JMS queue and then runs
           * the action configured for the directory on that file.
           *
           * @author hbm2java (latest modification by $Author: sam $)
           * @version $Revision: 1.17 $ $Date: 2006/11/02 12:04:29 $
           */

          @Stateless
          @RemoteBinding(jndiBinding = "Polling")
          @LocalBinding(jndiBinding = "PollingLocal")
          @Remote(Polling.class)
          @Local(Polling.class)
          @TransactionAttribute(TransactionAttributeType.REQUIRED)
          public class PollingBean extends MessageSenderBase implements Polling {

              private static final long serialVersionUID = 1L;

              public final static int UNKNOWN_TXN_TYPE = -1;

              public final static int XA_TXN_TYPE = 1;

              public final static int NON_XA_TXN_TYPE = 2;

              // NOTE(review): the logger name ("...message.server...") differs from this
              // class's package ("...messageserver..."); left untouched because existing
              // log4j configuration may reference it - confirm before renaming.
              private static final Logger logger = Logger
                      .getLogger("fi.logiasoftware.message.server.services.backend.PollingBean");

              /** Connection factory of the generic resource adapter. */
              @Resource(mappedName = "java:/GRA")
              private GenericConnectionFactory tcf;

              /*
               * @EJB private MessageSender messageSender;
               */

              /** Writes a tracking entry for every forwarded message of a tracked chain. */
              @EJB
              private TrackingWriter trackingWriter;

              /** Writes status/error entries for a customer, chain, step and job. */
              @EJB
              StatusWriter statusWriter;

              @PersistenceContext(unitName = "MessageServer")
              private EntityManager entityManager;

              /** Source of handler class names and queue names. */
              @EJB
              private Configuration config;

              // Not read anywhere in this class.
              private int m_txnType = UNKNOWN_TXN_TYPE;

              /**
               * Copies the directory related settings of a destination into a
               * {@link DirectoryInfo} that can be handed to a protocol handler.
               *
               * @param dest
               *            destination whose path, action, action detail, file prefix
               *            and file suffix are copied
               * @return a new directory info carrying those five values
               */
              private DirectoryInfo getDirectoryInfo(Destination dest) {
                  DirectoryInfoImpl info = new DirectoryInfoImpl();
                  info.setPath(dest.getPath());
                  info.setAction(dest.getAction());
                  info.setActiondetail(dest.getActiondetail());
                  info.setFileprefix(dest.getFileprefix());
                  info.setFilesuffix(dest.getFilesuffix());
                  return info;
              }

              /**
               * Builds the connection spec used to request a connection from the
               * generic resource adapter.
               *
               * @param endPoint
               *            endpoint providing id, credentials, host, port, timeout,
               *            transfer type, connect mode and the as400 value
               * @param handlerClass
               *            protocol handler class name, stored under "handlerclass"
               * @return the populated connection spec
               */
              private GenericConnectionSpec getConnectionSpec(Endpoint endPoint,
                      String handlerClass) {
                  GenericConnectionSpec spec = new GenericConnectionSpec();
                  spec.putIntegerValue("id", endPoint.getId());
                  spec.putStringValue("username", endPoint.getUsername());
                  spec.putStringValue("password", endPoint.getPassword());
                  spec.putIntegerValue("port", endPoint.getPort());
                  spec.putIntegerValue("timeout", endPoint.getTimeout());
                  spec.putStringValue("hostname", endPoint.getHostname());
                  spec.putStringValue("handlerclass", handlerClass);
                  spec.putStringValue("transfertype", endPoint.getTransfertype());
                  spec.putStringValue("connectmode", endPoint.getConnectmode());
                  spec.putIntegerValue("as400", endPoint.getAs400());
                  return spec;
              }

              /**
               * Polls the endpoint of the given chain step.
               *
               * The step is reloaded by id. If it no longer exists the method returns
               * without doing anything. If no handler class is configured for the
               * endpoint's protocol, or the connection yields no handler, a status
               * entry is written and the method returns. Otherwise the destination
               * directories of the step are processed. The resource adapter
               * connection is always closed before returning.
               *
               * @param s
               *            chain step to poll; only its id is used
               * @throws EJBException
               *             wrapping a NamingException raised while polling
               */
              @TransactionAttribute(TransactionAttributeType.REQUIRED)
              public void poll(Chainstep s) {

                  GenericConnection conn = null;

                  // find() returns null for an unknown id. The previous
                  // Query.getSingleResult() call throws in that case, which made the
                  // null guard below unreachable.
                  Chainstep step = entityManager.find(Chainstep.class, s.getId());

                  logger.debug("PollingBean");

                  if (step == null) {
                      return;
                  }

                  try {

                      Integer customerId = step.getChain().getCustomer().getId();
                      Integer chainId = step.getChain().getId();
                      Integer stepId = step.getId();
                      String jobId = "not available";

                      Endpoint entrypoint = step.getEndpoint();
                      String handlerClass = config.getProperty(entrypoint.getProtocol());

                      if (handlerClass == null || handlerClass.length() == 0) {
                          statusWriter.writeLog(customerId, chainId, stepId, jobId,
                                  "Internal configuration error. Could not find handler class for protocol "
                                          + entrypoint.getProtocol());
                          return;
                      }

                      conn = tcf.getConnection(getConnectionSpec(entrypoint, handlerClass));
                      AbstractProtocolHandler handler = conn.getHandler();

                      if (handler == null) {
                          logger.warn("Could not connect to " + entrypoint.getHostname());
                          statusWriter.writeLog(customerId, chainId, stepId, jobId,
                                  "Could not connect to "
                                          + entrypoint.getHostname()
                                          + ".Detailed error message can be found from server log.");
                          return;
                      }

                      processDirectories(handler, stepId);

                  } catch (NamingException ne) {
                      throw new EJBException(ne);
                  } finally {
                      if (conn != null) {
                          conn.closeConnection();
                          logger.debug("closed connection to JCA resource");
                      }
                  }
              }

              /**
               * Walks through all destination directories of a chain step and forwards
               * every file found there via {@link #processMessage}.
               *
               * For each file a property map (filename, stepid, customerid, chainid,
               * orderno, jobid and optionally toesb) is built. When the step's
               * chainend value is positive the file content is parsed as a
               * {@link MessageEnvelope} and the job id and payload are taken from it;
               * otherwise a new job id is generated from the file name and the raw
               * file content is used as payload.
               *
               * NOTE(review): the transaction annotation on this private method is
               * presumably not evaluated by the container because the method is only
               * called directly from {@link #poll(Chainstep)} - confirm.
               *
               * @param handler
               *            protocol handler used to list and read the files
               * @param stepid
               *            id of the chain step whose destinations are processed
               * @throws EJBException
               *             if reading or forwarding a message fails
               */
              @TransactionAttribute(TransactionAttributeType.REQUIRED)
              private void processDirectories(AbstractProtocolHandler handler,
                      Integer stepid) {

                  Chainstep step = entityManager.find(Chainstep.class, stepid);
                  if (step == null) {
                      // find() returns null for an unknown id; avoid a NullPointerException.
                      logger.warn("Chainstep " + stepid + " not found, nothing to process");
                      return;
                  }

                  Chain chain = step.getChain();
                  Customer customer = chain.getCustomer();
                  Integer stepId = step.getId();

                  logger.debug("processDirectories");

                  Collection directories = step.getDestinations();

                  for (Object entry : directories) {

                      Destination directory = (Destination) entry;
                      DirectoryInfo dirInfo = getDirectoryInfo(directory);

                      logger.debug(dirInfo.getPath());
                      logger.debug(dirInfo.getFileprefix());
                      logger.debug(dirInfo.getFilesuffix());

                      try {

                          Vector<String> filenames = handler.getMessages(dirInfo);

                          for (String filename : filenames) {

                              MessageContainer msgC = handler.getMessage(filename);

                              logger.info("Processsing message " + dirInfo.getPath()
                                      + " " + msgC.getFilename());

                              HashMap<String, Comparable> m = new HashMap<String, Comparable>();
                              m.put("filename", msgC.getFilename());
                              m.put("stepid", step.getId());
                              m.put("customerid", customer.getId());
                              m.put("chainid", chain.getId());
                              m.put("orderno", step.getOrderno());

                              String msgData;
                              String jobId;

                              // Unwrap the envelope when the step is configured to do so.
                              if (step.getChainend() > 0) {
                                  MessageEnvelope envelope = new MessageEnvelope();
                                  try {
                                      envelope.parse(
                                              msgC.getData(directory.getEncoding()),
                                              directory.getEncoding());
                                  } catch (JAXBException e) {
                                      throw new EJBException(e);
                                  }
                                  jobId = (String) envelope.getProperties().get("jobid");
                                  msgData = envelope.getData();
                              } else {
                                  jobId = Util.generateGUID(msgC.getFilename());
                                  msgData = msgC.getData(directory.getEncoding());
                              }

                              String queueName;
                              if (step.getToesb() != null && !step.getToesb().equals("")) {
                                  m.put("toesb", step.getToesb());
                                  queueName = config.getProperty("QUEUETOESB");
                              } else {
                                  queueName = config.getProperty(step.getChain().getQueue());
                              }
                              m.put("jobid", jobId);

                              processMessage(handler, msgData, m, queueName, dirInfo,
                                      msgC, chain, stepId, jobId);
                          }

                      } catch (ProtocolHandlerException e) {
                          statusWriter.writeLog(customer.getId(), chain.getId(), stepId, "",
                                  "Error occured while reading message(s) from endpoint. Reason: "
                                          + e.getMessage());
                          throw new EJBException(e);
                      } catch (EJBException e) {
                          // Already in its final form (raised above or by processMessage);
                          // rethrow untouched instead of wrapping it a second time.
                          throw e;
                      } catch (Exception e) {
                          throw new EJBException(e);
                      }
                  }
              }

              /**
               * Forwards a single message: opens a JMS connection, sends the payload
               * to the given queue, runs the directory action on the source file and,
               * when the chain's tracking value is 1, writes a tracking entry.
               *
               * NOTE(review): this method is private and is invoked directly from
               * {@link #processDirectories}. Container-managed transaction attributes
               * are applied to calls that go through the bean's business interface,
               * so REQUIRES_NEW presumably has no effect here and the work joins the
               * caller's transaction. If one transaction per message is required, the
               * call has to be routed through the container - confirm.
               *
               * NOTE(review): the connection returned by openConnection() is not
               * closed in this method; an earlier finally block calling
               * shutdownConnection(jmsConn) was disabled. Verify whether the
               * connection has to be released here.
               *
               * @param handler
               *            protocol handler that executes the directory action
               * @param msgData
               *            message payload
               * @param m
               *            message properties passed along with the payload
               * @param queueName
               *            name of the target JMS queue
               * @param dirInfo
               *            directory settings, including the action to execute
               * @param msgC
               *            container describing the source file
               * @param chain
               *            chain the message belongs to
               * @param stepId
               *            id of the chain step
               * @param jobId
               *            job id of the message
               * @throws EJBException
               *             if the action or the JMS send fails; a status entry is
               *             written first
               */
              @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
              private void processMessage(AbstractProtocolHandler handler,
                      String msgData, HashMap m, String queueName, DirectoryInfo dirInfo,
                      MessageContainer msgC, Chain chain, Integer stepId, String jobId) {

                  try {

                      SonicConnection jmsConn = openConnection();
                      sendMessage(jmsConn, msgData, m, queueName);
                      handler.doAction(dirInfo, msgC.getAbsPath(), msgC.getFilename());

                      if (chain.getTracking() == 1) {
                          trackingWriter.writeLog(chain.getCustomer().getId(),
                                  chain.getId(), stepId, jobId, msgC.getFilename(), "", "");
                      }

                  } catch (ProtocolHandlerException e) {
                      statusWriter.writeLog(chain.getCustomer().getId(), chain.getId(),
                              stepId, jobId, "Excecuting action " + dirInfo.getAction()
                                      + " for message failed. Reason: " + e.getMessage());
                      throw new EJBException(e);
                  } catch (JMSException e) {
                      statusWriter.writeLog(chain.getCustomer().getId(), chain.getId(),
                              stepId, jobId,
                              "Sending message to JMS queue failed. Reason: "
                                      + e.getMessage());
                      throw new EJBException(e);
                  }
              }

          }
          


          1 2 Previous Next