Find the missing configuration needed for clients to connect to the server using a discovery address
venkatesha.k.c May 24, 2011 9:02 AM
Hi,
I'm trying to establish a connection to the server using a discovery address and discovery port.
The client is able to connect to the server, but it is not able to publish or receive messages.
Below I've pasted the configuration files and code.
Please help me ASAP.
Thanks.
HornetQ-Configuration File.
<!-- Core HornetQ server configuration: clustering is enabled, two Netty
     connector/acceptor pairs are defined, and a broadcast group and a
     discovery group share the multicast endpoint 239.6.7.8:45400. -->
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<clustered>true</clustered>
<!-- Storage locations. Each path is built from the data.dir property; the
     text after the colon (../data) is presumably the fallback used when the
     property is not set (TODO confirm against the HornetQ manual). -->
<paging-directory>${data.dir:../data}/paging</paging-directory>
<bindings-directory>${data.dir:../data}/bindings</bindings-directory>
<journal-directory>${data.dir:../data}/journal</journal-directory>
<journal-min-files>10</journal-min-files>
<large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
<!-- Connectors: named descriptions of how to reach a server. "netty" is the
     one referenced by the broadcast group below and by the JMS connection
     factory "NettyConnectionFactory".
     NOTE(review): the host falls back to "localhost"; if that value is what
     gets broadcast, clients on other machines would be told to connect to
     their own loopback address. Confirm hornetq.remoting.netty.host is set
     to a reachable address when clients are remote. -->
<connectors>
<connector name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</connector>
<!-- Second connector on the batch port (5455 unless overridden) with a
     batch-delay of 50. -->
<connector name="netty-throughput">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
</connector>
</connectors>
<!-- Acceptors: the server-side counterparts of the connectors above, using
     the same host and port properties. -->
<acceptors>
<acceptor name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</acceptor>
<acceptor name="netty-throughput">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
<param key="direct-deliver" value="false"/>
</acceptor>
</acceptors>
<!-- Broadcast group: announces the "netty" connector on 239.6.7.8:45400
     with a broadcast-period of 5000. -->
<broadcast-groups>
<broadcast-group name="bg-group1">
<group-address>239.6.7.8</group-address>
<group-port>45400</group-port>
<broadcast-period>5000</broadcast-period>
<connector-ref connector-name="netty"/>
</broadcast-group>
</broadcast-groups>
<!-- Discovery group: listens on the same address and port as the broadcast
     group above. The sample producer and consumer in this post pass the same
     address and port ("239.6.7.8", 45400) to HornetQConnectionFactory. -->
<discovery-groups>
<discovery-group name="dg-group1">
<group-address>239.6.7.8</group-address>
<group-port>45400</group-port>
<refresh-timeout>10000</refresh-timeout>
</discovery-group>
</discovery-groups>
<!-- Cluster connection for addresses under "jms"; cluster members are found
     through discovery group dg-group1. -->
<cluster-connections>
<cluster-connection name="my-cluster">
<address>jms</address>
<discovery-group-ref discovery-group-name="dg-group1"/>
</cluster-connection>
</cluster-connections>
<!-- Security: for every address (match="#") the "guest" role may create and
     delete non-durable queues, consume and send. -->
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<!-- Catch-all defaults (match="#"): dead-letter and expiry addresses, no
     redelivery delay, a max-size-bytes of 10485760 and the BLOCK policy once
     an address is full. -->
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
</address-setting>
</address-settings>
</configuration>
HornetQ-JMS Configuration:
<!-- JMS-level configuration: two connection factories (one per connector
     defined in the core configuration) plus the queues and the topic used by
     the samples. -->
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<!-- Factory that uses the "netty" connector. The entry names are the names
     the factory is published under (presumably JNDI names; confirm for the
     deployment in use). The sample clients in this post do not look these
     factories up: they build a HornetQConnectionFactory directly from the
     discovery address and port. -->
<connection-factory name="NettyConnectionFactory">
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
<entries>
<entry name="/ConnectionFactory"/>
<entry name="/XAConnectionFactory"/>
</entries>
</connection-factory>
<!-- Factory that uses the "netty-throughput" connector. -->
<connection-factory name="NettyThroughputConnectionFactory">
<connectors>
<connector-ref connector-name="netty-throughput"/>
</connectors>
<entries>
<entry name="/ThroughputConnectionFactory"/>
<entry name="/XAThroughputConnectionFactory"/>
</entries>
</connection-factory>
<!-- DLQ and ExpiryQueue correspond to the dead-letter-address and
     expiry-address (jms.queue.DLQ, jms.queue.ExpiryQueue) set in the core
     configuration's address-settings. -->
<queue name="DLQ">
<entry name="/queue/DLQ"/>
</queue>
<queue name="ExpiryQueue">
<entry name="/queue/ExpiryQueue"/>
</queue>
<queue name="ExampleQueue">
<entry name="/queue/ExampleQueue"/>
</queue>
<!-- Topic the sample producer and consumer address as
     JMS_TOPIC_ADDRESS_PREFIX + "ExampleTopic". -->
<topic name="ExampleTopic">
<entry name="/topic/ExampleTopic"/>
</topic>
</configuration>
Server :
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.FileConfiguration;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
/**
 * Launches an embedded HornetQ core server from a configuration file and,
 * optionally, a JMS server manager layered on top of it.
 */
public class HqServer
{
    private HornetQServer hqServer;
    private JMSServerManager jmsServer;
    private FileConfiguration conf;

    /**
     * Builds (but does not start) the core server.
     *
     * @param conf URL of the core configuration file, e.g. {@code file:/path/hornetq-configuration.xml}
     * @throws Exception if the configuration cannot be loaded
     */
    public HqServer(String conf) throws Exception
    {
        hqServer = HornetQServers.newHornetQServer(getConf(conf));
    }

    /**
     * Loads the file-based configuration found at {@code confPath}.
     * Persistence is switched on and security switched off before the file is
     * started (parsed).
     */
    private Configuration getConf(String confPath) throws Exception
    {
        conf = new FileConfiguration();
        conf.setConfigurationUrl(confPath);
        conf.setPersistenceEnabled(true);
        conf.setSecurityEnabled(false);
        conf.start();
        return conf;
    }

    /**
     * Starts only the core server (no JMS layer).
     *
     * @throws Exception if the server fails to start
     */
    public void runHqServer() throws Exception
    {
        hqServer.start();
    }

    /**
     * Creates and starts the JMS server manager on top of the core server.
     *
     * @param jmsConfig URL of the JMS configuration file
     * @throws Exception if the JMS server fails to start
     */
    public void runJmsServer(String jmsConfig) throws Exception
    {
        jmsServer = new JMSServerManagerImpl(hqServer, jmsConfig);
        jmsServer.start();
    }

    /**
     * Stops whatever has been started: the JMS server manager, if
     * {@link #runJmsServer(String)} was called, and then the core server if it
     * is still running. Safe to call when only {@link #runHqServer()} was used
     * or when nothing was started at all.
     *
     * @throws Exception if either component fails to stop
     */
    public void stop() throws Exception
    {
        try
        {
            // jmsServer stays null until runJmsServer() has been called.
            if (jmsServer != null)
            {
                jmsServer.stop();
            }
        }
        finally
        {
            // Also covers the runHqServer()-only path, where no JMS manager exists.
            if (hqServer != null && hqServer.isStarted())
            {
                hqServer.stop();
            }
        }
    }

    public static void main(String[] args) throws Exception
    {
        HqServer server = new HqServer("file:/D:/NewWorkSpace/WorkHornetQ/clustered/hornetq-configuration.xml");
        server.runJmsServer("file:/D:/NewWorkSpace/WorkHornetQ/clustered/hornetq-jms.xml");
    }
}
Producer :
import javax.jms.JMSException;
import org.hornetq.jms.client.HornetQConnection;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQMessage;
import org.hornetq.jms.client.HornetQMessageProducer;
import org.hornetq.jms.client.HornetQSession;
/**
* The Class HqProducer.
*/
public class HqProducer
{
public static void main(String[] args) throws Exception
{
HqProducer p = new HqProducer("239.6.7.8", 45400);
p.createConnection();
p.createProducer();
p.sendMessage("hi");
p.keepSending();
}
private HornetQConnectionFactory hqConnectionFactory;
private HornetQConnection hqConnection;
private HornetQSession hqSession;
private HornetQMessageProducer hqProducer;
public HqProducer(String discoveryAddress, int discoveryPort)
{
hqConnectionFactory = new HornetQConnectionFactory(discoveryAddress, discoveryPort);
hqConnectionFactory.setDiscoveryInitialWaitTimeout(10000);
System.out.println("Connection Factory : " + hqConnectionFactory.getDiscoveryAddress() + " : " + hqConnectionFactory.getDiscoveryPort());
}
public void createConnection() throws JMSException
{
hqConnection = (HornetQConnection) hqConnectionFactory.createConnection();
System.out.println("Connection : " + hqConnection.toString());
}
public void createProducer() throws JMSException
{
hqSession = (HornetQSession) hqConnection.createSession(true, HornetQSession.TYPE_TOPIC_SESSION);
hqProducer = (HornetQMessageProducer) hqSession.createProducer(HornetQDestination.fromAddress(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX + "ExampleTopic"));
System.out.println("Topic spec : " + hqProducer.getTopic());
}
public void sendMessage(String msg) throws JMSException
{
HornetQMessage message = (HornetQMessage) hqSession.createMessage();
message.setStringProperty("key", "Hi....!!" + msg);
hqConnection.start();
hqSession.start();
hqProducer.publish(message);
}
public void keepSending() throws Exception
{
HornetQMessage message = (HornetQMessage) hqSession.createMessage();
message.setStringProperty("key", "Hi....!!" );
for (int i = 0; i < 100000; i++)
{
hqProducer.publish(message);
}
}
public void close() throws Exception
{
hqProducer.close();
hqSession.close();
hqConnection.close();
}
}
Consumer :
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.hornetq.jms.client.HornetQConnection;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQMessageConsumer;
import org.hornetq.jms.client.HornetQSession;
public class HqConsumer implements MessageListener
{
public static void main(String[] args) throws Exception
{
HqConsumer con = new HqConsumer("239.6.7.8", 45400);
con.createConnection();
con.recieve();
}
private HornetQConnectionFactory hqConnectionFactory;
private HornetQConnection hqConnection;
private HornetQSession hqSession;
private HornetQMessageConsumer consumer;
public HqConsumer(String discoveryAddress, int discoveryPort)
{
hqConnectionFactory = new HornetQConnectionFactory(discoveryAddress, discoveryPort);
hqConnectionFactory.setDiscoveryInitialWaitTimeout(10000);
System.out.println("Connection Factory : " + hqConnectionFactory.getDiscoveryAddress() + " : " + hqConnectionFactory.getDiscoveryPort());
}
public void createConnection() throws JMSException
{
hqConnection = (HornetQConnection) hqConnectionFactory.createConnection();
System.out.println("Connection : " + hqConnection.toString());
}
public void recieve() throws JMSException
{
hqSession = (HornetQSession) hqConnection.createSession(true, HornetQSession.TYPE_TOPIC_SESSION);
consumer = (HornetQMessageConsumer) hqSession.createConsumer(HornetQDestination.fromAddress(HornetQDestination.JMS_TOPIC_ADDRESS_PREFIX + "ExampleTopic"));
System.out.println("Topic spec : " + consumer.getTopic() + " " );
consumer.setMessageListener(this);
hqConnection.start();
hqSession.start();
}
public void close() throws Exception
{
consumer.close();
hqSession.close();
hqConnection.close();
}
@Override
public void onMessage(Message message)
{
try
{
System.out.println("Message Recieved : " + message.getStringProperty("key"));
}
catch (JMSException e)
{
e.printStackTrace();
}
}
}