Problem in sending SOAP with Attachment using Active MQ
jayasreeb Oct 16, 2008 11:50 PMHi,
My requirement is to place SOAP Message with Attachment(binary data- Java object) in JMS. I am using fuse-message-broker-5.0.0.17.
I have a producer program which creates a soap message with attachment(single attachment) using data handler and places it using SOAPMessageIntoJMSMessage.When i check the count of Attachment in Producer class its giving 1 before placing the SOAP Message into JMS Message.
I have COnsumer class which picks up the same JMS message and displays the content of soap message. In this class when i check the attachment count its giving 0.
Do i need to do any settings in Active MQ/ consumer.java to accept the attachment?
I have placed following jars in my workspace.
1.activation.jar
2.axis.jar(1.4)
3.imq.jar
4.imqxm.jar
5.saaj.jar
6.mail.jar
7.wsdl4j-1.5.1.jar
8.jaxm-api.jar
9.jaxrpc.jar
10.commons-discovery-0.2.jar
11.commons-logging-1.0.4.jar
12.activemq-all-5.0.0.17-fuse.jar
Please find both producer and consumer class details. Please suggest me solution for this problem.
Producer.java
import java.awt.datatransfer.DataFlavor;
import java.io.File;
import java.net.URL;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.soap.MessageFactory;
import javax.activation.DataHandler;
import javax.activation.DataSource;
import javax.activation.FileDataSource;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.mail.util.ByteArrayDataSource;
import javax.xml.soap.AttachmentPart;
import javax.xml.soap.SOAPBody;
import javax.xml.soap.SOAPBodyElement;
import javax.xml.soap.SOAPEnvelope;
import javax.xml.soap.SOAPMessage;
import javax.xml.soap.SOAPPart;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IndentPrinter;
import org.w3c.dom.Document;
import com.sun.messaging.xml.MessageTransformer;
public class Producer {
private Destination destination;
private int messageCount = 10;
private long sleepTime;
private boolean verbose = true;
private int messageSize = 255;
private long timeToLive;
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String subject = "TOOL1.DEFAULT";
private boolean topic;
private boolean transacted;
private boolean persistent;
public static void main(String[] args) {
Producer producerTool = new Producer();
producerTool.run();
}
public void run() {
Connection connection = null;
try {
System.out.println("Connecting to URL: " + url);
System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " messages");
System.out.println("Sleeping between publish " + sleepTime + " ms");
if (timeToLive != 0) {
System.out.println("Messages time to live " + timeToLive + " ms");
}
// Create the connection.
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
// ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://blrkec38454d.ad.infosys.com:61616");
connection = connectionFactory.createConnection();
connection.start();
// Create the session
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
if (topic) {
destination = session.createTopic(subject);
} else {
destination = session.createQueue(subject);
}
// Create the producer.
MessageProducer producer = session.createProducer(destination);
if (persistent) {
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
} else {
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
if (timeToLive != 0) {
producer.setTimeToLive(timeToLive);
}
// Start sending messages
sendLoop(session, producer);
System.out.println("Done.");
// Use the ActiveMQConnection interface to dump the connection
// stats.
ActiveMQConnection c = (ActiveMQConnection)connection;
c.getConnectionStats().dump(new IndentPrinter());
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
} finally {
try {
connection.close();
} catch (Throwable ignore) {
}
}
}
protected void sendLoop(Session session, MessageProducer producer) throws Exception {
//for (int i = 0; i < messageCount || messageCount == 0; i++) {
try{
System.out.println("in send loop");
/*construct a default soap MessageFactory */
MessageFactory mf = MessageFactory.newInstance();
/* Create a SOAP message object.*/
SOAPMessage soapMessage = mf.createMessage();
// Test with objects
Person p = new Person();
Address a = new Address();
Address b = new Address();
p.setLname("Jayasree");
p.setMname("");
p.setLname("Balasubramanian");
a.setAdline1("infosys");
a.setAdline2("");
a.setCity("mangalore");
a.setCountry("india");
a.setState("");
p.setAddress(a);
DocumentBuilderFactory factory =
DocumentBuilderFactory.newInstance();
factory.setNamespaceAware(true);
DocumentBuilder builder =
factory.newDocumentBuilder();
Document document = builder.parse( new File("C:/Address_Std.xml") );
// document can be any XML document
SOAPBody soapBody = soapMessage.getSOAPBody();
SOAPBodyElement docElement = soapBody.addDocument(document);
//Create an attachment with the Java Framework Activation API
URL url = new URL("http://java.sun.com/webservices/");
DataHandler dh = new DataHandler (url);
AttachmentPart ap = soapMessage.createAttachmentPart(dh);
//Set content type and ID
ap.setContentType("text/html");
ap.setContentId("cid-001");
//Add attachment to the SOAP message
soapMessage.addAttachmentPart(ap);
soapMessage.saveChanges();
// add attachment to message
soapMessage.saveChanges();
soapMessage.writeTo(System.out);
System.out.println("inside producer--att count-"+soapMessage.countAttachments());
Message m = MessageTransformer.SOAPMessageIntoJMSMessage(soapMessage, session );
System.out.println("Display the SOAP message"+m);
producer.send(m);
if (transacted) {
session.commit();
}
// Thread.sleep(sleepTime);
}catch(Exception e){
System.out.println("exception-->"+e);
e.printStackTrace();
}
}
public void setPersistent(boolean durable) {
this.persistent = durable;
}
public void setMessageCount(int messageCount) {
this.messageCount = messageCount;
}
public void setMessageSize(int messageSize) {
this.messageSize = messageSize;
}
public void setPassword(String pwd) {
this.password = pwd;
}
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
public void setSubject(String subject) {
this.subject = subject;
}
public void setTimeToLive(long timeToLive) {
this.timeToLive = timeToLive;
}
public void setTopic(boolean topic) {
this.topic = topic;
}
public void setQueue(boolean queue) {
this.topic = !queue;
}
public void setTransacted(boolean transacted) {
this.transacted = transacted;
}
public void setUrl(String url) {
this.url = url;
}
public void setUser(String user) {
this.user = user;
}
public void setVerbose(boolean verbose) {
this.verbose = verbose;
}
}
Consumer.java
import java.io.IOException;
import java.util.Arrays;
import javax.activation.DataHandler;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.xml.soap.AttachmentPart;
import javax.xml.soap.MessageFactory;
import javax.xml.soap.Name;
import javax.xml.soap.SOAPBody;
import javax.xml.soap.SOAPBodyElement;
import javax.xml.soap.SOAPException;
import javax.xml.soap.SOAPMessage;
import javax.xml.transform.Source;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.sun.messaging.xml.MessageTransformer;
public class Consumer implements MessageListener, ExceptionListener {
private boolean running;
private Session session;
private Destination destination;
private MessageProducer replyProducer;
private boolean pauseBeforeShutdown;
private boolean verbose = true;
private int maxiumMessages;
private String subject = "TOOL1.DEFAULT";
private boolean topic;
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private boolean transacted;
private boolean durable;
private String clientId;
private int ackMode = Session.AUTO_ACKNOWLEDGE;
private String consumerName = "James";
private long sleepTime;
private long receiveTimeOut;
Name bodyName;
public static void main(String[] args) {
Consumer consumerTool = new Consumer();
consumerTool.run();
}
public void run() {
try {
running = true;
System.out.println("Connecting to URL: " + url);
System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
System.out.println("Using a " + (durable ? "durable" : "non-durable") + " subscription");
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
Connection connection = connectionFactory.createConnection();
if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
connection.setClientID(clientId);
}
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ackMode);
if (topic) {
destination = session.createTopic(subject);
} else {
destination = session.createQueue(subject);
}
replyProducer = session.createProducer(null);
replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageConsumer consumer = null;
if (durable && topic) {
consumer = session.createDurableSubscriber((Topic)destination, consumerName);
} else {
consumer = session.createConsumer(destination);
}
System.out.println("before and calling the consume method");
if (maxiumMessages > 0) {
System.out.println("Inside if part");
consumeMessagesAndClose(connection, session, consumer);
} else {
if (receiveTimeOut == 0) {
System.out.println("Inside else if part");
consumer.setMessageListener(this);
} else {
System.out.println("Inside else part");
consumeMessagesAndClose(connection, session, consumer, receiveTimeOut);
}
}
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
public void onMessage(Message message) {
try {
MessageFactory messageFactory = MessageFactory.newInstance();
SOAPMessage soapMessage =
MessageTransformer.SOAPMessageFromJMSMessage( message,messageFactory );
soapMessage.writeTo(System.out);
//Extract the content of the reply
System.out.println("Attachment count in consumer -
>"+soapMessage.countAttachments());
java.util.Iterator iterator = soapMessage.getAttachments();
while (iterator.hasNext()) {
DataHandler dh = ((AttachmentPart)iterator.next()).getDataHandler();
Object obj = dh.getContent();
// if(null != fname)
// return new File(fname);
}
Source sourceContent = soapMessage.getSOAPPart().getContent();
//Set the output for the transformation
StreamResult result = new StreamResult(System.out);
TransformerFactory transformerFactory =
TransformerFactory.newInstance();
Transformer transformer =
transformerFactory.newTransformer();
transformer.transform(sourceContent, result);
if (message.getJMSReplyTo() != null) {
replyProducer.send(message.getJMSReplyTo(), session.createTextMessage("Reply: " + message.getJMSMessageID()));
}
if (transacted) {
session.commit();
} else if (ackMode == Session.CLIENT_ACKNOWLEDGE) {
message.acknowledge();
}
} catch (JMSException e) {
/* System.out.println("Caught JMS Error Code: " + e.getErrorCode());
System.out.println("Caught JMS: " + e.getLocalizedMessage());
System.out.println("Caught JMS: " + e.getMessage());*/
e.printStackTrace();
}
catch (Exception e) {
System.out.println("Caught JMS: " + e.getMessage());
e.printStackTrace();
}
finally {
}
if (sleepTime > 0) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
}
}
}
public synchronized void onException(JMSException ex) {
System.out.println("JMS Exception occured. Shutting down client.");
running = false;
}
synchronized boolean isRunning() {
return running;
}
protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
System.out.println("Inside method part");
for (int i = 0; i < maxiumMessages && isRunning();) {
Message message = consumer.receive(1000);
if (message != null) {
i++;
onMessage(message);
}
}
System.out.println("Closing connection");
consumer.close();
session.close();
connection.close();
if (pauseBeforeShutdown) {
System.out.println("Press return to shut down");
System.in.read();
}
}
protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout) throws JMSException, IOException {
System.out.println("We will consume messages while they continue to be delivered within: " + timeout + " ms, and then we will shutdown");
Message message;
while ((message = consumer.receive(timeout)) != null) {
onMessage(message);
}
System.out.println("Closing connection");
consumer.close();
session.close();
connection.close();
if (pauseBeforeShutdown) {
System.out.println("Press return to shut down");
System.in.read();
}
}
public void setAckMode(String ackMode) {
if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) {
this.ackMode = Session.CLIENT_ACKNOWLEDGE;
}
if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {
this.ackMode = Session.AUTO_ACKNOWLEDGE;
}
if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {
this.ackMode = Session.DUPS_OK_ACKNOWLEDGE;
}
if ("SESSION_TRANSACTED".equals(ackMode)) {
this.ackMode = Session.SESSION_TRANSACTED;
}
}
public void setClientId(String clientID) {
this.clientId = clientID;
}
public void setConsumerName(String consumerName) {
this.consumerName = consumerName;
}
public void setDurable(boolean durable) {
this.durable = durable;
}
public void setMaxiumMessages(int maxiumMessages) {
this.maxiumMessages = maxiumMessages;
}
public void setPauseBeforeShutdown(boolean pauseBeforeShutdown) {
this.pauseBeforeShutdown = pauseBeforeShutdown;
}
public void setPassword(String pwd) {
this.password = pwd;
}
public void setReceiveTimeOut(long receiveTimeOut) {
this.receiveTimeOut = receiveTimeOut;
}
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
public void setSubject(String subject) {
this.subject = subject;
}
public void setTopic(boolean topic) {
this.topic = topic;
}
public void setQueue(boolean queue) {
this.topic = !queue;
}
public void setTransacted(boolean transacted) {
this.transacted = transacted;
}
public void setUrl(String url) {
this.url = url;
}
public void setUser(String user) {
this.user = user;
}
public void setVerbose(boolean verbose) {
this.verbose = verbose;
}
}