1 Reply Latest reply on Apr 6, 2017 9:56 AM by bjoern_n

    Receiving JMS messages with all MDBs of a wildfly 10 cluster using a topic

    bjoern_n Newbie

      Hi,

      I built a cluster with two wildfly 10 instances (will be more later). Both have the same ear deployed with a JMS message producer and a MDB receiving it. My problem is, that every node only receives its own messages.

       

       

      What I did is:

       

      • Modified the standalone-full-ha.xml

       

        • Set a node name
      <server name="${jboss.node.name}" xmlns="urn:jboss:domain:4.1">
      

       

        • Added a topic
      <subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
          <server name="default">
              <cluster password="${jboss.messaging.cluster.password:CHANGE ME!!}"/>
              <security-setting name="#">
                  <role name="guest" delete-non-durable-queue="true" create-non-durable-queue="true" consume="true" send="true"/>
              </security-setting>
              <address-setting name="#" redistribution-delay="1000" message-counter-history-day-limit="10" page-size-bytes="2097152" max-size-bytes="10485760" expiry-address="jms.queue.ExpiryQueue" dead-letter-address="jms.queue.DLQ"/>
              <http-connector name="http-connector" endpoint="http-acceptor" socket-binding="http"/>
              <http-connector name="http-connector-throughput" endpoint="http-acceptor-throughput" socket-binding="http">
                  <param name="batch-delay" value="50"/>
              </http-connector>
              <in-vm-connector name="in-vm" server-id="0"/>
              <http-acceptor name="http-acceptor" http-listener="default"/>
              <http-acceptor name="http-acceptor-throughput" http-listener="default">
                  <param name="batch-delay" value="50"/>
                  <param name="direct-deliver" value="false"/>
              </http-acceptor>
              <in-vm-acceptor name="in-vm" server-id="0"/>
              <broadcast-group name="bg-group1" connectors="http-connector" jgroups-channel="activemq-cluster"/>
              <discovery-group name="dg-group1" jgroups-channel="activemq-cluster"/>
              <cluster-connection name="my-cluster" discovery-group="dg-group1" connector-name="http-connector" address="jms"/>
              <jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>
              <jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>
              <jms-topic name="MYTopic" entries="java:/jms/topic/MYTopic java:jboss/exported/jms/topic/MYTopic"/>
              <connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm"/>
              <connection-factory name="RemoteConnectionFactory" reconnect-attempts="-1" block-on-acknowledge="true" ha="true" entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector"/>
              <pooled-connection-factory name="activemq-ra" transaction="xa" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm"/>
          </server>
      </subsystem>
      

       

        • Changed the public interface from localhost to my server ip.
      <interfaces>
          <interface name="management">
              <inet-address value="${jboss.bind.address.management:127.0.0.1}"/>
          </interface>
          <interface name="public">
              <inet-address value="${jboss.bind.address}"/>
          </interface>
          <interface name="private">
              <inet-address value="${jboss.bind.address.private:127.0.0.1}"/>
          </interface>
          <interface name="unsecure">
              <inet-address value="${jboss.bind.address.unsecure:127.0.0.1}"/>
          </interface>
      </interfaces>
      

       

        • Changed the jgroups configuration to use the public interface
      <socket-binding-group name="standard-sockets" default-interface="public" port-offset="${jboss.socket.binding.port-offset:0}">
          <socket-binding name="management-http" interface="management" port="${jboss.management.http.port:9990}"/>
          <socket-binding name="management-https" interface="management" port="${jboss.management.https.port:9993}"/>
          <socket-binding name="ajp" port="${jboss.ajp.port:8009}"/>
          <socket-binding name="http" port="${jboss.http.port:8080}"/>
          <socket-binding name="https" port="${jboss.https.port:8443}"/>
          <socket-binding name="iiop" interface="unsecure" port="3528"/>
          <socket-binding name="iiop-ssl" interface="unsecure" port="3529"/>
          <socket-binding name="jgroups-mping" port="0" multicast-address="${jboss.default.multicast.address:230.0.0.4}" multicast-port="45700"/>
          <socket-binding name="jgroups-tcp" port="7600"/>
          <socket-binding name="jgroups-tcp-fd" port="57600"/>
          <socket-binding name="jgroups-udp" port="55200" multicast-address="${jboss.default.multicast.address:230.0.0.4}" multicast-port="45688"/>
          <socket-binding name="jgroups-udp-fd" port="54200"/>
          <socket-binding name="modcluster" port="0" multicast-address="224.0.1.105" multicast-port="23364"/>
          <socket-binding name="txn-recovery-environment" port="4712"/>
          <socket-binding name="txn-status-manager" port="4713"/>
          <outbound-socket-binding name="mail-smtp">
              <remote-destination host="localhost" port="25"/>
          </outbound-socket-binding>
      </socket-binding-group>
      

       

      • Start the servers with

      standalone.bat -Djboss.server.default.config=standalone-full-ha.xml -Djboss.socket.binding.port-offset=10000 -Djgroups.marshalling.compatible=true -Djgroups.bind_addr=<myserver1> -Djboss.bind.address=<myserver1> -Dcluster.name=mycluster  -Djboss.messaging.group.address=230.4.4.3 -Djboss.node.name=node1

       

      and

       

      standalone.bat -Djboss.server.default.config=standalone-full-ha.xml -Djboss.socket.binding.port-offset=10000 -Djgroups.marshalling.compatible=true -Djgroups.bind_addr=<myserver2> -Djboss.bind.address=<myserver2> -Dcluster.name=mycluster -Djboss.messaging.group.address=230.4.4.3 -Djboss.node.name=node2

       

       

      Now a created a small test ear and deployed it in both servers. It contains the following classes:

       

      • ClusterMessage.java, just a small test message class, because i will need a object message later
      package clustertest;
      
      import java.io.Serializable;
      
      /**
       * Message for cluster caches.
       */
      public class ClusterMessage implements Serializable
      {
          private static final long serialVersionUID = 297810413481487172L;
      
          /**
           * Create a message.
           *
           * @param identifier The identifier of the removed dataset.
           */
          public ClusterMessage(String identifier)
          {
              this.identifier = identifier;
          }
      
          /**
           * The identifier of the removed dataset.
           */
          private String identifier;
      
          /**
           * @return The identifier of the removed dataset.
           */
          public String getIdentifier()
          {
              return this.identifier;
          }
      
          @Override
          public String toString()
          {
              return "identifier [" + this.identifier + "]";
          }
      }
      

       

      • ClusterMessengerBean.java, a bean class looking up topic and factory and sending a message every minute.
      package clustertest;
      
      import javax.annotation.Resource;
      import javax.annotation.security.PermitAll;
      import javax.ejb.Schedule;
      import javax.ejb.Stateless;
      import javax.ejb.TransactionAttribute;
      import javax.ejb.TransactionAttributeType;
      import javax.ejb.TransactionManagement;
      import javax.ejb.TransactionManagementType;
      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.JMSException;
      import javax.jms.JMSRuntimeException;
      import javax.jms.MessageProducer;
      import javax.jms.ObjectMessage;
      import javax.jms.Session;
      import javax.jms.Topic;
      
      @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
      @Stateless(name = "ClusterMessenger")
      @TransactionManagement(TransactionManagementType.BEAN)
      @PermitAll
      public class ClusterMessengerBean
      {
          @Resource(mappedName = "java:/jms/topic/MYTopic")
          private Topic topic;
      
          @Resource(mappedName = "java:/JmsXA")
          private ConnectionFactory connectionFactory;
      
          private Connection connection = null;
          private Session session = null;
          private MessageProducer messageProducer = null;
      
          @Schedule(minute = "*/1", hour = "*")
          public void send()
          {
              System.out.println("Timer executed.");
      
              try
              {
                  send(new ClusterMessage("myIdentifier"));
      
                  System.out.println("Message sent.");
              }
              catch (Exception e)
              {
                  e.printStackTrace();
              }
          }
      
          public void send(ClusterMessage clusterMessage)
          {
              try
              {
                  if (this.connection == null)
                  {
                      this.connection = this.connectionFactory.createConnection();
                  }
      
                  if (this.session == null)
                  {
                      this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                  }
      
                  if (this.messageProducer == null)
                  {
                      this.messageProducer = this.session.createProducer(this.topic);
                  }
      
                  ObjectMessage message = this.session.createObjectMessage(clusterMessage);
                  message.setStringProperty("node", System.getProperty("jboss.node.name"));
                  this.messageProducer.send(message);
              }
              catch (JMSException | JMSRuntimeException e)
              {
                  e.printStackTrace();
              }
          }
      }
      

       

      • MyMDBean.java, A message driven bean using the remote syntax of the topic to receive the messages.
      package clustertest;
      
      import javax.annotation.security.PermitAll;
      import javax.ejb.ActivationConfigProperty;
      import javax.ejb.MessageDriven;
      import javax.ejb.TransactionAttribute;
      import javax.ejb.TransactionAttributeType;
      import javax.ejb.TransactionManagement;
      import javax.ejb.TransactionManagementType;
      import javax.jms.JMSException;
      import javax.jms.Message;
      import javax.jms.MessageListener;
      
      @MessageDriven(
          activationConfig = {
              @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
              @ActivationConfigProperty(
                  propertyName = "destination",
                  propertyValue = "java:jboss/exported/jms/topic/MYTopic"),
              @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
              @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "1") })
      
      @TransactionAttribute(TransactionAttributeType.REQUIRED)
      @TransactionManagement(TransactionManagementType.BEAN)
      @PermitAll
      public class MyMDBean implements MessageListener
      {
          @Override
          public void onMessage(Message message)
          {
              try
              {
                  System.out.println(
                      "Received message from server [" + message.getStringProperty("node") + "]:" + message);
              }
              catch (JMSException e)
              {
                  e.printStackTrace();
              }
          }
      }
      

       

      If both servers are running and formed a cluster i get the following logs in node1:

       

      2017-04-06 08:27:00,007 INFO  [stdout] (EJB default - 3) Timer executed.

      2017-04-06 08:27:00,009 INFO  [stdout] (EJB default - 3) Message sent.

      2017-04-06 08:27:00,012 INFO  [stdout] (Thread-2 (ActiveMQ-client-global-threads-125360358)) Received message from server [node1]:ActiveMQMessage[ID:0a65a3b1-1a92-11e7-87ab-975aaba56fad]:PERSISTENT/ClientMessageImpl[messageID=6442450958, durable=true, address=jms.topic.MYTopic,userID=0a65a3b1-1a92-11e7-87ab-975aaba56fad,properties=TypedProperties[__AMQ_CID=cda21118-1a91-11e7-87ab-975aaba56fad,node=node1]]

       

      and on node 2

       

      2017-04-06 08:27:00,007 INFO  [stdout] (EJB default - 3) Timer executed.

      2017-04-06 08:27:00,009 INFO  [stdout] (EJB default - 3) Message sent.

      2017-04-06 08:27:00,012 INFO  [stdout] (Thread-2 (ActiveMQ-client-global-threads-125360358)) Received message from server [node2]:ActiveMQMessage[ID:0a65a3b1-1a92-11e7-87ab-975aaba56fad]:PERSISTENT/ClientMessageImpl[messageID=6442450958, durable=true, address=jms.topic.MYTopic,userID=0a65a3b1-1a92-11e7-87ab-975aaba56fad,properties=TypedProperties[__AMQ_CID=cda21118-1a91-11e7-87ab-975aaba56fad,node=node2]]

       

      What needs to be configured to receive the messages with all MDBs of all servers? I would be thankful for every hint.

       

      One additional issue:

       

      I get constantly messages like the following in the log:

       

      2017-04-06 07:39:07,566 WARNING [io.netty.channel.DefaultChannelPipeline] (default I/O-2) An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.: java.io.IOException: Eine vorhandene Verbindung wurde vom Remotehost geschlossen

          at sun.nio.ch.SocketDispatcher.read0(Native Method)

          at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)

          at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)

          at sun.nio.ch.IOUtil.read(IOUtil.java:192)

          at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)

          at org.xnio.nio.NioSocketConduit.read(NioSocketConduit.java:289)

          at org.xnio.conduits.ConduitStreamSourceChannel.read(ConduitStreamSourceChannel.java:127)

          at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:357)

          at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:898)

          at org.xnio.netty.transport.AbstractXnioSocketChannel$ReadListener.handleEvent(AbstractXnioSocketChannel.java:428)

          at org.xnio.netty.transport.AbstractXnioSocketChannel$ReadListener.handleEvent(AbstractXnioSocketChannel.java:371)

          at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)

          at org.xnio.conduits.ReadReadyHandler$ChannelListenerHandler.readReady(ReadReadyHandler.java:66)

          at org.xnio.nio.NioSocketConduit.handleReady(NioSocketConduit.java:89)

          at org.xnio.nio.WorkerThread.run(WorkerThread.java:567)

       

      This stops happening if I switch the http-connector to https in standalone-full-ha.xml:

       

      <http-connector name="http-connector" endpoint="http-acceptor" socket-binding="https"/>
      <http-connector name="http-connector-throughput" endpoint="http-acceptor-throughput" socket-binding="https">
          <param name="batch-delay" value="50"/>
      </http-connector>
      

       

      May that have something to to with it?

       

      What I already tried is advising the activemq-ra factory to use the http connector, which lead to the message, that the factory could not be connected. I also tried a lot of annotations, factories and different JNDI-syntax after reading posts here and elsewhere, but nothing worked, which is why I don't complicate my example by adding something of this.

       

      Thanks in advance,

      Björn

        • 1. Re: Receiving JMS messages with all MDBs of a wildfly 10 cluster using a topic
          bjoern_n Newbie

          After searching for a week I created this post and, like always I found the solution after that.

           

          Setting message-load-balancing-type to STRICT does the trick.

           

               <subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">

                      <server name="default">

                          <cluster password="${jboss.messaging.cluster.password:cursor}" user="${jboss.messaging.cluster.user:TECH_USER}"/>

                          <security-setting name="#">

                              <role name="guest" delete-non-durable-queue="true" create-non-durable-queue="true" consume="true" send="true"/>

                          </security-setting>

                          <address-setting name="#" redistribution-delay="0" message-counter-history-day-limit="10" page-size-bytes="2097152" max-size-bytes="10485760" expiry-address="jms.queue.ExpiryQueue" dead-letter-address="jms.queue.DLQ"/>

                          <http-connector name="http-connector" endpoint="http-acceptor" socket-binding="https">

                              <param name="ssl-enabled" value="true"/>

                          </http-connector>

                          <http-connector name="http-connector-throughput" endpoint="http-acceptor-throughput" socket-binding="https">

                              <param name="ssl-enabled" value="true"/>

                              <param name="batch-delay" value="50"/>

                          </http-connector>

                          <in-vm-connector name="in-vm" server-id="0"/>

                          <http-acceptor name="http-acceptor" http-listener="default-https"/>

                          <http-acceptor name="http-acceptor-throughput" http-listener="default-https">

                              <param name="batch-delay" value="50"/>

                              <param name="direct-deliver" value="false"/>

                          </http-acceptor>

                          <in-vm-acceptor name="in-vm" server-id="0"/>

                          <broadcast-group name="bg-group1" connectors="http-connector" jgroups-channel="activemq-cluster"/>

                          <discovery-group name="dg-group1" jgroups-channel="activemq-cluster" jgroups-stack="udp"/>

                          <cluster-connection name="my-cluster" discovery-group="dg-group1" message-load-balancing-type="STRICT" connector-name="http-connector" address="jms"/>

                          <jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>

                          <jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>

                          <jms-topic name="MYTopic" entries="java:/jms/topic/MYTopic java:jboss/exported/jms/topic/MYTopic"/>

                          <connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm"/>

                          <connection-factory name="RemoteConnectionFactory" reconnect-attempts="-1" block-on-acknowledge="true" ha="true" entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector"/>

                          <pooled-connection-factory name="activemq-ra" transaction="xa" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm"/>

                      </server>

                  </subsystem>