This content has been marked as final.
Show 4 replies
-
1. Re: The missing prototype...
adrian.brock Jun 10, 2005 7:33 PM (in response to adrian.brock)Channel
/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package test;

/**
 * A stage in a message pipe: it takes messages in, hands them to consumers,
 * and is told the outcome of each hand-off through {@link #ack} and
 * {@link #nack}.
 */
public interface Channel
{
   /**
    * Offers a message to this channel.
    *
    * @param message the message to add
    * @throws Throwable for any error
    */
   void addMessage(Object message) throws Throwable;

   /**
    * Registers a consumer with this channel.
    *
    * @param consumer the consumer to register
    * @throws Throwable for any error
    */
   void addConsumer(Consumer consumer) throws Throwable;

   /**
    * Unregisters a consumer from this channel.
    *
    * @param consumer the consumer to unregister
    * @throws Throwable for any error
    */
   void removeConsumer(Consumer consumer) throws Throwable;

   /**
    * Signals that a consumer wants to receive a message.
    *
    * @param consumer the consumer asking for a message
    * @throws Throwable for any error
    */
   void receiveConsumer(Consumer consumer) throws Throwable;

   /**
    * Acknowledges a delivery made from this channel.
    *
    * @param delivery the delivery being acknowledged
    * @throws Throwable for any error
    */
   void ack(Delivery delivery) throws Throwable;

   /**
    * Negatively acknowledges a delivery made from this channel.
    *
    * @param delivery the delivery being rejected
    */
   void nack(Delivery delivery);
}
// ChannelImpl
/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package test;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/**
 * In-memory {@link Channel}.
 *
 * Messages that no waiting consumer accepts are held in {@code messages}.
 * Consumers that asked for a message when none was acceptable are parked in
 * {@code waitingConsumers} until a later message arrives.
 *
 * This class does no synchronization of its own.
 */
public class ChannelImpl implements Channel
{
   /** Messages held because no waiting consumer accepted them. */
   private List<Object> messages = new ArrayList<Object>();

   /** Every consumer registered through {@link #addConsumer}. */
   private Set<Consumer> consumers = new HashSet<Consumer>();

   /** Registered consumers that asked for a message and did not get one yet. */
   private Set<Consumer> waitingConsumers = new HashSet<Consumer>();

   /**
    * Adds a message: it is delivered straight away if a waiting consumer
    * accepts it, otherwise it is held.
    *
    * @param message the message to add
    * @throws Throwable declared by the interface; not thrown directly here
    */
   public void addMessage(Object message) throws Throwable
   {
      System.out.println(this + " addMessage " + message);
      internalAddMessage(message);
   }

   /**
    * Registers a consumer. Registration alone does not make it eligible for
    * messages; it must also call {@link #receiveConsumer}.
    *
    * @param consumer the consumer to register
    */
   public void addConsumer(Consumer consumer)
   {
      System.out.println(this + " addConsumer " + consumer);
      consumers.add(consumer);
   }

   /**
    * Unregisters a consumer and cancels any pending receive it had.
    *
    * @param consumer the consumer to unregister
    */
   public void removeConsumer(Consumer consumer)
   {
      System.out.println(this + " removeConsumer " + consumer);
      consumers.remove(consumer);
      // A removed consumer must not stay parked, otherwise findConsumer
      // would keep offering it messages after it was unregistered.
      waitingConsumers.remove(consumer);
   }

   /**
    * Gives the consumer the first held message it accepts; if there is none,
    * parks the consumer until a message arrives.
    *
    * @param consumer a consumer previously registered with this channel
    * @throws Exception if the consumer is not registered
    * @throws Throwable declared by the interface
    */
   public void receiveConsumer(Consumer consumer) throws Throwable
   {
      System.out.println(this + " receive " + consumer + " " + messages);
      if (!consumers.contains(consumer))
         throw new Exception("Not a registered consumer " + consumers);

      Delivery delivery = null;
      for (Iterator<Object> i = messages.iterator(); i.hasNext();)
      {
         Object message = i.next();
         delivery = consumer.accepts(this, message);
         if (delivery != null)
         {
            // Take the message out of the holding list before delivering it.
            i.remove();
            break;
         }
      }

      if (delivery != null)
         doDelivery(delivery);
      else
         waitingConsumers.add(consumer);
   }

   /**
    * Records an acknowledgement. This implementation only logs it.
    *
    * @param delivery the acknowledged delivery
    * @throws Throwable declared by the interface; not thrown here
    */
   public void ack(Delivery delivery) throws Throwable
   {
      System.out.println(this + " ack " + delivery);
   }

   /**
    * Handles a rejected delivery by putting its message back into this
    * channel, where it may be redelivered or held.
    *
    * @param delivery the rejected delivery
    */
   public void nack(Delivery delivery)
   {
      System.out.println(this + " nack " + delivery);
      internalAddMessage(delivery.getMessage());
   }

   /**
    * Delivers the message to a waiting consumer that accepts it, or holds it
    * when there is none.
    *
    * @param message the message to route
    */
   protected void internalAddMessage(Object message)
   {
      System.out.println(this + " internalAddMessage " + message);
      Delivery delivery = findConsumer(this, message);
      if (delivery != null)
         doDelivery(delivery);
      else
         messages.add(message);
   }

   /**
    * Performs a delivery, nacking it if {@code deliver()} throws.
    *
    * @param delivery the delivery to perform
    */
   protected void doDelivery(Delivery delivery)
   {
      System.out.println(this + " doDelivery " + delivery);
      try
      {
         delivery.deliver();
      }
      catch (Throwable t)
      {
         delivery.nack();
      }
   }

   /**
    * Asks each waiting consumer in turn whether it accepts the message and
    * returns the first delivery offered.
    *
    * An accepting consumer is no longer waiting afterwards, unless it is a
    * {@link ConsumerChannel}, which stays parked.
    *
    * @param channel the channel the delivery should ack/nack against
    * @param message the message on offer
    * @return the delivery from the first accepting consumer, or {@code null}
    *         if no waiting consumer accepts the message
    */
   protected Delivery findConsumer(Channel channel, Object message)
   {
      System.out.println(this + " findConsumer " + channel + " " + message + " " + waitingConsumers);
      for (Iterator<Consumer> i = waitingConsumers.iterator(); i.hasNext();)
      {
         Consumer consumer = i.next();
         Delivery delivery = consumer.accepts(channel, message);
         if (delivery != null)
         {
            // Remove through the iterator so the set is never modified
            // behind the back of the iteration in progress.
            if (!(consumer instanceof ConsumerChannel))
               i.remove();
            return delivery;
         }
      }
      return null;
   }
}
Consumer/* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package test; public interface Consumer { Delivery accepts(Channel channel, Object message); }
ConsumerImpl/* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package test; public class ConsumerImpl implements Consumer { public Delivery accepts(Channel channel, Object message) { System.out.println(this + " accepts " + channel + " " + message); return new DeliveryImpl(channel, message) { public void deliver() { try { ConsumerImpl.this.deliver(message); ack(); } catch (Throwable t) { nack(); } } }; } public void deliver(Object message) throws Throwable { System.out.println("Got message " + message); } }
ConsumerChannel/* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package test; public class ConsumerChannel extends ChannelImpl implements Consumer { public Delivery accepts(Channel channel, Object message) { System.out.println(this + " accepts " + channel + " " + message); Delivery delivery = findConsumer(channel, message); if (delivery == null) delivery = new ConsumerChannelDelivery(channel, message); return delivery; } protected class ConsumerChannelDelivery extends DeliveryImpl { public ConsumerChannelDelivery(Channel channel, Object message) { super(channel, message); } public void deliver() { try { addMessage(message); ack(); } catch (Throwable t) { nack(); } } } }
// Delivery
/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package test;

/**
 * One attempt to hand a message to a consumer, together with the means to
 * report its outcome.
 */
public interface Delivery
{
   /**
    * @return the message this delivery carries
    */
   Object getMessage();

   /**
    * Carries out the delivery.
    */
   void deliver();

   /**
    * Reports that the delivery succeeded.
    *
    * @throws Throwable for any error
    */
   void ack() throws Throwable;

   /**
    * Reports that the delivery failed.
    */
   void nack();
}
DeliveryImpl/* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package test; public class DeliveryImpl implements Delivery { protected Channel channel; protected Object message; public DeliveryImpl(Channel channel, Object message) { this.channel = channel; this.message = message; } public Object getMessage() { return message; } public void deliver() { } public void ack() throws Throwable { System.out.println(this + " ack"); channel.ack(this); } public void nack() { System.out.println(this + " nack"); channel.nack(this); } }
-
2. Re: The missing prototype...
adrian.brock Jun 10, 2005 7:34 PM (in response to adrian.brock)Test code
/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package test;

/**
 * Driver for the prototype: wires an end consumer behind a
 * {@link ConsumerChannel}, plugs that into a {@link ChannelImpl}, and pushes
 * three messages through, the last of which the end consumer rejects.
 */
public class Test
{
   /**
    * Runs the scenario; the trace is printed to standard output.
    *
    * @param args ignored
    * @throws Throwable for any error
    */
   public static void main(String[] args) throws Throwable
   {
      // Send stderr to stdout as well.
      System.setErr(System.out);

      // End consumer that rejects "Message3" by throwing after printing it.
      ConsumerImpl endConsumer = new ConsumerImpl()
      {
         public void deliver(Object message) throws Throwable
         {
            super.deliver(message);
            if ("Message3".equals(message))
               throw new RuntimeException("nack");
         }
      };

      // Wiring: source channel -> holding channel -> end consumer.
      ConsumerChannel holdingChannel = new ConsumerChannel();
      holdingChannel.addConsumer(endConsumer);

      ChannelImpl sourceChannel = new ChannelImpl();
      sourceChannel.addConsumer(holdingChannel);
      sourceChannel.receiveConsumer(holdingChannel);

      // Message1 is added before the end consumer asks for anything.
      sourceChannel.addMessage("Message1");
      holdingChannel.receiveConsumer(endConsumer);

      // The end consumer asks again before Message2 is added.
      holdingChannel.receiveConsumer(endConsumer);
      sourceChannel.addMessage("Message2");

      // Message3 is added, then received and rejected by the end consumer.
      sourceChannel.addMessage("Message3");
      holdingChannel.receiveConsumer(endConsumer);
   }
}
-
3. Re: The missing prototype...
adrian.brock Jun 10, 2005 7:40 PM (in response to adrian.brock)Test output
// Just some wiring together (deployment :-) test.ConsumerChannel@affc70 addConsumer test.Test$1@1e63e3d test.ChannelImpl@1004901 addConsumer test.ConsumerChannel@affc70 test.ChannelImpl@1004901 receive test.ConsumerChannel@affc70 [] // Add Message1 goes down the pipe test.ChannelImpl@1004901 addMessage Message1 test.ChannelImpl@1004901 internalAddMessage Message1 test.ChannelImpl@1004901 findConsumer test.ChannelImpl@1004901 Message1 [test.ConsumerChannel@affc70] test.ConsumerChannel@affc70 accepts test.ChannelImpl@1004901 Message1 // No end consumer so we deliver it to the holding part of the pipe test.ConsumerChannel@affc70 findConsumer test.ChannelImpl@1004901 Message1 [] test.ChannelImpl@1004901 doDelivery test.ConsumerChannel$ConsumerChannelDelivery@1a8c4e7 test.ConsumerChannel@affc70 addMessage Message1 test.ConsumerChannel@affc70 internalAddMessage Message1 test.ConsumerChannel@affc70 findConsumer test.ConsumerChannel@affc70 Message1 [] // And ack the message on the original channel test.ConsumerChannel$ConsumerChannelDelivery@1a8c4e7 ack test.ChannelImpl@1004901 ack test.ConsumerChannel$ConsumerChannelDelivery@1a8c4e7 // Now we do the receive test.ConsumerChannel@affc70 receive test.Test$1@1e63e3d [Message1] test.Test$1@1e63e3d accepts test.ConsumerChannel@affc70 Message1 test.ConsumerChannel@affc70 doDelivery test.ConsumerImpl$1@cf2c80 Got message Message1 // Which acks to the holding part of the pipe test.ConsumerImpl$1@cf2c80 ack test.ConsumerChannel@affc70 ack test.ConsumerImpl$1@cf2c80 // Receive with not messages test.ConsumerChannel@affc70 receive test.Test$1@1e63e3d [] // Now we add a message test.ChannelImpl@1004901 addMessage Message2 test.ChannelImpl@1004901 internalAddMessage Message2 test.ChannelImpl@1004901 findConsumer test.ChannelImpl@1004901 Message2 [test.ConsumerChannel@affc70] test.ConsumerChannel@affc70 accepts test.ChannelImpl@1004901 Message2 test.ConsumerChannel@affc70 findConsumer test.ChannelImpl@1004901 Message2 
[test.Test$1@1e63e3d] test.Test$1@1e63e3d accepts test.ChannelImpl@1004901 Message2 // This can go straight through the pipe test.ChannelImpl@1004901 doDelivery test.ConsumerImpl$1@1729854 Got message Message2 test.ConsumerImpl$1@1729854 ack test.ChannelImpl@1004901 ack test.ConsumerImpl$1@1729854 // We are going to NACK this, see the test code test.ChannelImpl@1004901 addMessage Message3 test.ChannelImpl@1004901 internalAddMessage Message3 test.ChannelImpl@1004901 findConsumer test.ChannelImpl@1004901 Message3 [test.ConsumerChannel@affc70] test.ConsumerChannel@affc70 accepts test.ChannelImpl@1004901 Message3 test.ConsumerChannel@affc70 findConsumer test.ChannelImpl@1004901 Message3 [] test.ChannelImpl@1004901 doDelivery test.ConsumerChannel$ConsumerChannelDelivery@6eb38a test.ConsumerChannel@affc70 addMessage Message3 test.ConsumerChannel@affc70 internalAddMessage Message3 test.ConsumerChannel@affc70 findConsumer test.ConsumerChannel@affc70 Message3 [] test.ConsumerChannel$ConsumerChannelDelivery@6eb38a ack // In the holding part of the pipe test.ChannelImpl@1004901 ack test.ConsumerChannel$ConsumerChannelDelivery@6eb38a // Now we try to receive test.ConsumerChannel@affc70 receive test.Test$1@1e63e3d [Message3] test.Test$1@1e63e3d accepts test.ConsumerChannel@affc70 Message3 test.ConsumerChannel@affc70 doDelivery test.ConsumerImpl$1@1cd2e5f Got message Message3 // Got it but we nack test.ConsumerImpl$1@1cd2e5f nack test.ConsumerChannel@affc70 nack test.ConsumerImpl$1@1cd2e5f // It goes back in the holding part of the pipe test.ConsumerChannel@affc70 internalAddMessage Message3 test.ConsumerChannel@affc70 findConsumer test.ConsumerChannel@affc70 Message3 []
-
4. Re: The missing prototype...
adrian.brock Jun 10, 2005 7:45 PM (in response to adrian.brock)Obviously, you don't have to use this idea, but I think it does resolve
a lot of your race condition problems.
For distributed work (or delivery with ack/nack across reboots),
the "Delivery" will certainly need a global id to correctly identify it.
For in-memory work, it can obviously just use the Object identity.