1. Re: scatter gather dynamic recipients
tadayosi May 7, 2015 12:33 AM (in response to mlybarger)
Hi Mark,
It's an interesting topic to discuss. As you may already know, the EIP Scatter-Gather pattern (Enterprise Integration Patterns - Scatter-Gather) says that you can scatter a message using either a Recipient List or a Publish-Subscribe Channel (e.g. a JMS topic), and gather the replies using an Aggregator. I think it's quite self-explanatory.
Your initial thought is called the auction-style approach in the EIP book, and I think it is still a sound approach. Why do you think it goes against topic convention? The other approach is the distribution approach, which uses a Recipient List. Note that Camel's Recipient List can change the recipient destinations dynamically.
You can also aggregate the results using the Camel Aggregator. When you apply an Aggregator to your solution, keep in mind that there are three design points you still need to consider. Quoted from the EIP book:
- Correlation - which incoming messages belong together?
- Completeness Condition - when are we ready to publish the result message?
- Aggregator Algorithm - how do we combine the received messages into a single result message?
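To make the three points concrete, here is a minimal in-memory sketch in plain Java. It is not the Camel Aggregator or any SwitchYard API; the class name ReplyAggregator, the fixed reply count, and the comma-joined result are all assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical in-memory aggregator; illustrative only, not Camel or SwitchYard API. */
class ReplyAggregator {
    private final int expectedReplies;
    private final Map<String, List<String>> pending = new HashMap<>();

    ReplyAggregator(int expectedReplies) {
        this.expectedReplies = expectedReplies;
    }

    /**
     * Accepts one reply. Returns the combined result once the group is complete,
     * otherwise an empty Optional.
     */
    Optional<String> onReply(String correlationId, String body) {
        // Correlation: replies that carry the same correlation id belong together.
        List<String> group = pending.computeIfAbsent(correlationId, id -> new ArrayList<>());
        group.add(body);

        // Completeness condition (assumed): publish once a fixed number of replies arrived.
        if (group.size() < expectedReplies) {
            return Optional.empty();
        }
        pending.remove(correlationId);

        // Aggregation algorithm (assumed): join the reply bodies in arrival order.
        return Optional.of(String.join(",", group));
    }
}

class AggregatorDemo {
    public static void main(String[] args) {
        ReplyAggregator aggregator = new ReplyAggregator(2);
        System.out.println(aggregator.onReply("msg-1", "EchoOne:testing"));
        System.out.println(aggregator.onReply("msg-1", "EchoTwo:testing"));
    }
}
```

A count-only completeness condition waits forever if a recipient never answers, so a real aggregator would normally pair it with a timeout. The Camel Aggregator offers both kinds of condition through its completionSize and completionTimeout options.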
Hope this helps.
2. Re: scatter gather dynamic recipients
mlybarger May 7, 2015 12:58 PM (in response to tadayosi)
Thanks for the response! It is self-explanatory and I do understand the pattern.
I said it goes against convention because I typically don't expect a reply from topic communication; it's "fire and forget".
I put together a simple quickstart-style SwitchYard example. The app looks like this. Producer takes its input from a file (/tmp/test), and the CamelRoute sends the message to ProducerOut, which is bound to a topic. EchoOne and EchoTwo are listening to the topic. I'm failing to see how to aggregate the results. The Camel route gets only one message back, not multiple messages.
The code is here:
https://github.com/mlybarger/scattergather.git
It is also attached. Any insights into how I can aggregate the results would be most helpful.
To test, I deploy the jar, run echo "test message" > /tmp/test, and watch the results.
12:56:23,141 INFO [route14] (Camel (camel-30) thread #31 - file:///tmp) Received message for 'Producer' : testing
12:56:23,173 INFO [com.example.switchyard.scattergather.EchoTwoBean] (pool-53-thread-1) message() - start:testing
12:56:23,173 INFO [com.example.switchyard.scattergather.EchoOneBean] (pool-52-thread-1) message() - start:testing
12:56:23,189 WARN [org.apache.camel.component.jms.reply.TemporaryQueueReplyManager] (Camel (camel-30) thread #33 - TemporaryQueueReplyManager[Echo]) Reply received for unknown correlationID [Camel-ID-USL1XPZWZ1-55559-1431012520252-29-10]. The message will be ignored: ActiveMQTextMessage {commandId = 93, responseRequired = true, messageId = ID:USL1XPZWZ1-55586-1431012734185-1:1:8:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:USL1XPZWZ1-55586-1431012734185-1:1:8:1, destination = temp-queue://ID:USL1XPZWZ1-55586-1431012734185-7:1:1, transactionId = null, expiration = 0, timestamp = 1431017783189, arrival = 0, brokerInTime = 1431017783189, brokerOutTime = 1431017783189, correlationId = Camel-ID-USL1XPZWZ1-55559-1431012520252-29-10, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@420bd2e5, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = testing
}
12:56:23,189 INFO [route14] (Camel (camel-30) thread #31 - file:///tmp) final: testing
Attachment: scattergather.zip (46.9 KB)
3. Re: scatter gather dynamic recipients
tadayosi May 10, 2015 9:27 PM (in response to mlybarger)
OK, then the next step would be to have both EchoOneBean and EchoTwoBean send their messages to the same queue (say, "Aggregate") via SY references. Then create an Aggregator SY service that listens on the "Aggregate" queue to collect the messages, and use a Camel route with the Camel Aggregator to aggregate them.
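As a rough picture of that wiring, the plain-Java simulation below stands in for the messaging pieces: a thread pool plays the topic subscribers, and a LinkedBlockingQueue plays the shared "Aggregate" queue. No JMS, Camel, or SwitchYard is involved; the names SharedQueueSketch, Reply, and scatterGather, and the timeout parameter, are hypothetical and only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical simulation of "reply to one shared queue, then gather". */
class SharedQueueSketch {

    /** A reply tagged with the id of the request it answers. */
    static final class Reply {
        final String correlationId;
        final String body;

        Reply(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    static List<String> scatterGather(String correlationId, String message,
                                      List<String> echoNames, long timeoutMillis) {
        // Stand-in for the shared "Aggregate" queue that every echo service replies to.
        BlockingQueue<Reply> aggregateQueue = new LinkedBlockingQueue<>();
        ExecutorService subscribers = Executors.newFixedThreadPool(echoNames.size());
        List<String> gathered = new ArrayList<>();
        try {
            // Scatter: each subscriber receives the same message, as with a topic,
            // and sends its reply to the shared queue instead of a temporary reply queue.
            for (String name : echoNames) {
                subscribers.execute(() ->
                        aggregateQueue.add(new Reply(correlationId, name + ":" + message)));
            }

            // Gather: read replies until all have arrived or the timeout expires.
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            while (gathered.size() < echoNames.size()) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break;
                }
                Reply reply = aggregateQueue.poll(remaining, TimeUnit.NANOSECONDS);
                if (reply == null) {
                    break; // timed out waiting for the next reply
                }
                if (reply.correlationId.equals(correlationId)) {
                    gathered.add(reply.body);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status for the caller
        } finally {
            subscribers.shutdownNow();
        }
        return gathered;
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        names.add("EchoOne");
        names.add("EchoTwo");
        // Reply order depends on thread scheduling, so it may vary between runs.
        System.out.println(scatterGather("id-1", "testing", names, 2000));
    }
}
```

The point of the sketch is that the gathering side reads from a destination that all responders share, so it sees every reply rather than just one.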
You can also refer to the following doc to see the Camel way of implementing the Scatter-Gather pattern: