Receiving JMS messages with all MDBs of a wildfly 10 cluster using a topic
bjoern_n Apr 6, 2017 2:53 AMHi,
I built a cluster with two wildfly 10 instances (will be more later). Both have the same ear deployed with a JMS message producer and a MDB receiving it. My problem is, that every node only receives its own messages.
What I did is:
- Modified the standalone-full-ha.xml
- Set a node name
<server name="${jboss.node.name}" xmlns="urn:jboss:domain:4.1">
- Added a topic
<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0"> <server name="default"> <cluster password="${jboss.messaging.cluster.password:CHANGE ME!!}"/> <security-setting name="#"> <role name="guest" delete-non-durable-queue="true" create-non-durable-queue="true" consume="true" send="true"/> </security-setting> <address-setting name="#" redistribution-delay="1000" message-counter-history-day-limit="10" page-size-bytes="2097152" max-size-bytes="10485760" expiry-address="jms.queue.ExpiryQueue" dead-letter-address="jms.queue.DLQ"/> <http-connector name="http-connector" endpoint="http-acceptor" socket-binding="http"/> <http-connector name="http-connector-throughput" endpoint="http-acceptor-throughput" socket-binding="http"> <param name="batch-delay" value="50"/> </http-connector> <in-vm-connector name="in-vm" server-id="0"/> <http-acceptor name="http-acceptor" http-listener="default"/> <http-acceptor name="http-acceptor-throughput" http-listener="default"> <param name="batch-delay" value="50"/> <param name="direct-deliver" value="false"/> </http-acceptor> <in-vm-acceptor name="in-vm" server-id="0"/> <broadcast-group name="bg-group1" connectors="http-connector" jgroups-channel="activemq-cluster"/> <discovery-group name="dg-group1" jgroups-channel="activemq-cluster"/> <cluster-connection name="my-cluster" discovery-group="dg-group1" connector-name="http-connector" address="jms"/> <jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/> <jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/> <jms-topic name="MYTopic" entries="java:/jms/topic/MYTopic java:jboss/exported/jms/topic/MYTopic"/> <connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm"/> <connection-factory name="RemoteConnectionFactory" reconnect-attempts="-1" block-on-acknowledge="true" ha="true" entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector"/> <pooled-connection-factory name="activemq-ra" transaction="xa" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm"/> </server> </subsystem>
- Changed the public interface from localhost to my server ip.
<interfaces> <interface name="management"> <inet-address value="${jboss.bind.address.management:127.0.0.1}"/> </interface> <interface name="public"> <inet-address value="${jboss.bind.address}"/> </interface> <interface name="private"> <inet-address value="${jboss.bind.address.private:127.0.0.1}"/> </interface> <interface name="unsecure"> <inet-address value="${jboss.bind.address.unsecure:127.0.0.1}"/> </interface> </interfaces>
- Changed the jgroups configuration to use the public interface
<socket-binding-group name="standard-sockets" default-interface="public" port-offset="${jboss.socket.binding.port-offset:0}"> <socket-binding name="management-http" interface="management" port="${jboss.management.http.port:9990}"/> <socket-binding name="management-https" interface="management" port="${jboss.management.https.port:9993}"/> <socket-binding name="ajp" port="${jboss.ajp.port:8009}"/> <socket-binding name="http" port="${jboss.http.port:8080}"/> <socket-binding name="https" port="${jboss.https.port:8443}"/> <socket-binding name="iiop" interface="unsecure" port="3528"/> <socket-binding name="iiop-ssl" interface="unsecure" port="3529"/> <socket-binding name="jgroups-mping" port="0" multicast-address="${jboss.default.multicast.address:230.0.0.4}" multicast-port="45700"/> <socket-binding name="jgroups-tcp" port="7600"/> <socket-binding name="jgroups-tcp-fd" port="57600"/> <socket-binding name="jgroups-udp" port="55200" multicast-address="${jboss.default.multicast.address:230.0.0.4}" multicast-port="45688"/> <socket-binding name="jgroups-udp-fd" port="54200"/> <socket-binding name="modcluster" port="0" multicast-address="224.0.1.105" multicast-port="23364"/> <socket-binding name="txn-recovery-environment" port="4712"/> <socket-binding name="txn-status-manager" port="4713"/> <outbound-socket-binding name="mail-smtp"> <remote-destination host="localhost" port="25"/> </outbound-socket-binding> </socket-binding-group>
- Start the servers with
standalone.bat -Djboss.server.default.config=standalone-full-ha.xml -Djboss.socket.binding.port-offset=10000 -Djgroups.marshalling.compatible=true -Djgroups.bind_addr=<myserver1> -Djboss.bind.address=<myserver1> -Dcluster.name=mycluster -Djboss.messaging.group.address=230.4.4.3 -Djboss.node.name=node1
and
standalone.bat -Djboss.server.default.config=standalone-full-ha.xml -Djboss.socket.binding.port-offset=10000 -Djgroups.marshalling.compatible=true -Djgroups.bind_addr=<myserver2> -Djboss.bind.address=<myserver2> -Dcluster.name=mycluster -Djboss.messaging.group.address=230.4.4.3 -Djboss.node.name=node2
Now a created a small test ear and deployed it in both servers. It contains the following classes:
- ClusterMessage.java, just a small test message class, because i will need a object message later
package clustertest; import java.io.Serializable; /** * Message for cluster caches. */ public class ClusterMessage implements Serializable { private static final long serialVersionUID = 297810413481487172L; /** * Create a message. * * @param identifier The identifier of the removed dataset. */ public ClusterMessage(String identifier) { this.identifier = identifier; } /** * The identifier of the removed dataset. */ private String identifier; /** * @return The identifier of the removed dataset. */ public String getIdentifier() { return this.identifier; } @Override public String toString() { return "identifier [" + this.identifier + "]"; } }
- ClusterMessengerBean.java, a bean class looking up topic and factory and sending a message every minute.
package clustertest; import javax.annotation.Resource; import javax.annotation.security.PermitAll; import javax.ejb.Schedule; import javax.ejb.Stateless; import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; import javax.ejb.TransactionManagement; import javax.ejb.TransactionManagementType; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.JMSRuntimeException; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.Topic; @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) @Stateless(name = "ClusterMessenger") @TransactionManagement(TransactionManagementType.BEAN) @PermitAll public class ClusterMessengerBean { @Resource(mappedName = "java:/jms/topic/MYTopic") private Topic topic; @Resource(mappedName = "java:/JmsXA") private ConnectionFactory connectionFactory; private Connection connection = null; private Session session = null; private MessageProducer messageProducer = null; @Schedule(minute = "*/1", hour = "*") public void send() { System.out.println("Timer executed."); try { send(new ClusterMessage("myIdentifier")); System.out.println("Message sent."); } catch (Exception e) { e.printStackTrace(); } } public void send(ClusterMessage clusterMessage) { try { if (this.connection == null) { this.connection = this.connectionFactory.createConnection(); } if (this.session == null) { this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } if (this.messageProducer == null) { this.messageProducer = this.session.createProducer(this.topic); } ObjectMessage message = this.session.createObjectMessage(clusterMessage); message.setStringProperty("node", System.getProperty("jboss.node.name")); this.messageProducer.send(message); } catch (JMSException | JMSRuntimeException e) { e.printStackTrace(); } } }
- MyMDBean.java, A message driven bean using the remote syntax of the topic to receive the messages.
package clustertest;
import javax.annotation.security.PermitAll; import javax.ejb.ActivationConfigProperty; import javax.ejb.MessageDriven; import javax.ejb.TransactionAttribute; import javax.ejb.TransactionAttributeType; import javax.ejb.TransactionManagement; import javax.ejb.TransactionManagementType; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; @MessageDriven( activationConfig = { @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"), @ActivationConfigProperty( propertyName = "destination", propertyValue = "java:jboss/exported/jms/topic/MYTopic"), @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"), @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "1") }) @TransactionAttribute(TransactionAttributeType.REQUIRED) @TransactionManagement(TransactionManagementType.BEAN) @PermitAll public class MyMDBean implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println( "Received message from server [" + message.getStringProperty("node") + "]:" + message); } catch (JMSException e) { e.printStackTrace(); } } }
If both servers are running and formed a cluster i get the following logs in node1:
2017-04-06 08:27:00,007 INFO [stdout] (EJB default - 3) Timer executed.
2017-04-06 08:27:00,009 INFO [stdout] (EJB default - 3) Message sent.
2017-04-06 08:27:00,012 INFO [stdout] (Thread-2 (ActiveMQ-client-global-threads-125360358)) Received message from server [node1]:ActiveMQMessage[ID:0a65a3b1-1a92-11e7-87ab-975aaba56fad]:PERSISTENT/ClientMessageImpl[messageID=6442450958, durable=true, address=jms.topic.MYTopic,userID=0a65a3b1-1a92-11e7-87ab-975aaba56fad,properties=TypedProperties[__AMQ_CID=cda21118-1a91-11e7-87ab-975aaba56fad,node=node1]]
and on node 2
2017-04-06 08:27:00,007 INFO [stdout] (EJB default - 3) Timer executed.
2017-04-06 08:27:00,009 INFO [stdout] (EJB default - 3) Message sent.
2017-04-06 08:27:00,012 INFO [stdout] (Thread-2 (ActiveMQ-client-global-threads-125360358)) Received message from server [node2]:ActiveMQMessage[ID:0a65a3b1-1a92-11e7-87ab-975aaba56fad]:PERSISTENT/ClientMessageImpl[messageID=6442450958, durable=true, address=jms.topic.MYTopic,userID=0a65a3b1-1a92-11e7-87ab-975aaba56fad,properties=TypedProperties[__AMQ_CID=cda21118-1a91-11e7-87ab-975aaba56fad,node=node2]]
What needs to be configured to receive the messages with all MDBs of all servers? I would be thankful for every hint.
One additional issue:
I get constantly messages like the following in the log:
2017-04-06 07:39:07,566 WARNING [io.netty.channel.DefaultChannelPipeline] (default I/O-2) An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.: java.io.IOException: Eine vorhandene Verbindung wurde vom Remotehost geschlossen
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.xnio.nio.NioSocketConduit.read(NioSocketConduit.java:289)
at org.xnio.conduits.ConduitStreamSourceChannel.read(ConduitStreamSourceChannel.java:127)
at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:357)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:898)
at org.xnio.netty.transport.AbstractXnioSocketChannel$ReadListener.handleEvent(AbstractXnioSocketChannel.java:428)
at org.xnio.netty.transport.AbstractXnioSocketChannel$ReadListener.handleEvent(AbstractXnioSocketChannel.java:371)
at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
at org.xnio.conduits.ReadReadyHandler$ChannelListenerHandler.readReady(ReadReadyHandler.java:66)
at org.xnio.nio.NioSocketConduit.handleReady(NioSocketConduit.java:89)
at org.xnio.nio.WorkerThread.run(WorkerThread.java:567)
This stops happening if I switch the http-connector to https in standalone-full-ha.xml:
<http-connector name="http-connector" endpoint="http-acceptor" socket-binding="https"/> <http-connector name="http-connector-throughput" endpoint="http-acceptor-throughput" socket-binding="https"> <param name="batch-delay" value="50"/> </http-connector>
May that have something to to with it?
What I already tried is advising the activemq-ra factory to use the http connector, which lead to the message, that the factory could not be connected. I also tried a lot of annotations, factories and different JNDI-syntax after reading posts here and elsewhere, but nothing worked, which is why I don't complicate my example by adding something of this.
Thanks in advance,
Björn
-
CLUSTERTEST.ear 4.0 KB