4 Replies Latest reply on Jun 10, 2005 7:45 PM by Adrian Brock

    The missing prototype...

    Adrian Brock Master

      Ovidiu, after reading your comments about race conditions in revision1
      I went to my abandoned prototype and remembered that I didn't commit
      the final prototype that did the all the receive processing.
      I couldn't find it on my laptop either, so it is either on my other laptop
      or I deleted it :-(

      I've tried to quickly reproduce it.
      Obviously I can't reproduce it exactly in such a short time, so this does not
      include things like full error checking, synchronization or close processing :-)

      Some of the names have changed from the original, and in the original
      I abstracted out policy into handlers.
      Don't worry too much about the names or the implementation not matching
      yours. This is not deliberate, I just want to demonstrate the notion of "Delivery"
      which I think I originally called an "Acknowledgement"?

      The idea is that when you deliver a message the accepts creates a "Delivery"
      which the Channel can hold. The Delivery is the vehicle for acks/nacks.
      If the channel detects that the Consumer/Receiver crashes, or it removes itself
      without completing the processing, it can do the nacks for it
      (not shown in this quick bit of code).

      The Delivery forms a sort of identity for the message processing that is
      established *before* it is actually done. It lets you trap spurious or double
      acks/nacks.

      I'll show the example code implementing the point then some test code
      with the output.

        • 1. Re: The missing prototype...
          Adrian Brock Master

          Channel

          /*
           * JBoss, the OpenSource J2EE webOS
           *
           * Distributable under LGPL license.
           * See terms of license at gnu.org.
           */
          package test;
          
          public interface Channel
          {
           void addMessage(Object message) throws Throwable;
          
           void addConsumer(Consumer consumer) throws Throwable;
          
           void removeConsumer(Consumer consumer) throws Throwable;
          
           void receiveConsumer(Consumer consumer) throws Throwable;
          
           void ack(Delivery delivery) throws Throwable;
          
           void nack(Delivery delivery);
          }
          


          ChannelImpl
          /*
           * JBoss, the OpenSource J2EE webOS
           *
           * Distributable under LGPL license.
           * See terms of license at gnu.org.
           */
          package test;
          
          import java.util.ArrayList;
          import java.util.HashSet;
          import java.util.Iterator;
          
          
          public class ChannelImpl implements Channel
          {
           private ArrayList messages = new ArrayList();
          
           private HashSet<Consumer> consumers = new HashSet<Consumer>();
          
           private HashSet<Consumer> waitingConsumers = new HashSet<Consumer>();
          
           public void addMessage(Object message) throws Throwable
           {
           System.out.println(this + " addMessage " + message);
           internalAddMessage(message);
           }
          
           public void addConsumer(Consumer consumer)
           {
           System.out.println(this + " addConsumer " + consumer);
           consumers.add(consumer);
           }
          
           public void removeConsumer(Consumer consumer)
           {
           System.out.println(this + " removeConsumer " + consumer);
           consumers.remove(consumer);
           }
          
           public void receiveConsumer(Consumer consumer) throws Throwable
           {
           System.out.println(this + " receive " + consumer + " " + messages);
           if (consumers.contains(consumer) == false)
           throw new Exception("Not a registered consumer " + consumers);
           Delivery delivery = null;
           for (Iterator i = messages.iterator(); i.hasNext();)
           {
           Object message = i.next();
           delivery = consumer.accepts(this, message);
           if (delivery != null)
           {
           i.remove();
           break;
           }
           }
           if (delivery != null)
           doDelivery(delivery);
           else
           waitingConsumers.add(consumer);
           }
          
           public void ack(Delivery delivery) throws Throwable
           {
           System.out.println(this + " ack " + delivery);
           }
          
           public void nack(Delivery delivery)
           {
           System.out.println(this + " nack " + delivery);
           internalAddMessage(delivery.getMessage());
           }
          
           protected void internalAddMessage(Object message)
           {
           System.out.println(this + " internalAddMessage " + message);
           Delivery delivery = findConsumer(this, message);
           if (delivery != null)
           doDelivery(delivery);
           else
           messages.add(message);
           }
          
           protected void doDelivery(Delivery delivery)
           {
           System.out.println(this + " doDelivery " + delivery);
           try
           {
           delivery.deliver();
           }
           catch (Throwable t)
           {
           delivery.nack();
           }
           }
          
           protected Delivery findConsumer(Channel channel, Object message)
           {
           System.out.println(this + " findConsumer " + channel + " " + message + " " + waitingConsumers);
           for (Iterator<Consumer> i = waitingConsumers.iterator(); i.hasNext(); )
           {
           Consumer consumer = i.next();
           Delivery delivery = consumer.accepts(channel, message);
           if (delivery != null)
           {
           if (consumer instanceof ConsumerChannel == false)
           waitingConsumers.remove(consumer);
           return delivery;
           }
           }
           return null;
           }
          }
          


          Consumer
          /*
           * JBoss, the OpenSource J2EE webOS
           *
           * Distributable under LGPL license.
           * See terms of license at gnu.org.
           */
          package test;
          
          public interface Consumer
          {
           Delivery accepts(Channel channel, Object message);
          }
          


          ConsumerImpl
          /*
           * JBoss, the OpenSource J2EE webOS
           *
           * Distributable under LGPL license.
           * See terms of license at gnu.org.
           */
          package test;
          
          public class ConsumerImpl implements Consumer
          {
           public Delivery accepts(Channel channel, Object message)
           {
           System.out.println(this + " accepts " + channel + " " + message);
           return new DeliveryImpl(channel, message)
           {
           public void deliver()
           {
           try
           {
           ConsumerImpl.this.deliver(message);
           ack();
           }
           catch (Throwable t)
           {
           nack();
           }
           }
           };
           }
          
           public void deliver(Object message) throws Throwable
           {
           System.out.println("Got message " + message);
           }
          }
          


          ConsumerChannel
          /*
           * JBoss, the OpenSource J2EE webOS
           *
           * Distributable under LGPL license.
           * See terms of license at gnu.org.
           */
          package test;
          
          public class ConsumerChannel extends ChannelImpl implements Consumer
          {
           public Delivery accepts(Channel channel, Object message)
           {
           System.out.println(this + " accepts " + channel + " " + message);
           Delivery delivery = findConsumer(channel, message);
           if (delivery == null)
           delivery = new ConsumerChannelDelivery(channel, message);
           return delivery;
           }
          
           protected class ConsumerChannelDelivery extends DeliveryImpl
           {
           public ConsumerChannelDelivery(Channel channel, Object message)
           {
           super(channel, message);
           }
          
           public void deliver()
           {
           try
           {
           addMessage(message);
           ack();
           }
           catch (Throwable t)
           {
           nack();
           }
           }
           }
          }
          


          Delivery
          /*
           * JBoss, the OpenSource J2EE webOS
           *
           * Distributable under LGPL license.
           * See terms of license at gnu.org.
           */
          package test;
          
          public interface Delivery
          {
           Object getMessage();
          
           void deliver();
          
           void ack() throws Throwable;
          
           void nack();
          }
          


          DeliveryImpl
          /*
           * JBoss, the OpenSource J2EE webOS
           *
           * Distributable under LGPL license.
           * See terms of license at gnu.org.
           */
          package test;
          
          public class DeliveryImpl implements Delivery
          {
           protected Channel channel;
           protected Object message;
          
           public DeliveryImpl(Channel channel, Object message)
           {
           this.channel = channel;
           this.message = message;
           }
          
           public Object getMessage()
           {
           return message;
           }
          
           public void deliver()
           {
           }
          
           public void ack() throws Throwable
           {
           System.out.println(this + " ack");
           channel.ack(this);
           }
          
           public void nack()
           {
           System.out.println(this + " nack");
           channel.nack(this);
           }
          }
          


          • 2. Re: The missing prototype...
            Adrian Brock Master

            Test code

            /*
             * JBoss, the OpenSource J2EE webOS
             *
             * Distributable under LGPL license.
             * See terms of license at gnu.org.
             */
            package test;
            
            public class Test
            {
             public static void main(String[] args) throws Throwable
             {
             System.setErr(System.out);
            
             ConsumerImpl consumer = new ConsumerImpl()
             {
             public void deliver(Object message) throws Throwable
             {
             super.deliver(message);
             if ("Message3".equals(message))
             throw new RuntimeException("nack");
             }
             };
             ConsumerChannel consumerChannel = new ConsumerChannel();
             consumerChannel.addConsumer(consumer);
             ChannelImpl channel = new ChannelImpl();
             channel.addConsumer(consumerChannel);
             channel.receiveConsumer(consumerChannel);
            
             channel.addMessage("Message1");
             consumerChannel.receiveConsumer(consumer);
             consumerChannel.receiveConsumer(consumer);
             channel.addMessage("Message2");
             channel.addMessage("Message3");
             consumerChannel.receiveConsumer(consumer);
             }
            }
            


            • 3. Re: The missing prototype...
              Adrian Brock Master

              Test output

              // Just some wiring together (deployment :-)
              test.ConsumerChannel@affc70 addConsumer test.Test$1@1e63e3d
              test.ChannelImpl@1004901 addConsumer test.ConsumerChannel@affc70
              test.ChannelImpl@1004901 receive test.ConsumerChannel@affc70 []
              
              // Add Message1 goes down the pipe
              test.ChannelImpl@1004901 addMessage Message1
              test.ChannelImpl@1004901 internalAddMessage Message1
              test.ChannelImpl@1004901 findConsumer test.ChannelImpl@1004901 Message1 [test.ConsumerChannel@affc70]
              test.ConsumerChannel@affc70 accepts test.ChannelImpl@1004901 Message1
              
              // No end consumer so we deliver it to the holding part of the pipe
              test.ConsumerChannel@affc70 findConsumer test.ChannelImpl@1004901 Message1 []
              test.ChannelImpl@1004901 doDelivery test.ConsumerChannel$ConsumerChannelDelivery@1a8c4e7
              test.ConsumerChannel@affc70 addMessage Message1
              test.ConsumerChannel@affc70 internalAddMessage Message1
              test.ConsumerChannel@affc70 findConsumer test.ConsumerChannel@affc70 Message1 []
              
              // And ack the message on the original channel
              test.ConsumerChannel$ConsumerChannelDelivery@1a8c4e7 ack
              test.ChannelImpl@1004901 ack test.ConsumerChannel$ConsumerChannelDelivery@1a8c4e7
              
              // Now we do the receive
              test.ConsumerChannel@affc70 receive test.Test$1@1e63e3d [Message1]
              test.Test$1@1e63e3d accepts test.ConsumerChannel@affc70 Message1
              test.ConsumerChannel@affc70 doDelivery test.ConsumerImpl$1@cf2c80
              Got message Message1
              
              // Which acks to the holding part of the pipe
              test.ConsumerImpl$1@cf2c80 ack
              test.ConsumerChannel@affc70 ack test.ConsumerImpl$1@cf2c80
              
              // Receive with not messages
              test.ConsumerChannel@affc70 receive test.Test$1@1e63e3d []
              
              // Now we add a message
              test.ChannelImpl@1004901 addMessage Message2
              test.ChannelImpl@1004901 internalAddMessage Message2
              test.ChannelImpl@1004901 findConsumer test.ChannelImpl@1004901 Message2 [test.ConsumerChannel@affc70]
              test.ConsumerChannel@affc70 accepts test.ChannelImpl@1004901 Message2
              test.ConsumerChannel@affc70 findConsumer test.ChannelImpl@1004901 Message2 [test.Test$1@1e63e3d]
              test.Test$1@1e63e3d accepts test.ChannelImpl@1004901 Message2
              
              // This can go straight through the pipe
              test.ChannelImpl@1004901 doDelivery test.ConsumerImpl$1@1729854
              Got message Message2
              test.ConsumerImpl$1@1729854 ack
              test.ChannelImpl@1004901 ack test.ConsumerImpl$1@1729854
              
              // We are going to NACK this, see the test code
              test.ChannelImpl@1004901 addMessage Message3
              test.ChannelImpl@1004901 internalAddMessage Message3
              test.ChannelImpl@1004901 findConsumer test.ChannelImpl@1004901 Message3 [test.ConsumerChannel@affc70]
              test.ConsumerChannel@affc70 accepts test.ChannelImpl@1004901 Message3
              test.ConsumerChannel@affc70 findConsumer test.ChannelImpl@1004901 Message3 []
              test.ChannelImpl@1004901 doDelivery test.ConsumerChannel$ConsumerChannelDelivery@6eb38a
              test.ConsumerChannel@affc70 addMessage Message3
              test.ConsumerChannel@affc70 internalAddMessage Message3
              test.ConsumerChannel@affc70 findConsumer test.ConsumerChannel@affc70 Message3 []
              test.ConsumerChannel$ConsumerChannelDelivery@6eb38a ack
              
              // In the holding part of the pipe
              test.ChannelImpl@1004901 ack test.ConsumerChannel$ConsumerChannelDelivery@6eb38a
              
              // Now we try to receive
              test.ConsumerChannel@affc70 receive test.Test$1@1e63e3d [Message3]
              test.Test$1@1e63e3d accepts test.ConsumerChannel@affc70 Message3
              test.ConsumerChannel@affc70 doDelivery test.ConsumerImpl$1@1cd2e5f
              Got message Message3
              
              // Got it but we nack
              test.ConsumerImpl$1@1cd2e5f nack
              test.ConsumerChannel@affc70 nack test.ConsumerImpl$1@1cd2e5f
              
              // It goes back in the holding part of the pipe
              test.ConsumerChannel@affc70 internalAddMessage Message3
              test.ConsumerChannel@affc70 findConsumer test.ConsumerChannel@affc70 Message3 []
              


              • 4. Re: The missing prototype...
                Adrian Brock Master

                Obviously, you don't have to use this idea, but I think it does resolve
                a lot of your race condition problems.
                For distributed work (or delivery with ack/nack across reboots),
                the "Delivery" will certainly need an global id to correctly identify it.
                For in memory work, it can obviously just use the Object indentity.