import javax.jms.*; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.MessageConsumer; import javax.jms.Destination; /* * Created on Feb 17, 2011 * * TODO To change the template for this generated file go to * Window - Preferences - Java - Code Style - Code Templates */ /** * @author NNakarikanti * * TODO To change the template for this generated type comment go to * Window - Preferences - Java - Code Style - Code Templates */ public class AMQConsumer extends Thread implements MessageListener, ExceptionListener { private Connection connection; private Session session; private MessageConsumer consumer; private Destination destination; private static int count = 1; public AMQConsumer(Connection amqConnection, Session amqSession, Destination queue) { connection = amqConnection; session = amqSession; destination = queue; try { connection.setExceptionListener(this); if(connection != null) connection.start(); } catch (JMSException e) { e.printStackTrace(); } } synchronized public void onException(JMSException ex) { System.out.println("JMS Exception occurred. Shutting down client."); System.exit(1); } public void onMessage(Message message) { try { Thread.sleep(2000);// Slow Consumer } catch (InterruptedException e) { e.printStackTrace(); } //System.out.println("\nFOUND SOME MESSAGE - 1"); if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("\nI AM :: " + this.getName() + " Received - " +textMessage.getText() + " - " + count++); } catch (Exception e) { e.printStackTrace(); } } else { System.out.println("Received - NonTextMessage - " + message); } //System.out.println("\nFOUND SOME MESSAGE - 2"); } public void init(){ try { consumer = session.createConsumer(destination); consumer.setMessageListener(this); } catch (JMSException e) { e.printStackTrace(); } } }