-
1. Re: JBoss Messaging - Redelivery test
niketuec May 3, 2006 7:35 PM (in response to niketuec)Following is the code for Sender and REceiver class:
Sender
public class JMSPageMatchSender {
// Queue connection factory instance
private QueueConnectionFactory factory_;
// JNDI context
private Context ctx_;
// Singleton instance
private static JMSPageMatchSender jmsPageMatchSender_ = new JMSPageMatchSender();
/**
* Private constructor for the singleton
*/
private JMSPageMatchSender() {
try {
Hashtable<String, String> env = new Hashtable<String, String>(5);
env.put(Context.INITIAL_CONTEXT_FACTORY, JMSConfigConstants.INITIAL_CONTEXT_FACTORY);
env.put(Context.URL_PKG_PREFIXES, JMSConfigConstants.URL_PKG_PREFIXES);
env.put(Context.PROVIDER_URL, JMSConfigConstants.PROVIDER_URL);
ctx_ = new InitialContext(env);
factory_ = (QueueConnectionFactory) ctx_.lookup(JMSConfigConstants.CONNECTION_FACTORY);
} catch (Exception ex) {
ex.printStackTrace();
}
}
/**
* Singleton access method to get an instance of JMSPageMatchSender.
* to send messages to a queue
*/
public static JMSPageMatchSender getInstance() {
return jmsPageMatchSender_;
}
/**
* Sends the message to the queue.
*/
public void sendForPageMatch(SitePage sitePage) {
try {
System.out.println("Sending... request to page match for page id:" + sitePage.getPageId());
// Get a connection to the queue
QueueConnection connection = factory_.createQueueConnection();
// Start the connection
connection.start();
// Intiialize a session
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
// Look up the queue in which we are interested
Queue queue = (Queue) ctx_.lookup(JMSConfigConstants.QUEUE_DESTINATION);
// Create a message producer for this queue to send the message
QueueSender sender = session.createSender(queue);
// Create a ObjectMessage to send an object
ObjectMessage msg = session.createObjectMessage();
// Set the content in the message
msg.setObject(sitePage);
msg.setLongProperty("JMS_JBOSS_REDELIVERY_DELAY", new Long(5000L));
msg.setIntProperty("JMS_JBOSS_REDELIVERY_LIMIT", new Integer(10));
// Send the message
sender.send(msg);
// Close sender and session
sender.close();
session.close();
System.out.println("Request for page match for page id:" + sitePage.getPageId() + " is in Queue now.");
} catch (Exception ex) {
ex.printStackTrace();
}
}
public static void main(String[] args){
JMSPageMatchSender sender = new JMSPageMatchSender();
sender.sendForPageMatch(new SitePage(1, 2, "http://www.experts-exchange.com", null, 1, 1));
}
}
Receiver
public class JMSPageMatchReceiver implements MessageListener {
public JMSPageMatchReceiver() {
try {
Hashtable<String, String> env = new Hashtable<String, String>(5);
env.put(Context.INITIAL_CONTEXT_FACTORY, JMSConfigConstants.INITIAL_CONTEXT_FACTORY);
env.put(Context.URL_PKG_PREFIXES, JMSConfigConstants.URL_PKG_PREFIXES);
env.put(Context.PROVIDER_URL, JMSConfigConstants.PROVIDER_URL);
Context ctx = new InitialContext(env);
QueueConnectionFactory qconFactory = (QueueConnectionFactory) ctx.lookup(JMSConfigConstants.CONNECTION_FACTORY);
QueueConnection qcon = qconFactory.createQueueConnection();
QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue) ctx.lookup(JMSConfigConstants.QUEUE_DESTINATION);
QueueReceiver queueReceiver = qsession.createReceiver(queue);
queueReceiver.setMessageListener(this);
qcon.start();
} catch (Exception ex) {
ex.printStackTrace();
}
}
public void onMessage(Message message) {
try {
SitePage sitePage = ((SitePage)((ObjectMessage) message).getObject());
System.out.println("Processing Request to match page with page id:" + sitePage.getPageId());
if(sitePage.getPageId() == 1){
throw new JMSException("test");
}
SiteManager siteManager = new SiteManager();
Site site = siteManager.getSite(sitePage.getSiteId());
SitePageManager sitePageManager = site.getSitePageManager();
Domain domain = DomainManager.getDomainManager().getDomain(DomainConstants.DEFAULT_DOMAIN_NAME);
if (domain == null) {
throw new RuntimeException("Domain " + DomainConstants.DEFAULT_DOMAIN_NAME + " not found");
}
TopicManager topicManager = new TopicManagerFactory().getTopicManager(domain,
TopicPersistenceMode.DB, TopicPersistenceMode.DB);
PageMatchRunner pageMatchRunner = new PageMatchRunner(sitePageManager, topicManager);
SitePage[] sitePages = new SitePage[1];
sitePages[0] = sitePage;
pageMatchRunner.matchPages(sitePages);
} catch (TopicException e) {
e.printStackTrace();
}catch (Exception e) {
throw new RuntimeException();
}
}
public static void main(String[] args) throws Exception {
JMSPageMatchReceiver receiver = new JMSPageMatchReceiver();
System.out.println("Started listening to asynchronous page match requests......");
}
}