3 Replies Latest reply on Feb 18, 2014 2:43 AM by mnovak

    JMS between two Wildfly 8 instances

    witoldsz

      Hello,

      I would like to ask how I should configure two Wildfly 8 instances (let's call them W1 and W2). W1 is a JMS server (no apps will be deployed there) and W2 hosts applications which send messages to W1 and consume messages from W1. There are also other apps which talk to W1 using STOMP or HornetQ's native protocol, but they are out of scope here; let's focus on W1 and W2.

       

      The question is: should I create a pooled connection factory on W2 pointing to W1? If so, does it mean there is no need to define JMS connection factories on W1? Should W1 expose queues in the 'java:jboss/exported' space, so W2 apps can ask for them using @Resource(name="...") Queue queueName injections? If so, does it mean I can ask for a Queue from the remote server and use a local connection factory to send a message?

       

      As you can see, there are really a lot of questions for which I could not find answers in the manual.

       

      Thanks,

      Witold Szczerba

        • 1. Re: JMS between two Wildfly 8 instances
          mnovak

          Hi Witold,

           

          here are answers for your questions:

          • should I create pooled connection factory on W2 pointing to W1? - Yes
            • Configure the connector of the java:/JmsXA pooled connection factory (on the W2 server) to connect to W1.
          • If so, does it mean there is no need to define JMS connection factories on W1? Yes, connection factories on W1 are not needed. Your apps deployed to W2 will use the java:/JmsXA connection factory, which is in JNDI in the same JVM and can be injected by:
          
          
          @Resource(mappedName = "java:/JmsXA")
          private ConnectionFactory cf;
          
          
          
          
          • Should W1 expose queues in 'java:jboss/exported' space, so W2 apps can ask for them using @Resource(name="...") Queue queueName injections?
            • You can expose queues in 'java:jboss/exported' on W1, but an application deployed on W2 will NOT be able to inject them with @Resource(name="..."). At least I'm not aware of such functionality. Here are 2 options:
              1. Do a remote JNDI lookup to the W1 server for the queue.
              2. Call session.createQueue("coreQueueName") (this is the core queue name, not the JNDI name) on a session obtained from a connection created by the "java:/JmsXA" connection factory configured on W2. Don't be confused by the name "createQueue": it does not create a queue on the remote server. It expects that the queue has already been deployed to W1 and just returns a reference to it.
          • If so, does it mean I can ask for Queue from remote server and use local connection factory to send a message?
            • Yes, but you can't use @Resource(name="...") for the queue; you have to obtain it in one of the two ways mentioned above.
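
            For option 1 on WildFly 8, the remote lookup environment could be prepared roughly as below. This is only a sketch: the class RemoteJndiEnv and its method names are made up for illustration, and it assumes W1 uses the default WildFly 8 remote naming setup (the http-remoting protocol on the HTTP port, with remote names resolved relative to 'java:jboss/exported').

            ```java
            import java.util.Properties;
            import javax.naming.Context;

            /** Hypothetical helper: builds the JNDI environment for a remote lookup against W1. */
            public final class RemoteJndiEnv {

                private static final String EXPORTED_PREFIX = "java:jboss/exported/";

                private RemoteJndiEnv() {
                }

                /**
                 * Assumes WildFly 8 defaults: the remote naming client factory and
                 * the http-remoting protocol (port 8080 unless W1 was reconfigured).
                 */
                public static Properties forWildFly8(String host, int port) {
                    if (host == null || host.trim().isEmpty()) {
                        throw new IllegalArgumentException("host must not be empty");
                    }
                    Properties env = new Properties();
                    env.put(Context.INITIAL_CONTEXT_FACTORY,
                            "org.jboss.naming.remote.client.InitialContextFactory");
                    env.put(Context.PROVIDER_URL, "http-remoting://" + host.trim() + ":" + port);
                    // Credentials (Context.SECURITY_PRINCIPAL / SECURITY_CREDENTIALS) are left
                    // to the caller, since they depend on how W1 is secured.
                    return env;
                }

                /** Remote clients see names relative to java:jboss/exported, so drop that prefix. */
                public static String toRemoteName(String serverJndiName) {
                    return serverJndiName.startsWith(EXPORTED_PREFIX)
                            ? serverJndiName.substring(EXPORTED_PREFIX.length())
                            : serverJndiName;
                }
            }
            ```

            The resulting Properties would be passed to new InitialContext(env), which needs the WildFly client libraries on the classpath; a queue bound on W1 as java:jboss/exported/jms/queue/OutQueue would then be looked up as jms/queue/OutQueue.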

           

          Cheers,

          Mirek

          • 2. Re: JMS between two Wildfly 8 instances
            witoldsz

            Hi,

            thanks for your explanations. What you said confirms my own investigation. I found the trick with #createQueue just by coincidence. I must say I am very disappointed with how JMS works, because the deployment details decide how the application code has to be written. If I have a dozen classes using @Resource-injected queues and now I have to separate the JMS server, the code needs to be rewritten. On the other hand, it makes no difference whether my JDBC database is available locally or is on the other side of the world... I just can't believe this. I have heard about JMS bridges, but there are so few details about them in the documentation, like how they impact JMS throughput or what the scope of the transaction is... and are they supposed to address the issues mentioned in this thread?

             

            The last thing I wanted to clarify is how to handle MDBs. I have read that the pooled connection factory on W2 is supposed to be used only to send messages. If that's true, then my MDBs should use W1's connection factories, is that right? If so, how? I saw an example somewhere on the web where they were actually hard-coding the remote JMS IP address and port number in the MDB's annotation (sic!). Another example showed similar nonsense, this time with the IP and port numbers hard-coded in the application descriptor. I am not sure if that was a joke or what, but by now I can believe anything.

             

            On the other hand, I have never seen anyone in JavaEE hard-coding a JDBC database server's IP address in EJB annotations or descriptors. What is wrong with the JMS spec? Or (I hope) are my speculations wrong? If so, how can I make my application unaware of where the JMS server is?

             

            Thanks,

            Witold Szczerba

            • 3. Re: Re: JMS between two Wildfly 8 instances
              mnovak

              Hi Witold,

               

              yes, the problem here is getting a reference to the remote queue deployed on the W1 server. It's not necessary to hardcode it in your MDB or EJB. You can load a property file, packaged in the deployed jar together with your MDB/EJB, and read from it the JNDI properties and the JNDI name of the queue you want to get.
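
              A minimal sketch of that idea, assuming the file is called mdb.properties and uses keys such as remote-server-hostname and remote-server-port; the class MdbSettings and its methods are hypothetical names chosen for this example:

              ```java
              import java.io.IOException;
              import java.io.InputStream;
              import java.io.UncheckedIOException;
              import java.util.Properties;

              /** Hypothetical helper: reads connection settings packaged inside the deployed jar. */
              public final class MdbSettings {

                  public static final String RESOURCE = "mdb.properties";

                  private MdbSettings() {
                  }

                  /** Loads mdb.properties through the given class loader, failing loudly if it is absent. */
                  public static Properties load(ClassLoader cl) {
                      try (InputStream in = cl.getResourceAsStream(RESOURCE)) {
                          if (in == null) {
                              throw new IllegalStateException(RESOURCE + " not found on the classpath");
                          }
                          return parse(in);
                      } catch (IOException e) {
                          throw new UncheckedIOException(e);
                      }
                  }

                  /** Parses properties from a stream; the caller remains responsible for closing it. */
                  public static Properties parse(InputStream in) {
                      Properties props = new Properties();
                      try {
                          props.load(in);
                      } catch (IOException e) {
                          throw new UncheckedIOException(e);
                      }
                      return props;
                  }

                  /** Returns a mandatory property, so a missing key is reported at lookup time. */
                  public static String require(Properties props, String key) {
                      String value = props.getProperty(key);
                      if (value == null || value.trim().isEmpty()) {
                          throw new IllegalStateException("Missing property: " + key);
                      }
                      return value.trim();
                  }
              }
              ```

              Inside an MDB this would typically be called as MdbSettings.load(Thread.currentThread().getContextClassLoader()), so that the host, port and queue name live in the packaged file rather than in annotations.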

               

              You can use ejb-jar.xml to change the activation config properties of an MDB. If the "hornetq-ra" pooled connection factory, which also defines the inbound communication for MDBs, is configured to connect to the W1 server, then the queue named in the "destination" activation config property will be resolved on the W1 server.

               

              "java:/JmsXA", which is also defined by the "hornetq-ra" pooled connection factory, can be used for receiving messages as well. Yes, the configuration for inbound and outbound communication is mixed, but that does no harm since in 99% of cases both connect to the same JMS server. There is no limitation on receiving and sending messages using the "java:/JmsXA" connection factory.

               

              Finally, I'm adding the source of one MDB as an example. I hope it saves you a few hours. It's up to you where you look up the remote queue:

              package org.jboss.qa.hornetq.apps.mdb;
              
              import org.apache.log4j.Level;
              import org.apache.log4j.Logger;
              
              import javax.annotation.Resource;
              import javax.ejb.*;
              import javax.jms.*;
              import javax.naming.Context;
              import javax.naming.InitialContext;
              import javax.naming.NamingException;
              import java.io.InputStream;
              import java.util.Properties;
              import java.util.ResourceBundle;
              import java.util.concurrent.atomic.AtomicInteger;
              
              /**
              * This mdb expects mdb.properties in jar file which can be loaded during runtime(deployment).
              * A MdbWithRemoteOutQueueWithJNDI used for example lodh tests. Used in RemoteJcaWithRecoverTestCase in interop test suite.
              * <p/>
              * This mdb reads messages from queue "InQueue" and sends to queue "OutQueue".
              *
              * @author <a href="mnovak@redhat.com">Miroslav Novak</a>
              * @version $Revision: 1.1 $
              */
              @MessageDriven(name = "MdbWithRemoteOutQueueWithJNDI",
                      activationConfig = {
                              @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                              @ActivationConfigProperty(propertyName = "destination", propertyValue = "jms/queue/InQueue"),
                              @ActivationConfigProperty(propertyName = "userName", propertyValue = "user"),
                              @ActivationConfigProperty(propertyName = "user", propertyValue = "user"),
                              @ActivationConfigProperty(propertyName = "password", propertyValue = "pass")})
              @TransactionManagement(value = TransactionManagementType.CONTAINER)
              @TransactionAttribute(value = TransactionAttributeType.REQUIRED)
              public class MdbWithRemoteOutQueueWithJNDI implements MessageDrivenBean, MessageListener {
              
                  public static String MDB_PROPERTY_FILE = "mdb.properties";
                  public static String REMOTE_SERVER_HOSTNAME = "remote-server-hostname";
                  public static String REMOTE_SERVER_PORT = "remote-server-port";
                  public static String REMOTE_SERVER_TYPE = "remote-server-type";
                  public static String OUTQUEUE_JNDI_NAME = "outqeue-jndi-name";
              
                  private static final long serialVersionUID = 2770941392406343837L;
                  private static final Logger log = Logger.getLogger(MdbWithRemoteOutQueueWithJNDI.class.getName());
                  private Queue queue = null;
                  public static AtomicInteger numberOfProcessedMessages = new AtomicInteger();
              
                  @Resource(mappedName = "java:/JmsXA")
                  private ConnectionFactory cf;
              
                  @Resource
                  private MessageDrivenContext context;
              
                  @Override
                  public void onMessage(Message message) {
              
                      Connection con = null;
                      Session session;
              
                      try {
              
                          long time = System.currentTimeMillis();
                          int counter = 0;
                          try {
                              counter = message.getIntProperty("count");
                          } catch (Exception e) {
                              log.log(Level.ERROR, e.getMessage(), e);
                          }
              
                          String messageInfo = message.getJMSMessageID() + ", count:" + counter;
              
                          log.debug(" Start of message:" + messageInfo);
              
                          for (int i = 0; i < (5 + 5 * Math.random()); i++) {
                              try {
                                  Thread.sleep((int) (10 + 10 * Math.random()));
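                              } catch (InterruptedException ex) {
                                  // restore the interrupt flag instead of silently swallowing it
                                  Thread.currentThread().interrupt();
                              }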
                          }
              
                          con = cf.createConnection("user", "pass");
              
                          session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
              
                          con.start();
              
                          String text = message.getJMSMessageID() + " processed by: " + hashCode();
              
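                          if (queue == null)  {
                              makeLookups();
                          }
                          if (queue == null) {
                              // makeLookups() only logs failures, so fail fast here; the catch block below rolls back
                              throw new IllegalStateException("Remote out queue could not be looked up");
                          }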
              
                          MessageProducer sender = session.createProducer(queue);
                          TextMessage newMessage = session.createTextMessage(text);
                          newMessage.setStringProperty("inMessageId", message.getJMSMessageID());
                          newMessage.setStringProperty("_HQ_DUPL_ID", message.getStringProperty("_HQ_DUPL_ID"));
                          sender.send(newMessage);
              
                          messageInfo = messageInfo + ". Sending new message with inMessageId: " + newMessage.getStringProperty("inMessageId")
                                  + " and messageId: " + newMessage.getJMSMessageID();
              
                          log.debug("End of " + messageInfo + " in " + (System.currentTimeMillis() - time) + " ms");
              
                          if (numberOfProcessedMessages.incrementAndGet() % 100 == 0)
                              log.info(messageInfo + " in " + (System.currentTimeMillis() - time) + " ms");
              
                      } catch (Exception t) {
                          log.error(t.getMessage(), t);
                          this.context.setRollbackOnly();
                      } finally {
                          if (con != null) {
                              try {
                                  con.close();
                              } catch (JMSException e) {
                                  log.log(Level.FATAL, e.getMessage(), e);
                              }
                          }
                      }
                  }
              
                  /**
                  * Lookup out queue
                  *
                  * @throws EJBException
                  */
                  public void makeLookups() throws EJBException {
              
                      Properties prop = new Properties();
              
                      Context ctxRemote = null;
              
                      try {
                          // load mdb.properties - by this classloader
                          Thread currentThread = Thread.currentThread();
                          ClassLoader cl = currentThread.getContextClassLoader();
                          InputStream in = cl.getResourceAsStream(MDB_PROPERTY_FILE);
              
                          if (in == null) {
                              System.out.println("No resource found. InputStream is null.!!!!");
                              log.info("No resource found. InputStream is null.!!!!");
                          } else {
                              prop.load(in);
              
                              String hostname = prop.getProperty(REMOTE_SERVER_HOSTNAME);
                              String port = prop.getProperty(REMOTE_SERVER_PORT);
                              String serverType = prop.getProperty(REMOTE_SERVER_TYPE); // EAP 5, 6, ...
                              String outQueueJndiName = prop.getProperty(OUTQUEUE_JNDI_NAME);
              
                              log.info("Property name:" + REMOTE_SERVER_HOSTNAME + " has value: " + hostname);
                              log.info("Property name:" + REMOTE_SERVER_PORT + " has value: " + port);
                              log.info("Property name:" + REMOTE_SERVER_TYPE + " has value: " + serverType);
                              log.info("Property name:" + OUTQUEUE_JNDI_NAME + " has value: " + outQueueJndiName);
              
                              System.out.println("Property name:" + REMOTE_SERVER_HOSTNAME + " has value: " + hostname);
                              System.out.println("Property name:" + REMOTE_SERVER_PORT + " has value: " + port);
                              System.out.println("Property name:" + REMOTE_SERVER_TYPE + " has value: " + serverType);
                              System.out.println("Property name:" + OUTQUEUE_JNDI_NAME + " has value: " + outQueueJndiName);
              
                              final Properties env = new Properties();
              
              //                if (HornetQTestCaseConstants.EAP5_CONTAINER.equals(serverType)) { // it's eap 5
                              env.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
                              env.setProperty("java.naming.provider.url", "jnp://" + hostname + ":" + port);
                              env.setProperty("java.naming.factory.url.pkgs", "org.jnp.interfaces.NamingContextFactory");
              //                } else { // it's EAP 6
              //                    env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.remote.client.InitialContextFactory");
              //                    env.put(Context.PROVIDER_URL, "remote://" + hostname + ":" + port);
              //                }
                              ctxRemote = new InitialContext(env);
                              queue = (Queue) ctxRemote.lookup(outQueueJndiName);
                          }
              
                      } catch (Exception e) {
                          e.printStackTrace();
                      } finally {
                          if (ctxRemote != null) {
                              try {
                                  ctxRemote.close();
                              } catch (NamingException e) {
                                  //ignore
                              }
                          }
                      }
                  }
              
                  @Override
                  public void setMessageDrivenContext(MessageDrivenContext ctx) throws EJBException {
                  }
              
                  @Override
                  public void ejbRemove() throws EJBException {
                  }
              }