1 Reply Latest reply on Dec 17, 2008 10:56 AM by dejanb_dejan

    Problem in sending SOAP with Attachment using Active MQ

    jayasreeb

      Hi,

       

      My requirement is to place a SOAP message with an attachment (binary data, i.e. a Java object) into a JMS message. I am using fuse-message-broker-5.0.0.17.

       

      I have a producer program which creates a SOAP message with a single attachment using a DataHandler and places it into a JMS message using MessageTransformer.SOAPMessageIntoJMSMessage. When I check the attachment count in the Producer class, it returns 1 before the SOAP message is placed into the JMS message.

       

      I have a Consumer class which picks up the same JMS message and displays the content of the SOAP message. In this class, when I check the attachment count, it returns 0.

       

      Do I need to configure anything in ActiveMQ or in Consumer.java to accept the attachment?

       

      I have placed following jars in my workspace.

      1.activation.jar

      2.axis.jar(1.4)

      3.imq.jar

      4.imqxm.jar

      5.saaj.jar

      6.mail.jar

      7.wsdl4j-1.5.1.jar

      8.jaxm-api.jar

      9.jaxrpc.jar

      10.commons-discovery-0.2.jar

      11.commons-logging-1.0.4.jar

      12.activemq-all-5.0.0.17-fuse.jar

       

       

      Please find the details of both the producer and consumer classes below. Please suggest a solution for this problem.

       

       

      Producer.java

      import java.awt.datatransfer.DataFlavor;

      import java.io.File;

      import java.net.URL;

       

      import javax.xml.parsers.DocumentBuilder;

      import javax.xml.parsers.DocumentBuilderFactory;

      import javax.xml.soap.MessageFactory;

      import javax.activation.DataHandler;

      import javax.activation.DataSource;

      import javax.activation.FileDataSource;

      import javax.jms.Connection;

      import javax.jms.DeliveryMode;

      import javax.jms.Destination;

      import javax.jms.Message;

      import javax.jms.MessageProducer;

      import javax.jms.Session;

      import javax.mail.util.ByteArrayDataSource;

      import javax.xml.soap.AttachmentPart;

      import javax.xml.soap.SOAPBody;

      import javax.xml.soap.SOAPBodyElement;

      import javax.xml.soap.SOAPEnvelope;

      import javax.xml.soap.SOAPMessage;

      import javax.xml.soap.SOAPPart;

      import org.apache.activemq.ActiveMQConnection;

      import org.apache.activemq.ActiveMQConnectionFactory;

      import org.apache.activemq.util.IndentPrinter;

      import org.w3c.dom.Document;

      import com.sun.messaging.xml.MessageTransformer;

       

      public class Producer {

              private Destination destination;

          private int messageCount = 10;

          private long sleepTime;

          private boolean verbose = true;

          private int messageSize = 255;

          private long timeToLive;

          private String user = ActiveMQConnection.DEFAULT_USER;

          private String password = ActiveMQConnection.DEFAULT_PASSWORD;

          private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

          private String subject = "TOOL1.DEFAULT";

          private boolean topic;

          private boolean transacted;

          private boolean persistent;

          public static void main(String[] args) {

              Producer producerTool = new Producer();

              producerTool.run();

          }

          public void run() {

              Connection connection = null;

              try {

                  System.out.println("Connecting to URL: " + url);

                  System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);

                  System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages");

                  System.out.println("Sleeping between publish " + sleepTime + " ms");

                  if (timeToLive != 0) {

                      System.out.println("Messages time to live " + timeToLive + " ms");

                  }

       

                  // Create the connection.

               ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);

                //  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://blrkec38454d.ad.infosys.com:61616");

                   

                  connection = connectionFactory.createConnection();

                  connection.start();

       

                  // Create the session

                  Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);

                  if (topic) {

                      destination = session.createTopic(subject);

                  } else {

                      destination = session.createQueue(subject);

                  }

       

                  // Create the producer.

                  MessageProducer producer = session.createProducer(destination);

                  if (persistent) {

                      producer.setDeliveryMode(DeliveryMode.PERSISTENT);

                  } else {

                      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

                  }

                  if (timeToLive != 0) {

                      producer.setTimeToLive(timeToLive);

                  }

       

                  // Start sending messages

                  sendLoop(session, producer);

       

                  System.out.println("Done.");

       

                  // Use the ActiveMQConnection interface to dump the connection

                  // stats.

                  ActiveMQConnection c = (ActiveMQConnection)connection;

                  c.getConnectionStats().dump(new IndentPrinter());

       

              } catch (Exception e) {

                  System.out.println("Caught: " + e);

                  e.printStackTrace();

              } finally {

                  try {

                      connection.close();

                  } catch (Throwable ignore) {

                  }

              }

          }

          protected void sendLoop(Session session, MessageProducer producer) throws Exception {

       

              //for (int i = 0; i < messageCount || messageCount == 0; i++) {

             try{

            System.out.println("in send loop");

                 

       

                  /*construct a default soap MessageFactory */

                  MessageFactory mf = MessageFactory.newInstance();

                   

                  /* Create a SOAP message object.*/

                  SOAPMessage soapMessage = mf.createMessage();

                   

                // Test with objects

                   

                  Person p = new Person();

                  Address a = new Address();

                  Address b = new Address();

                  p.setLname("Jayasree");

                  p.setMname("");

                  p.setLname("Balasubramanian");

                  a.setAdline1("infosys");

                  a.setAdline2("");

                  a.setCity("mangalore");

                  a.setCountry("india");

                  a.setState("");

                  p.setAddress(a);

                   

       

       

                 

                   

                     DocumentBuilderFactory factory =

                DocumentBuilderFactory.newInstance();

                factory.setNamespaceAware(true);

               

                DocumentBuilder builder =

                factory.newDocumentBuilder();

                Document document = builder.parse( new File("C:/Address_Std.xml") );

               

      //            document can be any XML document

                SOAPBody soapBody = soapMessage.getSOAPBody();

                SOAPBodyElement docElement =  soapBody.addDocument(document);                                                                               

      //Create an attachment with the Java Framework Activation API

                 URL url = new URL("http://java.sun.com/webservices/");

                 DataHandler dh = new DataHandler (url);

                 AttachmentPart ap = soapMessage.createAttachmentPart(dh);

       

                 //Set content type and ID

                 ap.setContentType("text/html");

                 ap.setContentId("cid-001");

       

                 //Add attachment to the SOAP message

                 soapMessage.addAttachmentPart(ap);

                 soapMessage.saveChanges();

       

                         

       

          // add attachment to message      

                 

                 soapMessage.saveChanges();

                   

                 soapMessage.writeTo(System.out);

                 System.out.println("inside producer--att count-"+soapMessage.countAttachments());

                  Message m = MessageTransformer.SOAPMessageIntoJMSMessage(soapMessage, session );

                  System.out.println("Display the SOAP message"+m);

                  producer.send(m);

                  if (transacted) {

                      session.commit();

                  }

       

                 // Thread.sleep(sleepTime);

       

               

          }catch(Exception e){

          System.out.println("exception-->"+e);

          e.printStackTrace();

          }

          }

           

       

           

          public void setPersistent(boolean durable) {

              this.persistent = durable;

          }

       

          public void setMessageCount(int messageCount) {

              this.messageCount = messageCount;

          }

       

          public void setMessageSize(int messageSize) {

              this.messageSize = messageSize;

          }

       

          public void setPassword(String pwd) {

              this.password = pwd;

          }

       

          public void setSleepTime(long sleepTime) {

              this.sleepTime = sleepTime;

          }

       

          public void setSubject(String subject) {

              this.subject = subject;

          }

       

          public void setTimeToLive(long timeToLive) {

              this.timeToLive = timeToLive;

          }

       

          public void setTopic(boolean topic) {

              this.topic = topic;

          }

       

          public void setQueue(boolean queue) {

              this.topic = !queue;

          }

       

          public void setTransacted(boolean transacted) {

              this.transacted = transacted;

          }

       

          public void setUrl(String url) {

              this.url = url;

          }

       

          public void setUser(String user) {

              this.user = user;

          }

       

          public void setVerbose(boolean verbose) {

              this.verbose = verbose;

          }

      }

       

       

      Consumer.java

       

      import java.io.IOException;

      import java.util.Arrays;

       

      import javax.activation.DataHandler;

      import javax.jms.Connection;

      import javax.jms.DeliveryMode;

      import javax.jms.Destination;

      import javax.jms.ExceptionListener;

      import javax.jms.JMSException;

      import javax.jms.Message;

      import javax.jms.MessageConsumer;

      import javax.jms.MessageListener;

      import javax.jms.MessageProducer;

      import javax.jms.Session;

      import javax.jms.TextMessage;

      import javax.jms.Topic;

      import javax.xml.soap.AttachmentPart;

      import javax.xml.soap.MessageFactory;

      import javax.xml.soap.Name;

      import javax.xml.soap.SOAPBody;

      import javax.xml.soap.SOAPBodyElement;

      import javax.xml.soap.SOAPException;

      import javax.xml.soap.SOAPMessage;

      import javax.xml.transform.Source;

      import javax.xml.transform.Transformer;

      import javax.xml.transform.TransformerFactory;

      import javax.xml.transform.stream.StreamResult;

       

      import org.apache.activemq.ActiveMQConnection;

      import org.apache.activemq.ActiveMQConnectionFactory;

       

      import com.sun.messaging.xml.MessageTransformer;

       

      public class Consumer implements MessageListener, ExceptionListener {

              private boolean running;

       

          private Session session;

          private Destination destination;

          private MessageProducer replyProducer;

       

          private boolean pauseBeforeShutdown;

          private boolean verbose = true;

          private int maxiumMessages;

          private String subject = "TOOL1.DEFAULT";

          private boolean topic;

          private String user = ActiveMQConnection.DEFAULT_USER;

          private String password = ActiveMQConnection.DEFAULT_PASSWORD;

          private String url = ActiveMQConnection.DEFAULT_BROKER_URL;

          private boolean transacted;

          private boolean durable;

          private String clientId;

          private int ackMode = Session.AUTO_ACKNOWLEDGE;

          private String consumerName = "James";

          private long sleepTime;

          private long receiveTimeOut;

          Name bodyName;

       

          public static void main(String[] args) {

              Consumer consumerTool = new Consumer();

              consumerTool.run();

          }

       

          public void run() {

              try {

                  running = true;

       

                  System.out.println("Connecting to URL: " + url);

                  System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);

                  System.out.println("Using a " + (durable ? "durable" : "non-durable") + " subscription");

       

                  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);

                  Connection connection = connectionFactory.createConnection();

                  if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {

                      connection.setClientID(clientId);

                  }

                  connection.setExceptionListener(this);

                  connection.start();

       

                  session = connection.createSession(transacted, ackMode);

                  if (topic) {

                      destination = session.createTopic(subject);

                  } else {

                      destination = session.createQueue(subject);

                  }

       

                  replyProducer = session.createProducer(null);

                  replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

       

                  MessageConsumer consumer = null;

                  if (durable && topic) {

                      consumer = session.createDurableSubscriber((Topic)destination, consumerName);

                  } else {

                      consumer = session.createConsumer(destination);

                  }

                  System.out.println("before  and  calling the consume method");

                  if (maxiumMessages > 0) {

                  System.out.println("Inside if part");

                      consumeMessagesAndClose(connection, session, consumer);

                  } else {

                      if (receiveTimeOut == 0) {

                      System.out.println("Inside else if part");

                          consumer.setMessageListener(this);

                      } else {

                      System.out.println("Inside else part");

                          consumeMessagesAndClose(connection, session, consumer, receiveTimeOut);

                      }

                  }

       

              } catch (Exception e) {

                  System.out.println("Caught: " + e);

                  e.printStackTrace();

              }

          }

       

          public void onMessage(Message message) {

              try {

       

                  MessageFactory messageFactory = MessageFactory.newInstance();

                   

                  SOAPMessage soapMessage =

                   MessageTransformer.SOAPMessageFromJMSMessage( message,messageFactory );

                  soapMessage.writeTo(System.out);

                   

      //Extract the content of the reply

        System.out.println("Attachment count in consumer -


      >"+soapMessage.countAttachments());

         

         

         

       

         

         

      java.util.Iterator iterator = soapMessage.getAttachments();

         

      while (iterator.hasNext()) {

               

         

                 

           DataHandler dh = ((AttachmentPart)iterator.next()).getDataHandler();

          Object obj = dh.getContent();

         //  if(null != fname)

             // return new File(fname);

           

      }

         

       

      Source sourceContent = soapMessage.getSOAPPart().getContent();

      //Set the output for the transformation

         

      StreamResult result = new StreamResult(System.out);

       

          TransformerFactory transformerFactory =

          TransformerFactory.newInstance();

       

      Transformer transformer =

      transformerFactory.newTransformer();

      transformer.transform(sourceContent, result);

       

       

                  if (message.getJMSReplyTo() != null) {

                   

                      replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: " + message.getJMSMessageID()));

                  }

       

                  if (transacted) {

                   

                      session.commit();

                  } else if (ackMode == Session.CLIENT_ACKNOWLEDGE) {

                      message.acknowledge();

                  }

       

              } catch (JMSException e) {

                 /* System.out.println("Caught JMS Error Code: " + e.getErrorCode());

                  System.out.println("Caught JMS: " + e.getLocalizedMessage());

                  System.out.println("Caught JMS: " + e.getMessage());*/

                  e.printStackTrace();

              }

              catch (Exception e) {

               

                   System.out.println("Caught JMS: " + e.getMessage());

              e.printStackTrace();

               

              }

              finally {

              }

                  if (sleepTime > 0) {

                      try {

                          Thread.sleep(sleepTime);

                      } catch (InterruptedException e) {

                      }

                  }

              }

           

       

          public synchronized void onException(JMSException ex) {

              System.out.println("JMS Exception occured.  Shutting down client.");

              running = false;

          }

       

          synchronized boolean isRunning() {

              return running;

          }

       

          protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {

              System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");

              System.out.println("Inside method part");

              for (int i = 0; i < maxiumMessages && isRunning();) {

                  Message message = consumer.receive(1000);

                  if (message != null) {

                      i++;

                      onMessage(message);

                  }

              }

              System.out.println("Closing connection");

              consumer.close();

              session.close();

              connection.close();

              if (pauseBeforeShutdown) {

                  System.out.println("Press return to shut down");

                  System.in.read();

              }

          }

       

          protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout) throws JMSException, IOException {

              System.out.println("We will consume messages while they continue to be delivered within: " + timeout + " ms, and then we will shutdown");

       

              Message message;

              while ((message = consumer.receive(timeout)) != null) {

                  onMessage(message);

              }

       

              System.out.println("Closing connection");

              consumer.close();

              session.close();

              connection.close();

              if (pauseBeforeShutdown) {

                  System.out.println("Press return to shut down");

                  System.in.read();

              }

          }

       

          public void setAckMode(String ackMode) {

              if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) {

                  this.ackMode = Session.CLIENT_ACKNOWLEDGE;

              }

              if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {

                  this.ackMode = Session.AUTO_ACKNOWLEDGE;

              }

              if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {

                  this.ackMode = Session.DUPS_OK_ACKNOWLEDGE;

              }

              if ("SESSION_TRANSACTED".equals(ackMode)) {

                  this.ackMode = Session.SESSION_TRANSACTED;

              }

          }

       

          public void setClientId(String clientID) {

              this.clientId = clientID;

          }

       

          public void setConsumerName(String consumerName) {

              this.consumerName = consumerName;

          }

       

          public void setDurable(boolean durable) {

              this.durable = durable;

          }

       

          public void setMaxiumMessages(int maxiumMessages) {

              this.maxiumMessages = maxiumMessages;

          }

       

          public void setPauseBeforeShutdown(boolean pauseBeforeShutdown) {

              this.pauseBeforeShutdown = pauseBeforeShutdown;

          }

       

          public void setPassword(String pwd) {

              this.password = pwd;

          }

       

          public void setReceiveTimeOut(long receiveTimeOut) {

              this.receiveTimeOut = receiveTimeOut;

          }

       

          public void setSleepTime(long sleepTime) {

              this.sleepTime = sleepTime;

          }

       

          public void setSubject(String subject) {

              this.subject = subject;

          }

       

          public void setTopic(boolean topic) {

              this.topic = topic;

          }

       

          public void setQueue(boolean queue) {

              this.topic = !queue;

          }

       

          public void setTransacted(boolean transacted) {

              this.transacted = transacted;

          }

       

          public void setUrl(String url) {

              this.url = url;

          }

       

          public void setUser(String user) {

              this.user = user;

          }

       

          public void setVerbose(boolean verbose) {

              this.verbose = verbose;

          }

       

      }