7 Replies Latest reply on Feb 8, 2012 10:56 AM by philcero

    Can a EJB3 listen and consume messages from a remote queue (JB 6.1) ?

    philcero

      Hi all,

       

      I have searched the whole web (including this site) for an answer to this question.

       

      I have found examples for topics, but they do not seem to work with standard queues.

       

      I have two JBoss 6.1 servers (dev00 [1099] and dev01 [1199]), and I am trying to have an EJB3 on DEV01 listen for messages on a DEV00 queue [/queue/TESTSOUT].

       

      My configuration on DEV00:

       

      server/dev00/deploy/hornetq/hornetq-jms.xml :

      ...

         <queue name="TESTSOUT">
            <entry name="/queue/TESTSOUT"/>
         </queue>

      ...

       

      The queue is present in the admin console, and I can publish simple string messages to it with a local EJB on DEV00 (the console reports 7 messages in it today).

       

      My configuration on DEV01:

       

      server/dev01/deploy/hornetq/jms-ds.xml :

      <connection-factories>
         <mbean code="org.jboss.jms.jndi.JMSProviderLoader" name="jboss.mq:service=JMSProviderLoader,name=RemoteJMSProvider,server=rt-vm205.gfisoblagnac">
          <attribute name="ProviderName">RemoteJMSProvider</attribute>
          <attribute name="ProviderAdapterClass">org.jboss.jms.jndi.JNDIProviderAdapter</attribute>
          <attribute name="FactoryRef">java:/XAConnectionFactory</attribute>
          <attribute name="QueueFactoryRef">java:/XAConnectionFactory</attribute>
          <attribute name="TopicFactoryRef">java:/XAConnectionFactory</attribute>
          <attribute name="Properties">
             java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
             java.naming.factory.url.pkgs=org.jnp.interfaces
             java.naming.provider.url=rt-vm205.gfisoblagnac:1099
          </attribute>
        </mbean>

      ...

       

      My EJB (ToLower.java):

      package monpkg.tests;

      import javax.ejb.ActivationConfigProperty;
      import javax.ejb.MessageDriven;
      import javax.jms.Message;
      import javax.jms.MessageListener;

      @MessageDriven(activationConfig =
      {
              @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
              @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
              @ActivationConfigProperty(propertyName = "destination", propertyValue = "/queue/TESTSOUT"),
              @ActivationConfigProperty(propertyName = "providerAdapterJNDI", propertyValue = "java:/RemoteJMSProvider") })
      public class ToLower implements MessageListener
      {
          public ToLower()
          {
          }

          public void onMessage(Message message)
          {
              System.out.println("ToLower get message : " + message.toString());
          }
      }

       

      There are no errors in server/dev01/log/boot.log.

       

      When I redeploy my EJB, I get this:

      ...

      2012-02-08 15:41:36,926 INFO  [org.jboss.ejb3.EJBContainer] (HDScanner) STOPPED EJB: monpkg.tests.ToLower ejbName: ToLower

      2012-02-08 15:41:36,961 WARN  [org.jboss.ejb3.interceptor.InterceptorInfoRepository] (HDScanner) EJBTHREE-1852: InterceptorInfoRepository is deprecated

      2012-02-08 15:41:36,970 INFO  [org.jboss.ejb3.deployers.JBossASKernel] (HDScanner) Created KernelDeployment for: EJBTestsToLower.jar

      2012-02-08 15:41:36,970 INFO  [org.jboss.ejb3.deployers.JBossASKernel] (HDScanner) installing bean: jboss.j2ee:jar=EJBTestsToLower.jar,name=ToLower,service=EJB3

      2012-02-08 15:41:36,970 INFO  [org.jboss.ejb3.deployers.JBossASKernel] (HDScanner)   with dependencies:

      2012-02-08 15:41:36,970 INFO  [org.jboss.ejb3.deployers.JBossASKernel] (HDScanner)   and demands:

      2012-02-08 15:41:36,970 INFO  [org.jboss.ejb3.deployers.JBossASKernel] (HDScanner)      jboss.ejb:service=EJBTimerService; Required: Described

      2012-02-08 15:41:36,970 INFO  [org.jboss.ejb3.deployers.JBossASKernel] (HDScanner)      jboss-switchboard:appName=EJBTestsToLower,module=EJBTestsToLower,name=ToLower; Required: Create

      2012-02-08 15:41:36,971 INFO  [org.jboss.ejb3.deployers.JBossASKernel] (HDScanner)   and supplies:

      2012-02-08 15:41:36,971 INFO  [org.jboss.ejb3.deployers.JBossASKernel] (HDScanner)      jndi:null

      2012-02-08 15:41:36,971 INFO  [org.jboss.ejb3.deployers.JBossASKernel] (HDScanner)      Class:javax.jms.MessageListener

      2012-02-08 15:41:36,971 INFO  [org.jboss.ejb3.deployers.JBossASKernel] (HDScanner) Added bean(jboss.j2ee:jar=EJBTestsToLower.jar,name=ToLower,service=EJB3) to KernelDeployment of: EJBTestsToLower.jar

      2012-02-08 15:41:36,985 INFO  [org.jboss.ejb3.EJBContainer] (HDScanner) STARTED EJB: monpkg.tests.ToLower ejbName: ToLower

      2012-02-08 15:41:36,988 WARN  [org.jboss.ejb3.TimerServiceContainer] (HDScanner) EJBTHREE-2193: using deprecated TimerServiceFactory for restoring timers

      2012-02-08 15:41:36,992 INFO  [org.hornetq.ra.inflow.HornetQActivation] (pool-1-thread-2) awaiting topic/queue creation /queue/TESTSOUT

      2012-02-08 15:41:38,994 INFO  [org.hornetq.ra.inflow.HornetQActivation] (pool-1-thread-2) Attempting to reconnect org.hornetq.ra.inflow.HornetQActivationSpec(ra=org.hornetq.ra.HornetQResourceAdapter@2db118c5 destination=/queue/TESTSOUT destinationType=javax.jms.Queue ack=Auto-acknowledge durable=false clientID=null user=null maxSession=15)

      2012-02-08 15:41:39,000 INFO  [org.hornetq.ra.inflow.HornetQActivation] (pool-1-thread-2) awaiting topic/queue creation /queue/TESTSOUT

      2012-02-08 15:41:41,003 INFO  [org.hornetq.ra.inflow.HornetQActivation] (pool-1-thread-2) Attempting to reconnect org.hornetq.ra.inflow.HornetQActivationSpec(ra=org.hornetq.ra.HornetQResourceAdapter@2db118c5 destination=/queue/TESTSOUT destinationType=javax.jms.Queue ack=Auto-acknowledge durable=false clientID=null user=null maxSession=15)

      ...

       

      There is no error, but no message is consumed, and the EJB seems to be waiting for the "creation" of the queue...

       

      Can anyone help?

        • 1. Re: Can a EJB3 listen and consume messages from a remote queue (JB 6.1) ?
          ataylor

          The queue is always looked up locally, so all you need to do is define it locally.

           

          Alternatively, you can configure remote JNDI on the MDB, but I'm not sure how that is done; it's not really a HornetQ question.

          • 2. Re: Can a EJB3 listen and consume messages from a remote queue (JB 6.1) ?
            philcero

            Hi Andy,

             

            Are you suggesting that [Server DEV00 / EJB Publisher] has to write messages to [Server DEV01 / QUEUE Incoming], which can only be listened to by [Server DEV01 / EJB Consumer]?

            • 3. Re: Can a EJB3 listen and consume messages from a remote queue (JB 6.1) ?
              ataylor

              I'm not really sure what you are asking here. However, the messages will be sent to whichever host it is configured to connect to. All I am saying is that where it looks up the queue makes no difference: you can either define it locally or set up remote JNDI (which I don't know how to do).

              • 4. Re: Can a EJB3 listen and consume messages from a remote queue (JB 6.1) ?
                jbertram

                It looks to me like you're trying to consume a JMS message from a HornetQ provider with the generic JMS JCA RA shipped with JBoss AS 6.  If you want to consume messages from HornetQ with an MDB then the MDB needs to use the HornetQ JCA RA. 

                 

                Even if that wasn't the case, your JMSProviderLoader isn't configured correctly since you're using "java:/" on all the *FactoryRef attributes which limits the JNDI lookup to the local VM.

                • 5. Re: Can a EJB3 listen and consume messages from a remote queue (JB 6.1) ?
                  jbertram

                  To be clear, this is not suitable for the HornetQ JCA RA:

                   

                  @ActivationConfigProperty(propertyName = "providerAdapterJNDI", propertyValue = "java:/RemoteJMSProvider")

                   

                  To configure an MDB which is using the HornetQ JCA RA to consume from a remote destination you should use something like this:

                   

                     @MessageDriven(activationConfig = {
                          @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                          @ActivationConfigProperty(propertyName = "destination", propertyValue = "testQueue"),
                          @ActivationConfigProperty(propertyName = "connectorClassName", propertyValue = "org.hornetq.core.remoting.impl.netty.NettyConnectorFactory"),
                          @ActivationConfigProperty(propertyName = "connectionParameters", propertyValue = "host=192.168.1.137;port=5445")})
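For anyone unfamiliar with the format, the connectionParameters value above is a semicolon-separated list of key=value pairs. The sketch below only illustrates how such a string breaks down; it is not HornetQ's own parser, and the class and method names (ConnectionParamsSketch, parse) are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: splits a "key=value;key=value" string into a map.
// This is NOT the HornetQ resource adapter's parser; names are hypothetical.
public class ConnectionParamsSketch {

    static Map<String, String> parse(String value) {
        Map<String, String> params = new LinkedHashMap<>();
        for (String pair : value.split(";")) {
            String trimmed = pair.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate empty input or a trailing semicolon
            }
            int eq = trimmed.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + trimmed);
            }
            params.put(trimmed.substring(0, eq).trim(), trimmed.substring(eq + 1).trim());
        }
        return params;
    }

    public static void main(String[] args) {
        // Same value as in the activation config above.
        Map<String, String> params = parse("host=192.168.1.137;port=5445");
        System.out.println(params.get("host") + ":" + params.get("port"));
    }
}
```

Running main with the value from the annotation prints 192.168.1.137:5445, i.e. the remote host and port the MDB should connect to.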

                  • 6. Re: Can a EJB3 listen and consume messages from a remote queue (JB 6.1) ?
                    ataylor

                    There's actually a remote JCA example with the distribution; have you looked at that?

                    • 7. Re: Can a EJB3 listen and consume messages from a remote queue (JB 6.1) ?
                      philcero

                      Thanks guys,

                       

                      My server.log:

                      ...

                      2012-02-08 16:33:08,362 INFO  [STDOUT] (Thread-3 (group:HornetQ-client-global-threads-793741660)) ToLower get message : HornetQMessage[ID:68ebdc00-5229-11e1-81aa-000c290c30ca]:PERSISTENT

                      2012-02-08 16:33:08,389 INFO  [STDOUT] (Thread-3 (group:HornetQ-client-global-threads-793741660)) ToLower get message : HornetQMessage[ID:0a7226a4-522b-11e1-81aa-000c290c30ca]:PERSISTENT

                      2012-02-08 16:33:08,391 INFO  [STDOUT] (Thread-3 (group:HornetQ-client-global-threads-793741660)) ToLower get message : HornetQMessage[ID:bdf8bfd8-522c-11e1-81aa-000c290c30ca]:PERSISTENT

                      2012-02-08 16:33:08,393 INFO  [STDOUT] (Thread-3 (group:HornetQ-client-global-threads-793741660)) ToLower get message : HornetQMessage[ID:4373d26c-522f-11e1-81aa-000c290c30ca]:PERSISTENT

                      2012-02-08 16:33:08,396 INFO  [STDOUT] (Thread-3 (group:HornetQ-client-global-threads-793741660)) ToLower get message : HornetQMessage[ID:47dc2e40-5232-11e1-81aa-000c290c30ca]:PERSISTENT

                      2012-02-08 16:33:08,399 INFO  [STDOUT] (Thread-3 (group:HornetQ-client-global-threads-793741660)) ToLower get message : HornetQMessage[ID:f2657604-5237-11e1-81aa-000c290c30ca]:PERSISTENT

                      2012-02-08 16:33:08,401 INFO  [STDOUT] (Thread-3 (group:HornetQ-client-global-threads-793741660)) ToLower get message : HornetQMessage[ID:5a7b5155-525e-11e1-bcf2-000c290c30ca]:PERSISTENT

                      ...

                       

                       

                       

                      In case anyone needs it:

                      1. Nothing is needed in the file server/dev01/deploy/hornetq/jms-ds.xml.
                      2. Here is the "new" DEV01 / EJB consumer code:

                      package monpkg.tests;

                      import javax.ejb.ActivationConfigProperty;
                      import javax.ejb.MessageDriven;
                      import javax.jms.Message;
                      import javax.jms.MessageListener;

                      // MDB deployed on DEV01; it uses the HornetQ JCA RA to consume from the remote queue.
                      @MessageDriven(activationConfig =
                      {
                              @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                              @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/TESTSOUT"),
                              // Connect to the remote HornetQ server over Netty.
                              @ActivationConfigProperty(propertyName = "connectorClassName", propertyValue = "org.hornetq.core.remoting.impl.netty.NettyConnectorFactory"),
                              // Host and port of the remote HornetQ server.
                              @ActivationConfigProperty(propertyName = "connectionParameters", propertyValue = "host=192.168.2.205;port=5445") })
                      public class ToLower implements MessageListener
                      {
                          public ToLower()
                          {
                          }

                          public void onMessage(Message message)
                          {
                              // Print each message received from the remote queue.
                              System.out.println("ToLower get message : " + message.toString());
                          }
                      }
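If the MDB still does not receive anything with a setup like this, one quick thing to rule out is plain network reachability of the host and port given in connectionParameters. The following is a minimal sketch using only the JDK; the class name PortCheckSketch and the 2-second timeout are assumptions for illustration. A successful TCP connect only shows that something is listening on that port, not that it is HornetQ or that the queue exists.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper: checks whether a TCP connection to host:port can be opened.
public class PortCheckSketch {

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeout, and unknown host.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the values from the connectionParameters in the MDB above.
        String host = args.length > 0 ? args[0] : "192.168.2.205";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 5445;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
    }
}
```

Run it from the consumer machine (DEV01 here) so the check goes through the same network path as the MDB.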