I wanted to start a discussion about how to implement a distributed queue backed by a cache, like Infinispan. I couldn't find your dev forum, so I'm posting here; feel free to move this thread to the appropriate place.
This is a good place to start. The infinispan-dev mailing list is used for developing Infinispan itself.
A queue is inherently ordered, but a map isn't. Here are my initial thoughts on this:
For each queue, the cache could maintain:
1) A well known key (probably based on queue name) whose value is the id (key) of the element currently at the head of the queue. This would be used when retrieving the element from the head.
2) A well known key (probably based on queue name) whose value is the id (key) of the element currently at the tail of the queue. This would be used when appending an element to the tail of the queue.
3) Each queue element would also have to maintain an id for the next element in the queue. Basically it's a linked list.
When retrieving the head of the queue, or appending to the tail of the queue we would need some kind of locking to ensure this was done atomically and the new head/tail pointers were updated consistently.
I guess some kind of optimistic locking scheme based on do and retry (kind of like compare and set) would scale better, but this is the area I am less sure about.
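To make the do-and-retry idea concrete, here is a minimal sketch, not a tested implementation. It assumes the cache can be treated as a `java.util.concurrent.ConcurrentMap` (Infinispan's `Cache` interface extends it), and it uses plain `ConcurrentHashMap` instances as stand-ins so it runs without a grid. The class name `CasLinkedQueue`, the key naming, the sentinel element, and the use of two maps (only to keep the types simple) are all assumptions made for illustration.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch: FIFO queue kept as a linked list of map entries. */
class CasLinkedQueue {
    /** Immutable element: payload plus the id (key) of the next element. */
    static final class Node {
        final String value;
        final String next; // null while this element is the last one
        Node(String value, String next) { this.value = value; this.next = next; }
        @Override public boolean equals(Object o) {
            if (!(o instanceof Node)) return false;
            Node n = (Node) o;
            return Objects.equals(value, n.value) && Objects.equals(next, n.next);
        }
        @Override public int hashCode() { return Objects.hash(value, next); }
    }

    private final ConcurrentMap<String, String> pointers; // HEAD and TAIL ids
    private final ConcurrentMap<String, Node> entries;    // queue elements
    private final String headKey, tailKey, idPrefix;

    CasLinkedQueue(ConcurrentMap<String, String> pointers,
                   ConcurrentMap<String, Node> entries, String queueName) {
        this.pointers = pointers;
        this.entries = entries;
        this.headKey = queueName + ":HEAD";
        this.tailKey = queueName + ":TAIL";
        this.idPrefix = queueName + ":";
        if (!pointers.containsKey(headKey)) {
            // HEAD always points at a consumed "sentinel"; the real head is sentinel.next.
            String sentinel = idPrefix + "sentinel";
            entries.putIfAbsent(sentinel, new Node(null, null));
            pointers.putIfAbsent(tailKey, sentinel);
            pointers.putIfAbsent(headKey, sentinel);
        }
    }

    /** Append to the tail: link the new element with compare-and-set, retry on conflict. */
    void offer(String value) {
        String id = idPrefix + UUID.randomUUID();
        entries.put(id, new Node(value, null));
        while (true) {
            String tailId = pointers.get(tailKey);
            Node tail = entries.get(tailId);
            if (tail == null) continue;              // stale pointer, element already consumed
            if (tail.next != null) {                 // TAIL lags behind: help it forward
                pointers.replace(tailKey, tailId, tail.next);
                continue;
            }
            if (entries.replace(tailId, tail, new Node(tail.value, id))) {
                pointers.replace(tailKey, tailId, id); // best effort; others can help
                return;
            }
        }
    }

    /** Remove from the head, or return null if the queue is empty. */
    String poll() {
        while (true) {
            String headId = pointers.get(headKey);
            String tailId = pointers.get(tailKey);
            Node head = entries.get(headId);
            if (head == null) continue;              // another consumer moved HEAD on
            if (head.next == null) return null;      // empty
            if (headId.equals(tailId)) {             // never let HEAD overtake TAIL
                pointers.replace(tailKey, tailId, head.next);
                continue;
            }
            Node first = entries.get(head.next);
            if (first == null) continue;             // lost a race, retry
            if (pointers.replace(headKey, headId, head.next)) {
                entries.remove(headId);              // old sentinel is garbage now
                return first.value;
            }
        }
    }
}
```

Every update is a conditional `replace`, so no lock is held across calls; a caller that loses a race simply re-reads the pointers and tries again. Whether that behaves well on a real grid depends on how the cache implements the conditional operations, which this sketch does not cover.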
This can be done, and this does sound like an interesting approach, except that there will be a lot of write contention on the HEAD and TAIL "pointer-entries". This could be eased by actually maintaining 2 separate caches (multiple caches from the same cache manager share the same transport, etc. and so are lightweight).
One cache could be configured to be replicated, to contain the HEAD and TAIL pointers as these would be needed everywhere. The second cache would be in distributed mode and would contain the entries in the queue. The latter would be the bit that needs to scale really well since this is what would grow. The former just needs to deal with frequent write contention.
Caches are XA compliant so wrapping the operation in a tx would help with atomicity: e.g., adding an entry to the queue:
1. Start tx
2. Acquire eager write lock on TAIL pointer entry
3. Add entry to queue
4. Retrieve last_entry
5. Update last_entry.next
6. Update TAIL pointer
7. Commit tx
This still doesn't remove the write contention on the TAIL pointer (and HEAD pointer) though, and you could see this become the bottleneck in the system.
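The append steps listed above can be sketched roughly as follows. This is an illustration only: a `ReentrantLock` stands in for the eager write lock on the TAIL pointer entry, two in-memory maps stand in for the replicated pointer cache and the distributed entry cache, and there is no real transaction, so rollback is not modelled. The names `LockedTailAppend`, `Entry`, and `snapshot` are hypothetical, not Infinispan API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch of the lock-then-append sequence. */
class LockedTailAppend {
    /** Immutable queue entry: payload plus the id of the next entry. */
    static final class Entry {
        final String value;
        final String next; // null for the last entry
        Entry(String value, String next) { this.value = value; this.next = next; }
    }

    static final String HEAD = "HEAD";
    static final String TAIL = "TAIL";

    final Map<String, String> pointers = new ConcurrentHashMap<>(); // stand-in: replicated cache
    final Map<String, Entry> entries = new ConcurrentHashMap<>();   // stand-in: distributed cache
    private final ReentrantLock tailLock = new ReentrantLock();     // stand-in: eager write lock
    private long nextId = 0;                                        // guarded by tailLock

    void append(String value) {
        // 1. Start tx (not modelled here)
        tailLock.lock();                                  // 2. lock the TAIL pointer entry
        try {
            String id = "entry-" + (nextId++);
            entries.put(id, new Entry(value, null));      // 3. add entry to queue
            String lastId = pointers.get(TAIL);           // 4. retrieve last_entry
            if (lastId == null) {
                pointers.put(HEAD, id);                   // empty queue: entry is also the head
            } else {
                Entry last = entries.get(lastId);
                entries.put(lastId, new Entry(last.value, id)); // 5. update last_entry.next
            }
            pointers.put(TAIL, id);                       // 6. update TAIL pointer
        } finally {
            tailLock.unlock();                            // 7. commit releases the lock
        }
    }

    /** Walk the list from HEAD; only meant for inspecting the sketch. */
    List<String> snapshot() {
        List<String> out = new ArrayList<>();
        for (String id = pointers.get(HEAD); id != null; id = entries.get(id).next) {
            out.add(entries.get(id).value);
        }
        return out;
    }
}
```

Every appender serialises on the single TAIL lock for the whole of steps 3 to 6, which is exactly the contention point described above.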
I would be very interested in something like this. I've recently been exploring solutions for the Lucene Directory: we now have a distributed store using Infinispan for the current index state, but updating the index needs to be done on a single node. So the current solution is fine for distributing the index but won't work well with multiple writers under high load: high contention makes Lucene break, as it can't spin properly on the lock.
What we do in Hibernate Search is send update statements to a queue, and I was going to play with HornetQ, hoping to involve Tim, but I'm still in a knot of ideas and requirements.
In the case of Lucene, I don't really need a publish-subscribe queue; it's more like an HA consumer: we need to guarantee there is always a consumer taking work from the queue and updating the index, and absolutely not more than one at the same time.
One very important point is that work on the queue must be guaranteed to be ordered: it's obviously different to "add an entity and then remove" than the other way around.
Sorry for going a bit OT, I'll open my own thread once I've cleared my mind - just keep us updated on this.
I'm curious to understand your requirements for distributed queueing. We have a similar requirement but we only need eventual queueing support. We don't need true queueing support since we use the queue for job queueing, but the queues only need to be FIFO per server since we route all related info to the same server.
We've been kicking around working with the Infinispan developers to get native queueing support built into Infinispan. True distributed queueing across the grid seems pretty tough to make perform well.
I think a simple implementation would be to simply put your queue data structure into the desired cache. Then register event listeners with the cache that listen for changes to the cache entry that holds your queue data structure.
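As a rough illustration of that idea, the sketch below keeps the whole queue as one immutable list under a single key and swaps it with a conditional `replace`. It is only a stand-in: a `ConcurrentHashMap` plays the cache, and the `Consumer` callbacks are invoked locally by the writer, whereas a real cache would deliver change events to its registered listeners itself. `SingleEntryQueue` and `addListener` are hypothetical names, not Infinispan API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical sketch: the whole queue lives in one cache entry. */
class SingleEntryQueue {
    private final ConcurrentMap<String, List<String>> cache;
    private final String key;
    private final List<Consumer<List<String>>> listeners = new CopyOnWriteArrayList<>();

    SingleEntryQueue(ConcurrentMap<String, List<String>> cache, String key) {
        this.cache = cache;
        this.key = key;
        cache.putIfAbsent(key, Collections.<String>emptyList());
    }

    /** Register a callback that sees the new queue contents after each change. */
    void addListener(Consumer<List<String>> listener) { listeners.add(listener); }

    void offer(String value) {
        while (true) {
            List<String> old = cache.get(key);
            List<String> copy = new ArrayList<>(old);
            copy.add(value);
            if (swap(old, copy)) return;   // retry if someone else changed the entry
        }
    }

    String poll() {
        while (true) {
            List<String> old = cache.get(key);
            if (old.isEmpty()) return null;
            List<String> rest = new ArrayList<>(old.subList(1, old.size()));
            if (swap(old, rest)) return old.get(0);
        }
    }

    /** Replace the entry only if it still equals the list we read, then notify. */
    private boolean swap(List<String> old, List<String> updated) {
        List<String> frozen = Collections.unmodifiableList(updated);
        if (!cache.replace(key, old, frozen)) return false;
        for (Consumer<List<String>> l : listeners) l.accept(frozen);
        return true;
    }
}
```

The trade-off is that the entire queue is copied and rewritten on every operation, so this probably only suits short queues such as the per-server job queues described above.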