On durable subscription in synchronous mode.
francesco_81 Dec 4, 2008 10:49 AM
Hi all,
I'm having trouble with two things when using a durable subscriber in synchronous mode.
First, are exactly-once semantics and ordered delivery guaranteed on the receiver side?
Second, I observed that a second durable late joiner on a topic doesn't receive any messages, while the first one receives them. This also happens if I change the clientId on the connection for the second late joiner. Is there a way to get the second late joiner to receive the messages as well?
Below you can find the Sender and Receiver code that I use in my test.
Best Regards,
Francesco Russo.
--------------------------- Sender ------------------------
/**
 * Test publisher: looks up a topic connection factory and the topic
 * {@code topic/example_francesco} through JNDI (settings read from
 * {@code jndi.properties} in the working directory) and publishes five text
 * messages, one per second.
 *
 * <p>The JMS connection and the JNDI context are always released in a
 * {@code finally} block so that a failed run does not leave resources open on
 * the provider.
 */
public class Sender {

    /**
     * Runs the publisher. Errors are reported on standard error via
     * {@code printStackTrace()}; the method never throws.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Declared outside the try so the finally block can release them.
        Context jndiContext = null;
        TopicConnection connection = null;
        try {
            System.out.println("--- SENDER ---");
            Properties props = loadProperties("jndi.properties");
            System.out.println("props: " + props);

            jndiContext = new InitialContext(props);
            TopicConnectionFactory connectionFactory =
                    (TopicConnectionFactory) jndiContext.lookup("ConnectionFactory");
            connection = connectionFactory.createTopicConnection();
            connection.start();

            // Non-transacted session with automatic acknowledgement.
            TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            System.out.println("session: " + session);

            Topic jmsTopic = (Topic) jndiContext.lookup("topic/example_francesco");
            System.out.println("topic: " + jmsTopic);

            TopicPublisher writer = session.createPublisher(jmsTopic);
            for (int i = 0; i < 5; i++) {
                TextMessage message = session.createTextMessage("Hello!" + i);
                System.out.println("[Sender] writing message: " + message.getText());
                writer.send(message);
                Thread.sleep(1000);
            }
            System.out.println("--- END SENDER ---");
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (IOException e) {
            // Also covers FileNotFoundException when jndi.properties is missing.
            e.printStackTrace();
        } finally {
            // Closing the connection also closes its sessions and publishers.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (jndiContext != null) {
                try {
                    jndiContext.close();
                } catch (NamingException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /** Loads a properties file and always closes the underlying stream. */
    private static Properties loadProperties(String fileName) throws IOException {
        Properties props = new Properties();
        FileInputStream in = new FileInputStream(fileName);
        try {
            props.load(in);
        } finally {
            in.close();
        }
        return props;
    }
}
------------------------ Receiver -----------------------------------
/**
 * Test consumer: connects with client ID {@code client_06}, creates (or
 * re-attaches to) the durable subscription named {@code client_06} on the topic
 * {@code topic/example_francesco}, and polls it synchronously 100 times, about
 * once per second.
 *
 * <p>The connection is always closed in a {@code finally} block. The durable
 * subscription itself is deliberately NOT unsubscribed, so it stays registered
 * on the provider between runs.
 *
 * <p>NOTE(review): per the JMS specification a durable subscription only
 * retains messages published after the subscription was first created, so a
 * subscriber created under a new name/client ID after the sender has finished
 * is not expected to see the earlier messages — confirm against the provider's
 * documentation.
 */
public class Receiver {

    /**
     * Runs the consumer. Errors are reported on standard error via
     * {@code printStackTrace()}; the method never throws.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // Declared outside the try so the finally block can release them.
        Context jndiContext = null;
        TopicConnection connection = null;
        try {
            System.out.println("--- READER ---");
            Properties props = loadProperties("jndi.properties");
            jndiContext = new InitialContext(props);
            System.out.println("props: " + props);

            TopicConnectionFactory connectionFactory =
                    (TopicConnectionFactory) jndiContext.lookup("ConnectionFactory");
            connection = connectionFactory.createTopicConnection();
            // The client ID must be set before any other use of the connection.
            connection.setClientID("client_06");
            connection.start();

            // Non-transacted session with automatic acknowledgement.
            TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            System.out.println("session: " + session);

            Topic jmsTopic = (Topic) jndiContext.lookup("topic/example_francesco");
            System.out.println("topic: " + jmsTopic);

            TopicSubscriber reader = session.createDurableSubscriber(jmsTopic, "client_06");
            System.out.println("[Reader] begin receiving");

            for (int i = 0; i < 100; i++) {
                Thread.sleep(1000);
                // Short 10 ms wait: returns null when nothing is pending.
                TextMessage msg = (TextMessage) reader.receive(10);
                if (msg != null) {
                    System.out.println("[Receiver] received: " + msg.getText());
                } else {
                    System.out.println("[Receiver] received: " + msg);
                }
            }
            System.out.println("--- END READER ---");
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (IOException e) {
            // Also covers FileNotFoundException when jndi.properties is missing.
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Closing the connection also closes its sessions and subscribers;
            // without it the client ID may stay in use on the provider.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if (jndiContext != null) {
                try {
                    jndiContext.close();
                } catch (NamingException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /** Loads a properties file and always closes the underlying stream. */
    private static Properties loadProperties(String fileName) throws IOException {
        Properties props = new Properties();
        FileInputStream in = new FileInputStream(fileName);
        try {
            props.load(in);
        } finally {
            in.close();
        }
        return props;
    }
}