JMS Queue/Topic Issue (JBoss SOA 4.3 - JBoss AS 4.2.2)
jbossuseruseruser Jun 22, 2010 7:04 AM
I have tried this with both a Queue and a Topic. Within the destinations-service.xml file, I have configured a new Queue or Topic simply by copying and pasting the example one and changing the name (testTopic > newTopic). I have a class that sends simple text messages to the topic and another class that receives them. The receiving class is used within a very simple GUI to get the message and display it in a text area. This all works 'fine'.
The problem is that after some time (there is no regularity in when this happens) messages stop being received. While using a topic, I have watched the subscription in the JMX-Console and noticed that it is no longer there once messages stop arriving. I have tried leaving sessions open, and I have tried closing them and making a new subscription every time, but no matter what I try it never seems to 'stay alive'. I then turned to Queues, thinking that the same problem couldn't possibly apply, but I was wrong: my GUI client also stops receiving messages after an arbitrary amount of time. So my question is: does JBoss have configuration settings for a subscription, or something along those lines? Or is it something that I am doing wrong? Here is my setup:
destinations-service.xml:

<mbean code="org.jboss.jms.server.destination.TopicService"
       name="jboss.messaging.destination:service=Topic,name=newTopic"
       xmbean-dd="xmdesc/Topic-xmbean.xml">
    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
    <depends>jboss.messaging:service=PostOffice</depends>
    <attribute name="SecurityConfig">
        <security>
            <role name="guest" read="true" write="true"/>
            <role name="publisher" read="true" write="true" create="true"/>
        </security>
    </attribute>
</mbean>
TopicReceiver.java:
private static final String JBOSS_TOPIC_NAME = "topic/newTopic";
private static final String JBOSS_JNDI_CONNECTION_FACTORY = "ConnectionFactory";
private static final String JBOSS_JNDI_PACKAGE = "org.jboss.naming:org.jnp.interfaces";
private static final String JBOSS_JNDI_DRIVER = "org.jnp.interfaces.NamingContextFactory";
private static final String JBOSS_URL = "jnp://localhost:1099";

//The topic to which we will be publishing messages
private Topic testTopic;
//The factory that will build us JMS connections
private TopicConnectionFactory topicFactory;
//The connection to the JMS server
private TopicConnection connection;
//The subscriber object
private TopicSubscriber subscriber;
//The session for receiving
private TopicSession subscribeSession;

private String messageText = "";

/**
 * Calls the incomingMessage() method on instantiation
 * to get the message from the subscribed Topic.
 */
public TopicReceiver() {
    this.setup();
}

/**
 * Gets the message that has been stored from the subscribed
 * topic.
 * @return The newest incoming message.
 */
public String getMessage() {
    return messageText;
}

/**
 * Sets up the connection to JBoss and the Subscriber, who will
 * listen for incoming messages.
 */
public void setup() {
    Properties prop = new Properties();
    prop.put("java.naming.factory.initial", JBOSS_JNDI_DRIVER);
    prop.put("java.naming.provider.url", JBOSS_URL);
    prop.put("java.naming.factory.url.pkgs", JBOSS_JNDI_PACKAGE);
    InitialContext context;
    try {
        context = new InitialContext(prop);
        //Lookup the JMS connection factory and topic
        topicFactory = (TopicConnectionFactory) context.lookup(JBOSS_JNDI_CONNECTION_FACTORY);
        connection = topicFactory.createTopicConnection();
        testTopic = (Topic) context.lookup(JBOSS_TOPIC_NAME);
        subscribeSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        subscriber = subscribeSession.createSubscriber(testTopic);
        connection.start();
    } catch (NamingException e) {
        e.printStackTrace();
    } catch (JMSException jms) {
        jms.printStackTrace();
    }
}

/**
 * Provides a user friendly view of the message - as a String.
 * @return The Message contained within the JMS Message object.
 */
public String receiveMessage() {
    String msg = "";
    try {
        Message message = subscriber.receive();
        msg = ((TextMessage) message).getText();
        System.out.println(msg);
        //stop();
    } catch (JMSException e) {
        e.printStackTrace();
    }
    return msg;
}

public void stop() {
    try {
        subscribeSession.close();
        subscriber.close();
        connection.stop();
        connection.close();
    } catch (JMSException e) {
        e.printStackTrace();
    }
}
TopicSender.java:
private static final String JBOSS_TOPIC_NAME = "topic/newTopic";
private static final String JBOSS_JNDI_CONNECTION_FACTORY = "ConnectionFactory";
private static final String JBOSS_JNDI_PACKAGE = "org.jboss.naming:org.jnp.interfaces";
private static final String JBOSS_JNDI_DRIVER = "org.jnp.interfaces.NamingContextFactory";
private static final String JBOSS_URL = "jnp://localhost:1099";

//The topic to which we will be publishing messages
private Topic jbossTopic;
//The factory that will build us JMS connections
private TopicConnectionFactory topicFactory;
//The connection to the JMS server
private TopicConnection connection;
//The publisher object
private TopicPublisher publisher;
//The session for sending
private TopicSession publishSession;

/**
 * Default Constructor
 */
public TopicSender() {
}

/**
 * Prepares the connection to the Topic on the Server.
 */
public void setup() {
    try {
        Properties prop = new Properties();
        prop.put("java.naming.factory.initial", JBOSS_JNDI_DRIVER);
        prop.put("java.naming.provider.url", JBOSS_URL);
        prop.put("java.naming.factory.url.pkgs", JBOSS_JNDI_PACKAGE);
        InitialContext context = new InitialContext(prop);
        //Lookup the JMS connection factory and topic
        topicFactory = (TopicConnectionFactory) context.lookup(JBOSS_JNDI_CONNECTION_FACTORY);
        connection = topicFactory.createTopicConnection();
        jbossTopic = (Topic) context.lookup(JBOSS_TOPIC_NAME);
        //to send messages we create a session and a publisher
        publishSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/**
 * Sends the message passed through the parameter to a topic named
 * 'topic/testTopic'
 * @param text - The message that is to be published to the topic.
 */
public void sendMessage(String text) {
    try {
        this.setup();
        publisher = publishSession.createPublisher(jbossTopic);
        TextMessage tm = publishSession.createTextMessage(text);
        publisher.publish(tm);
        publisher.close();
        this.stop();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/**
 * Closes the connection and resources in use.
 */
public void stop() {
    try {
        connection.stop();
        publishSession.close();
        connection.close();
    } catch (JMSException e) {
        e.printStackTrace();
    }
}
Please let me know if you require more information. Any feedback is appreciated.
Thanks,
J
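
One possible way to narrow down a problem like the one described above is to stop blocking forever in receive() and instead poll with a timeout, rebuilding the connection whenever a call fails, so that a dropped connection shows up in the log rather than as silence. The sketch below is only an illustration of that pattern: it is not JBoss-specific and does not explain why the subscription disappears. MessageSource, ReconnectingReceiver and FlakySource are hypothetical names made up for this example. In a real client, connect() would presumably wrap the setup() logic from TopicReceiver, and receive(timeoutMillis) would wrap the standard JMS call subscriber.receive(timeout).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for the JMS connection/subscriber pair; not a JMS or JBoss type. */
interface MessageSource {
    /** Opens (or reopens) the underlying connection and subscription. */
    void connect() throws Exception;

    /** Returns the next message text, or null if nothing arrived within the timeout. */
    String receive(long timeoutMillis) throws Exception;

    /** Releases the underlying resources; must be safe to call repeatedly. */
    void close();
}

/** Polls a MessageSource with a timeout and reconnects after any failure. */
final class ReconnectingReceiver {
    private final MessageSource source;
    private int reconnects = 0;

    ReconnectingReceiver(MessageSource source) {
        this.source = source;
    }

    /** Polls up to maxPolls times and returns every message that was received. */
    List<String> poll(int maxPolls, long timeoutMillis) {
        List<String> received = new ArrayList<String>();
        boolean connected = false;
        for (int i = 0; i < maxPolls; i++) {
            try {
                if (!connected) {
                    source.connect();
                    connected = true;
                }
                String text = source.receive(timeoutMillis);
                if (text != null) {
                    received.add(text);
                }
            } catch (Exception e) {
                // Make the failure visible, drop the broken connection,
                // and let the next iteration build a fresh one.
                System.err.println("Receive failed, reconnecting: " + e.getMessage());
                source.close();
                connected = false;
                reconnects++;
            }
        }
        source.close();
        return received;
    }

    int reconnectCount() {
        return reconnects;
    }
}

/** In-memory fake that fails exactly once, to show the loop recovering. */
final class FlakySource implements MessageSource {
    private final Queue<String> pending =
            new ArrayDeque<String>(Arrays.asList("one", "two", "three"));
    private boolean failedOnce = false;
    private int delivered = 0;

    public void connect() {
        // Nothing to open for the in-memory fake.
    }

    public String receive(long timeoutMillis) throws Exception {
        // Simulate a dropped connection after the first delivered message.
        if (!failedOnce && delivered == 1) {
            failedOnce = true;
            throw new Exception("connection dropped");
        }
        String next = pending.poll();
        if (next != null) {
            delivered++;
        }
        return next;
    }

    public void close() {
        // Nothing to release for the in-memory fake.
    }
}
```

With the in-memory FlakySource, new ReconnectingReceiver(new FlakySource()).poll(5, 10L) collects all three messages and records one reconnect. Against a real server, the logged exception at the moment of failure would be the interesting part.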