1 Reply Latest reply on Sep 18, 2008 7:03 AM by Tim Fox

    High volume usage patterns and best practices

    Brandon Opfermann Newbie

      I am trying to implement a system that will require consistently high volume with long running and constantly connected clients. To date I have followed the samples included with JBoss Messaging and am creating a new connection+session+temp_response_queue on a per request basis. It was suggested that some of this is actually an anti-pattern.

      This actually works fairly well and is sufficiently fast in my local development environment (a laptop), but it breaks down when I promote it to a clustered development environment that is otherwise identical. I fairly quickly run out of memory and get a 25+% failure rate even at low volumes. I am inclined to believe this is due in part to the way in which I am accessing the queues and topics.

      I would like advice on the best way to handle the "pooling" of some of these objects (connections and sessions) for reuse. Is this advisable at all? If so, are there any example solutions anyone is aware of? I would hate to have to create my own pooling logic, with all the issues inherent in high-volume systems.

      One example of where I do not see the feasibility of this is in the connection object. I do not see any exposed way in the API to "test" the connectedness of the connection objects. I would assume in any pooling situation I would have to make sure it is still alive, etc.
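For what it is worth, JMS does not offer a direct "is this connection alive" call, but the standard API does let a client register a failure callback with Connection.setExceptionListener, whose ExceptionListener.onException method is invoked when the provider detects a problem. A minimal sketch of how a cached connection could be dropped and rebuilt on such a callback follows. The class name ReconnectingHolder and its methods are hypothetical, and a generic type stands in for the JMS Connection so that the sketch compiles without the JMS jars.

```java
import java.util.function.Supplier;

/**
 * Hypothetical holder that caches one shared resource (standing in for a JMS
 * Connection) and rebuilds it lazily after a failure has been reported.
 */
class ReconnectingHolder<T> {
    private final Supplier<T> factory;
    private T current;

    ReconnectingHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the cached resource, creating it on first use or after a failure. */
    synchronized T get() {
        if (current == null) {
            current = factory.get();
        }
        return current;
    }

    /**
     * Intended to be called from the failure callback. With JMS this would be
     * ExceptionListener.onException, registered via Connection.setExceptionListener.
     */
    synchronized void markBroken(T failed) {
        // Ignore stale notifications about a resource that was already replaced.
        if (current == failed) {
            current = null;
        }
    }
}
```

With this shape, callers never test the connection themselves: they call get() for every request, and the next get() after a reported failure transparently creates a fresh connection. Sessions created from the broken connection would still need to be discarded by whatever pool holds them.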

      Any help would be appreciated.

      Thanks,
      Brandon
      =======================================================
      Solution Description
      Client: A simple one-form, one-page JSF app
      Server: A fairly simple MDB. It currently makes an HTTP request and echoes the response to a client-created temporary response queue. It also forwards a copy to an "observer" topic for further SOA visibility.

      public String executeSynchronous(String resourceUrl, HTTPMethod method, String payload, final long timeout) {
          if (timeout > MAX_TIMEOUT) {
              return "Error: timeout provided exceeds MAX_TIMEOUT(" + MAX_TIMEOUT + ")";
          }

          Map<String, String> headers = new HashMap<String, String>();
          headers.put("Method", method.name());
          headers.put("URL", resourceUrl);

          JMSUtil jmsUtil = new JMSUtil();
          String messageId = JMSUtil.generateUUID();
          Session session = jmsUtil.generateSession();
          if (session == null) {
              // generateSession() logs the cause and returns null on failure
              return "Error: could not create a JMS session";
          }
          try {
              Queue responseQueue = jmsUtil.generateResponseQueue(session);
              jmsUtil.sendSOAQueueMessage(session, payload, jndiReferenceSOAIn, headers, messageId, responseQueue);
              return jmsUtil.getSOAResponse(session, responseQueue, timeout);
          } finally {
              // Always release the session, even if the send or receive fails
              jmsUtil.closeSession(session);
          }
      }
      

      calls...
      // Note: jndiReference is currently unused; the destination comes from getSOAInQueue()
      public void sendSOAQueueMessage(Session session, String payload, String jndiReference, Map<String, String> headers, String messageId, Queue responseQueue) {
          try {
              // Create the message
              TextMessage message = session.createTextMessage(payload);

              // Copy the caller-supplied headers onto the message as string properties
              for (Map.Entry<String, String> header : headers.entrySet()) {
                  message.setStringProperty(header.getKey(), header.getValue());
              }

              // Set the request UUID and the queue the MDB should reply to
              message.setStringProperty("EntertainmentSOARequestId", messageId);
              message.setJMSReplyTo(responseQueue);

              // Create the producer, send, and release the producer
              MessageProducer sender = session.createProducer(getSOAInQueue());
              sender.send(message);
              sender.close();

              // Commit if needed
              if (transacted) {
                  session.commit();
              }
          } catch (JMSException e) {
              logger.error("A JMS Exception occurred in sending a JMS SOA message!", e);
          }
      }
      


      Supported by the following util methods...
      protected static synchronized Connection getConnection() {
          if (connection == null) {
              Hashtable<String, String> properties = new Hashtable<String, String>();
              properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
              properties.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
              properties.put(Context.PROVIDER_URL, "jnp://localhost:1099");
              properties.put(Context.SECURITY_PRINCIPAL, "not_real");
              properties.put(Context.SECURITY_CREDENTIALS, "not_real");

              try {
                  Context context = new InitialContext(properties);
                  connectionFactory = (ConnectionFactory) context.lookup("ConnectionFactory");
              } catch (NamingException e) {
                  e.printStackTrace();
                  return null;
              }

              try {
                  // Only cache the connection once it has started successfully
                  Connection created = connectionFactory.createConnection();
                  created.start();
                  connection = created;
              } catch (JMSException e) {
                  e.printStackTrace();
                  return null;
              }
          }
          return connection;
      }
      public Queue generateResponseQueue(Session session) {
       try {
       return session.createTemporaryQueue();
       } catch (JMSException e) {
       logger.error("Could not generate temporary queue for receiving response", e);
       return null;
       }
       }
      public Session generateSession() {
       try {
       // Get a connection
       Connection connection = getConnection();
      
       // Create the session
       return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       } catch (JMSException e) {
       logger.error("Could not generate session", e);
       return null;
       }
       }
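      public String getSOAResponse(Session session, Destination responseQueue, long timeout) {
          MessageConsumer consumer = null;
          try {
              // Create the consumer
              consumer = session.createConsumer(responseQueue);

              // receive() returns null if no message arrives within the timeout
              Message message = consumer.receive(timeout);
              if (message == null) {
                  logger.error("No response received within " + timeout + " ms");
                  return "No response received within " + timeout + " ms";
              }
              message.acknowledge();

              if (message instanceof TextMessage) {
                  return ((TextMessage) message).getText();
              }
              logger.error("Received message was not of type TextMessage!");
              return "Received message was not of type TextMessage!";
          } catch (JMSException e) {
              logger.error("A JMS Exception occurred in receiving a JMS SOA message!", e);
              return "A JMS Exception occurred in receiving a JMS SOA message! " + e.getMessage();
          } finally {
              // Close the consumer on every path, including timeouts and errors
              if (consumer != null) {
                  try {
                      consumer.close();
                  } catch (JMSException e) {
                      logger.error("Could not close the response consumer", e);
                  }
              }
          }
      }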
      


      MDB

      @MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
       @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/SOAInQueue"),
       @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge") })
      public class SOARouterMDB implements MessageListener {
      
       private final RESTAccessor restAccessor = new RESTAccessor();
      
       private static Logger logger = Logger.getLogger(SOARouterMDB.class);
      
       public void onMessage(Message msg) {
       try {
       Responder responder = new Responder();
       TextMessage tmsg = (TextMessage) msg;
      
       HTTPMethod method = HTTPMethod.valueOf(tmsg.getStringProperty("Method"));
       String url = tmsg.getStringProperty("URL");
       Map<String, String> httpHeaders = new HashMap<String, String>();
       Map<String, String> allHeaders = new HashMap<String, String>();
       Destination destination = tmsg.getJMSReplyTo();
      
       System.out.println("Destination: " + destination);
      
       // Load HttpHeaders
       Enumeration<String> propertyNames = tmsg.getPropertyNames();
       while (propertyNames.hasMoreElements()) {
       String propertyName = propertyNames.nextElement();
       if (propertyName.startsWith("HttpHeader")) {
       httpHeaders.put(propertyName.substring(10), tmsg.getStringProperty(propertyName));
       }
       allHeaders.put(propertyName, tmsg.getStringProperty(propertyName));
       }
      
       HTTPResponse response = null;
      
       switch (method) {
       case GET:
       response = restAccessor.doGet(url, httpHeaders);
       if (!response.isError()) {
       responder.respond(destination, allHeaders, response.getData());
       responder.broadcast(allHeaders, response.getData());
       } else {
       responder.reportError(tmsg);
       }
       break;
       case POST:
       response = restAccessor.doPost(url, httpHeaders, new ByteArrayInputStream(tmsg.getText().getBytes()));
       if (!response.isError()) {
       responder.respond(destination, allHeaders, response.getData());
       responder.broadcast(allHeaders, response.getData());
       } else {
       responder.reportError(tmsg);
       }
       break;
       case DELETE:
       response = restAccessor.doDelete(url, httpHeaders, tmsg.getText());
       if (!response.isError()) {
       responder.respond(destination, allHeaders, response.getData());
       responder.broadcast(allHeaders, response.getData());
       } else {
       responder.reportError(tmsg);
       }
       break;
       case PUT:
       // NOTE: PUT is currently sent through doPost, the same call the POST case uses
       response = restAccessor.doPost(url, httpHeaders, new ByteArrayInputStream(tmsg.getText().getBytes()));
       if (!response.isError()) {
       responder.respond(destination, allHeaders, response.getData());
       responder.broadcast(allHeaders, response.getData());
       } else {
       responder.reportError(tmsg);
       }
       break;
       default:
       // TODO: handle unsupported HTTP methods
       }
       } catch (Exception e) {
       logger.error("Error processing SOA request", e);
       }
       }
      
       @PreDestroy
       public void destroy() {
       }
      
      }
      


        • 1. Re: High volume usage patterns and best practices
          Tim Fox Master

           

          "xxbrandonoxx" wrote:
          I am trying to implement a system that will require consistently high volume with long running and constantly connected clients. To date I have followed the samples included with JBoss Messaging and am creating a new connection+session+temp_response_queue on a per request basis. It was suggested that some of this is actually an anti-pattern.


          Yes, that would certainly constitute an anti-pattern.
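One way to avoid creating a temporary response queue per request is to share a single reply queue and match each reply to its caller by an id. A minimal sketch of the matching side follows, assuming the responder echoes the request id back on the reply (for example through the standard JMSCorrelationID header set with Message.setJMSCorrelationID). The class ReplyCorrelator and its method names are hypothetical, and it deliberately uses only java.util.concurrent so that it is independent of the JMS jars.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical registry matching replies on one shared queue to waiting requests. */
class ReplyCorrelator {
    private final ConcurrentMap<String, CompletableFuture<String>> pending =
            new ConcurrentHashMap<String, CompletableFuture<String>>();

    /** Registers a request before it is sent; the id would travel with the message. */
    CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<String>();
        pending.put(requestId, future);
        return future;
    }

    /** Called by the single reply listener; returns false if nobody waits for this id. */
    boolean complete(String requestId, String body) {
        CompletableFuture<String> future = pending.remove(requestId);
        return future != null && future.complete(body);
    }

    /** Waits for the reply; returns null on timeout and always forgets the request. */
    String await(String requestId, CompletableFuture<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            }
            return null;
        } finally {
            pending.remove(requestId);
        }
    }
}
```

In this arrangement the request thread would call register, send the message, then await, while one long-lived consumer on the shared reply queue calls complete for each message it receives. Replies that arrive after the timeout find no pending entry and can simply be logged and dropped.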



          This actually works fairly well and is sufficiently fast in my local development environment (a laptop), but it breaks down when I promote it to a clustered development environment that is otherwise identical. I fairly quickly run out of memory and get a 25+% failure rate even at low volumes. I am inclined to believe this is due in part to the way in which I am accessing the queues and topics.

          I would like advice on the best way to handle the "pooling" of some of these objects (connections and sessions) for reuse. Is this advisable at all? If so, are there any example solutions anyone is aware of? I would hate to have to create my own pooling logic, with all the issues inherent in high-volume systems.


          You should have a look at the wiki, docs, etc.; this is a very common question.

          http://wiki.jboss.org/wiki/JBossJMSRA
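For a standalone client where no container-managed pooling is available, the reuse being asked about can be sketched as a small fixed-size pool. This is only an illustration under stated assumptions: the class SimplePool and its methods are hypothetical, and the type parameter stands in for a JMS Session (or a session together with its producer) so that the sketch depends only on the JDK.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical fixed-size pool; T would be a JMS Session in the code above. */
class SimplePool<T> {
    private final BlockingQueue<T> idle;

    /** Creates all pooled items up front using the supplied factory. */
    SimplePool(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<T>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    /** Borrows an item, waiting up to timeoutMillis; returns null if none became free. */
    T borrow(long timeoutMillis) {
        try {
            return idle.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    /** Returns a borrowed item so that another request can reuse it. */
    void release(T item) {
        idle.offer(item);
    }

    /** Number of items currently idle in the pool. */
    int available() {
        return idle.size();
    }
}
```

A request would borrow a session at the start, use it for the send and receive, and release it in a finally block. Because each item is handed to one borrower at a time, this also respects the JMS rule that a Session is used by a single thread at once. A blocking borrow with a timeout gives natural back-pressure when volume exceeds the pool size.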