Receiving JMS messages with all MDBs of a wildfly 10 cluster using a topic
bjoern_n Apr 6, 2017 2:53 AMHi,
I built a cluster with two wildfly 10 instances (will be more later). Both have the same ear deployed with a JMS message producer and a MDB receiving it. My problem is, that every node only receives its own messages.
What I did is:
- Modified the standalone-full-ha.xml
- Set a node name
<server name="${jboss.node.name}" xmlns="urn:jboss:domain:4.1">
- Added a topic
<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
<server name="default">
<cluster password="${jboss.messaging.cluster.password:CHANGE ME!!}"/>
<security-setting name="#">
<role name="guest" delete-non-durable-queue="true" create-non-durable-queue="true" consume="true" send="true"/>
</security-setting>
<address-setting name="#" redistribution-delay="1000" message-counter-history-day-limit="10" page-size-bytes="2097152" max-size-bytes="10485760" expiry-address="jms.queue.ExpiryQueue" dead-letter-address="jms.queue.DLQ"/>
<http-connector name="http-connector" endpoint="http-acceptor" socket-binding="http"/>
<http-connector name="http-connector-throughput" endpoint="http-acceptor-throughput" socket-binding="http">
<param name="batch-delay" value="50"/>
</http-connector>
<in-vm-connector name="in-vm" server-id="0"/>
<http-acceptor name="http-acceptor" http-listener="default"/>
<http-acceptor name="http-acceptor-throughput" http-listener="default">
<param name="batch-delay" value="50"/>
<param name="direct-deliver" value="false"/>
</http-acceptor>
<in-vm-acceptor name="in-vm" server-id="0"/>
<broadcast-group name="bg-group1" connectors="http-connector" jgroups-channel="activemq-cluster"/>
<discovery-group name="dg-group1" jgroups-channel="activemq-cluster"/>
<cluster-connection name="my-cluster" discovery-group="dg-group1" connector-name="http-connector" address="jms"/>
<jms-queue name="ExpiryQueue" entries="java:/jms/queue/ExpiryQueue"/>
<jms-queue name="DLQ" entries="java:/jms/queue/DLQ"/>
<jms-topic name="MYTopic" entries="java:/jms/topic/MYTopic java:jboss/exported/jms/topic/MYTopic"/>
<connection-factory name="InVmConnectionFactory" entries="java:/ConnectionFactory" connectors="in-vm"/>
<connection-factory name="RemoteConnectionFactory" reconnect-attempts="-1" block-on-acknowledge="true" ha="true" entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector"/>
<pooled-connection-factory name="activemq-ra" transaction="xa" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm"/>
</server>
</subsystem>
- Changed the public interface from localhost to my server ip.
<interfaces>
<interface name="management">
<inet-address value="${jboss.bind.address.management:127.0.0.1}"/>
</interface>
<interface name="public">
<inet-address value="${jboss.bind.address}"/>
</interface>
<interface name="private">
<inet-address value="${jboss.bind.address.private:127.0.0.1}"/>
</interface>
<interface name="unsecure">
<inet-address value="${jboss.bind.address.unsecure:127.0.0.1}"/>
</interface>
</interfaces>
- Changed the jgroups configuration to use the public interface
<socket-binding-group name="standard-sockets" default-interface="public" port-offset="${jboss.socket.binding.port-offset:0}">
<socket-binding name="management-http" interface="management" port="${jboss.management.http.port:9990}"/>
<socket-binding name="management-https" interface="management" port="${jboss.management.https.port:9993}"/>
<socket-binding name="ajp" port="${jboss.ajp.port:8009}"/>
<socket-binding name="http" port="${jboss.http.port:8080}"/>
<socket-binding name="https" port="${jboss.https.port:8443}"/>
<socket-binding name="iiop" interface="unsecure" port="3528"/>
<socket-binding name="iiop-ssl" interface="unsecure" port="3529"/>
<socket-binding name="jgroups-mping" port="0" multicast-address="${jboss.default.multicast.address:230.0.0.4}" multicast-port="45700"/>
<socket-binding name="jgroups-tcp" port="7600"/>
<socket-binding name="jgroups-tcp-fd" port="57600"/>
<socket-binding name="jgroups-udp" port="55200" multicast-address="${jboss.default.multicast.address:230.0.0.4}" multicast-port="45688"/>
<socket-binding name="jgroups-udp-fd" port="54200"/>
<socket-binding name="modcluster" port="0" multicast-address="224.0.1.105" multicast-port="23364"/>
<socket-binding name="txn-recovery-environment" port="4712"/>
<socket-binding name="txn-status-manager" port="4713"/>
<outbound-socket-binding name="mail-smtp">
<remote-destination host="localhost" port="25"/>
</outbound-socket-binding>
</socket-binding-group>
- Start the servers with
standalone.bat -Djboss.server.default.config=standalone-full-ha.xml -Djboss.socket.binding.port-offset=10000 -Djgroups.marshalling.compatible=true -Djgroups.bind_addr=<myserver1> -Djboss.bind.address=<myserver1> -Dcluster.name=mycluster -Djboss.messaging.group.address=230.4.4.3 -Djboss.node.name=node1
and
standalone.bat -Djboss.server.default.config=standalone-full-ha.xml -Djboss.socket.binding.port-offset=10000 -Djgroups.marshalling.compatible=true -Djgroups.bind_addr=<myserver2> -Djboss.bind.address=<myserver2> -Dcluster.name=mycluster -Djboss.messaging.group.address=230.4.4.3 -Djboss.node.name=node2
Now a created a small test ear and deployed it in both servers. It contains the following classes:
- ClusterMessage.java, just a small test message class, because i will need a object message later
package clustertest;
import java.io.Serializable;
/**
* Message for cluster caches.
*/
public class ClusterMessage implements Serializable
{
private static final long serialVersionUID = 297810413481487172L;
/**
* Create a message.
*
* @param identifier The identifier of the removed dataset.
*/
public ClusterMessage(String identifier)
{
this.identifier = identifier;
}
/**
* The identifier of the removed dataset.
*/
private String identifier;
/**
* @return The identifier of the removed dataset.
*/
public String getIdentifier()
{
return this.identifier;
}
@Override
public String toString()
{
return "identifier [" + this.identifier + "]";
}
}
- ClusterMessengerBean.java, a bean class looking up topic and factory and sending a message every minute.
package clustertest;
import javax.annotation.Resource;
import javax.annotation.security.PermitAll;
import javax.ejb.Schedule;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
@Stateless(name = "ClusterMessenger")
@TransactionManagement(TransactionManagementType.BEAN)
@PermitAll
public class ClusterMessengerBean
{
@Resource(mappedName = "java:/jms/topic/MYTopic")
private Topic topic;
@Resource(mappedName = "java:/JmsXA")
private ConnectionFactory connectionFactory;
private Connection connection = null;
private Session session = null;
private MessageProducer messageProducer = null;
@Schedule(minute = "*/1", hour = "*")
public void send()
{
System.out.println("Timer executed.");
try
{
send(new ClusterMessage("myIdentifier"));
System.out.println("Message sent.");
}
catch (Exception e)
{
e.printStackTrace();
}
}
public void send(ClusterMessage clusterMessage)
{
try
{
if (this.connection == null)
{
this.connection = this.connectionFactory.createConnection();
}
if (this.session == null)
{
this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
if (this.messageProducer == null)
{
this.messageProducer = this.session.createProducer(this.topic);
}
ObjectMessage message = this.session.createObjectMessage(clusterMessage);
message.setStringProperty("node", System.getProperty("jboss.node.name"));
this.messageProducer.send(message);
}
catch (JMSException | JMSRuntimeException e)
{
e.printStackTrace();
}
}
}
- MyMDBean.java, A message driven bean using the remote syntax of the topic to receive the messages.
package clustertest;
import javax.annotation.security.PermitAll;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@MessageDriven(
activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
@ActivationConfigProperty(
propertyName = "destination",
propertyValue = "java:jboss/exported/jms/topic/MYTopic"),
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
@ActivationConfigProperty(propertyName = "maxSession", propertyValue = "1") })
@TransactionAttribute(TransactionAttributeType.REQUIRED)
@TransactionManagement(TransactionManagementType.BEAN)
@PermitAll
public class MyMDBean implements MessageListener
{
@Override
public void onMessage(Message message)
{
try
{
System.out.println(
"Received message from server [" + message.getStringProperty("node") + "]:" + message);
}
catch (JMSException e)
{
e.printStackTrace();
}
}
}
If both servers are running and formed a cluster i get the following logs in node1:
2017-04-06 08:27:00,007 INFO [stdout] (EJB default - 3) Timer executed.
2017-04-06 08:27:00,009 INFO [stdout] (EJB default - 3) Message sent.
2017-04-06 08:27:00,012 INFO [stdout] (Thread-2 (ActiveMQ-client-global-threads-125360358)) Received message from server [node1]:ActiveMQMessage[ID:0a65a3b1-1a92-11e7-87ab-975aaba56fad]:PERSISTENT/ClientMessageImpl[messageID=6442450958, durable=true, address=jms.topic.MYTopic,userID=0a65a3b1-1a92-11e7-87ab-975aaba56fad,properties=TypedProperties[__AMQ_CID=cda21118-1a91-11e7-87ab-975aaba56fad,node=node1]]
and on node 2
2017-04-06 08:27:00,007 INFO [stdout] (EJB default - 3) Timer executed.
2017-04-06 08:27:00,009 INFO [stdout] (EJB default - 3) Message sent.
2017-04-06 08:27:00,012 INFO [stdout] (Thread-2 (ActiveMQ-client-global-threads-125360358)) Received message from server [node2]:ActiveMQMessage[ID:0a65a3b1-1a92-11e7-87ab-975aaba56fad]:PERSISTENT/ClientMessageImpl[messageID=6442450958, durable=true, address=jms.topic.MYTopic,userID=0a65a3b1-1a92-11e7-87ab-975aaba56fad,properties=TypedProperties[__AMQ_CID=cda21118-1a91-11e7-87ab-975aaba56fad,node=node2]]
What needs to be configured to receive the messages with all MDBs of all servers? I would be thankful for every hint.
One additional issue:
I get constantly messages like the following in the log:
2017-04-06 07:39:07,566 WARNING [io.netty.channel.DefaultChannelPipeline] (default I/O-2) An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.: java.io.IOException: Eine vorhandene Verbindung wurde vom Remotehost geschlossen
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.xnio.nio.NioSocketConduit.read(NioSocketConduit.java:289)
at org.xnio.conduits.ConduitStreamSourceChannel.read(ConduitStreamSourceChannel.java:127)
at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:357)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:898)
at org.xnio.netty.transport.AbstractXnioSocketChannel$ReadListener.handleEvent(AbstractXnioSocketChannel.java:428)
at org.xnio.netty.transport.AbstractXnioSocketChannel$ReadListener.handleEvent(AbstractXnioSocketChannel.java:371)
at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
at org.xnio.conduits.ReadReadyHandler$ChannelListenerHandler.readReady(ReadReadyHandler.java:66)
at org.xnio.nio.NioSocketConduit.handleReady(NioSocketConduit.java:89)
at org.xnio.nio.WorkerThread.run(WorkerThread.java:567)
This stops happening if I switch the http-connector to https in standalone-full-ha.xml:
<http-connector name="http-connector" endpoint="http-acceptor" socket-binding="https"/> <http-connector name="http-connector-throughput" endpoint="http-acceptor-throughput" socket-binding="https"> <param name="batch-delay" value="50"/> </http-connector>
May that have something to to with it?
What I already tried is advising the activemq-ra factory to use the http connector, which lead to the message, that the factory could not be connected. I also tried a lot of annotations, factories and different JNDI-syntax after reading posts here and elsewhere, but nothing worked, which is why I don't complicate my example by adding something of this.
Thanks in advance,
Björn
-
CLUSTERTEST.ear 4.0 KB