With a better understanding of the processing options available in Mule after reading Mule in Action, I found a place where some of the routing and aggregation examples would be of help in a current project. I came up with a prototype that models the messaging flow I was interested in before proceeding with work on my real-world project.
The processing model I wanted to prototype involved:
- A message is received on an inbound endpoint.
- The message is transformed and forwarded to each of two services.
- The responses from each service are collected and used to create a request for the business logic.
- The business logic is invoked and the response returned to the caller.
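The four steps above can be sketched in plain Java, with Mule left out entirely. This is only a rough model of the flow: `FlowSketch`, `process`, and the lambda stand-ins are hypothetical names I made up for illustration, and the services here are called in-process and one after the other rather than over a transport.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of the flow: transform, fan out to the services,
// collect the responses, then invoke the business logic.
// All names are hypothetical; none of this is Mule API.
class FlowSketch {

    static String process(String inbound,
                          Function<String, String> transform,
                          List<Function<String, String>> services,
                          Function<List<String>, String> businessLogic) {
        // Transform the inbound message once.
        String request = transform.apply(inbound);

        // Forward the same request to each service and collect the responses.
        List<String> responses = new ArrayList<>();
        for (Function<String, String> service : services) {
            responses.add(service.apply(request));
        }

        // Hand the collected responses to the business logic and return its result.
        return businessLogic.apply(responses);
    }

    public static void main(String[] args) {
        Function<String, String> trim = String::trim;
        Function<String, String> serviceOne = s -> "one:" + s;
        Function<String, String> serviceTwo = s -> "two:" + s;
        Function<List<String>, String> logic = rs -> String.join("|", rs);

        System.out.println(process(" HELLO ", trim, List.of(serviceOne, serviceTwo), logic));
    }
}
```

In the real configuration each of these function calls becomes a message sent to an endpoint, which is where the routers described below come in.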
In my real-world project, I would of course need to consider how to transform messages into the request format each service expects, as well as the transports (JMS in my case) by which the services are invoked. To keep this example manageable, I decided to use simple components and the VM transport. This means the stand-ins for the services I want to invoke are defined in the same model as the service handling the business concerns.
In this example, I use the stock log-component element which provides an implementation of org.mule.api.component.simple.LogService. This component simply logs what is received on the inbound endpoint. If no component is declared, the inbound message is implicitly bridged to the outbound definition. This approach would fit just fine here for some of the services defined in the model, but I want to log the messages so we can more easily track the message flow.
Originally this example used echo-components, hence the use of "echo" instead of "log" throughout. All log-components could be substituted with echo-components to produce the same results with respect to the final response message.
Below is one possible implementation of the logical layout described above:
- A file is read from the file endpoint, directory /tmp/input.
- The contents of the file are logged and sent to an outbound chaining router.
- The chaining router first routes to the echo-router service via a VM endpoint.
- The request is multicast to the echo-one and echo-two services.
- Each response is sent to the echo-responses endpoint, where they are aggregated and returned to the chaining router.
- The chaining router forwards the aggregated response to the main-request endpoint, where the business logic (the main-logic service) is invoked.
At the start of processing, a message is received by the echo-start service:
<service name="echo-start">
    <inbound>
        <file:inbound-endpoint path="/tmp/input"/>
    </inbound>
    <log-component/>
    <outbound>
        <chaining-router>
            <vm:outbound-endpoint path="echo-requests"/>
            <vm:outbound-endpoint path="main-request"/>
        </chaining-router>
    </outbound>
</service>
The file contents are first sent to the echo-requests endpoint. The goal here is to have the service listening on that endpoint invoke the other two services and send the aggregate response back. The chaining router then forwards that response to the main-request endpoint, where the business logic of the real-world service would accept it.
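The chaining behaviour itself is easy to model in plain Java: the response of each step becomes the request of the next. The sketch below is my own illustration and not Mule code; `ChainSketch`, `chain`, and the two lambdas standing in for the echo-requests and main-request endpoints are hypothetical.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Plain-Java model of chaining: the response of each step becomes the
// request of the next. Hypothetical names; not Mule API.
class ChainSketch {

    static String chain(String payload, List<UnaryOperator<String>> endpoints) {
        String current = payload;
        for (UnaryOperator<String> endpoint : endpoints) {
            // Each endpoint sees whatever the previous one returned.
            current = endpoint.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in for echo-requests: two echoes aggregated by concatenation.
        UnaryOperator<String> echoRequests = s -> s + s;
        // Stand-in for main-request: the business logic.
        UnaryOperator<String> mainRequest = s -> "processed:" + s;

        System.out.println(chain("HELLO", List.of(echoRequests, mainRequest)));
    }
}
```

The point to notice is that the second step never sees the original file contents, only the aggregate produced by the first.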
The echo-router service receives the request on the echo-requests endpoint and sends it to the endpoint of each service we want to invoke:
<outbound>
    <multicasting-router enableCorrelation="ALWAYS">
        <vm:outbound-endpoint address="vm://echo-one-request"/>
        <vm:outbound-endpoint address="vm://echo-two-request"/>
        <reply-to address="vm://echo-responses"/>
    </multicasting-router>
</outbound>
As the target services are invoked asynchronously, the enableCorrelation attribute is needed so any response can be related to the request. Additionally, we need to tell the router where to send the responses from each service with the reply-to element.
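To make the role of correlation more concrete, here is a small plain-Java sketch of the idea: replies arrive in any order, are grouped by a correlation ID, and a group is released once the expected number of replies is in. This is my own simplified model and says nothing about how Mule implements it internally; `CorrelationSketch`, `onReply`, and the IDs used are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of correlating asynchronous replies with their request.
// Hypothetical names; not Mule API.
class CorrelationSketch {

    private final int expectedGroupSize;
    private final Map<String, List<String>> pending = new HashMap<>();

    CorrelationSketch(int expectedGroupSize) {
        this.expectedGroupSize = expectedGroupSize;
    }

    // Records a reply under its correlation ID. Returns the complete group
    // once every expected reply has arrived, otherwise an empty Optional.
    synchronized Optional<List<String>> onReply(String correlationId, String payload) {
        List<String> group = pending.computeIfAbsent(correlationId, id -> new ArrayList<>());
        group.add(payload);
        if (group.size() < expectedGroupSize) {
            return Optional.empty();
        }
        pending.remove(correlationId);
        return Optional.of(group);
    }

    public static void main(String[] args) {
        CorrelationSketch correlator = new CorrelationSketch(2);
        // Replies for two different requests arrive interleaved.
        System.out.println(correlator.onReply("req-1", "HELLO"));
        System.out.println(correlator.onReply("req-2", "WORLD"));
        System.out.println(correlator.onReply("req-1", "HELLO"));
    }
}
```

Without the ID there would be no way to tell which of the interleaved replies belong together, which is the problem enableCorrelation addresses.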
The responses from the "echo" services (the implementations of which will be provided later) are sent to the echo-responses endpoint, where a custom aggregator is listening:
<async-reply>
    <vm:inbound-endpoint path="echo-responses"/>
    <custom-async-reply-router class="prystasj.mule.EchoResponseAggregator"/>
</async-reply>
The response from the aggregator is given back to the chaining-router and forwarded to the main-request endpoint, where it is simply logged. Below is the entire model. Since it's rather lengthy, I promise to describe the custom aggregator and an example invocation soon after.
<model name="echoAggregationModel">
    <service name="echo-start">
        <inbound>
            <file:inbound-endpoint path="/tmp/input" />
        </inbound>
        <log-component />
        <outbound>
            <chaining-router>
                <vm:outbound-endpoint path="echo-requests" />
                <vm:outbound-endpoint path="main-request" />
            </chaining-router>
        </outbound>
    </service>
    <service name="echo-router">
        <inbound>
            <vm:inbound-endpoint path="echo-requests" />
        </inbound>
        <log-component />
        <outbound>
            <multicasting-router enableCorrelation="ALWAYS">
                <vm:outbound-endpoint address="vm://echo-one-request" />
                <vm:outbound-endpoint address="vm://echo-two-request" />
                <reply-to address="vm://echo-responses" />
            </multicasting-router>
        </outbound>
        <async-reply>
            <vm:inbound-endpoint path="echo-responses" />
            <custom-async-reply-router class="prystasj.mule.EchoResponseAggregator" />
        </async-reply>
    </service>
    <service name="main-logic">
        <inbound>
            <vm:inbound-endpoint path="main-request" />
        </inbound>
        <log-component />
    </service>
    <service name="echo-one">
        <inbound>
            <vm:inbound-endpoint address="vm://echo-one-request" />
        </inbound>
        <log-component />
    </service>
    <service name="echo-two">
        <inbound>
            <vm:inbound-endpoint address="vm://echo-two-request" />
        </inbound>
        <log-component />
    </service>
</model>
The custom aggregator simply concatenates the responses it sees. If the original request file contained "HELLO", the response from the aggregator would be "HELLOHELLO", since the responses from the services that are invoked are simply echoes of what they've received.
The custom aggregator extends the org.mule.routing.response.ResponseCorrelationAggregator class since correlation was enabled to map the responses from the echo services to the originating request.
Here is the code for the aggregator. For brevity, I've omitted any guards and checks:
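// Imports assume the Mule 2.x package layout.
import java.util.Iterator;

import org.mule.DefaultMuleMessage;
import org.mule.api.MuleEvent;
import org.mule.api.MuleMessage;
import org.mule.routing.AggregationException;
import org.mule.routing.CollectionCorrelatorCallback;
import org.mule.routing.EventCorrelatorCallback;
import org.mule.routing.inbound.EventGroup;
import org.mule.routing.response.ResponseCorrelationAggregator;

public class EchoResponseAggregator extends ResponseCorrelationAggregator {

    @Override
    protected EventCorrelatorCallback getCorrelatorCallback() {
        // Reuse the stock collection callback, overriding only the aggregation step.
        return new CollectionCorrelatorCallback() {
            public MuleMessage aggregateEvents(EventGroup events) throws AggregationException {
                try {
                    return aggregateResponsesFromEvents(events);
                } catch (Exception e) {
                    throw new AggregationException(events, null, e);
                }
            }
        };
    }

    // Concatenates the payload of every event in the correlation group.
    public static MuleMessage aggregateResponsesFromEvents(EventGroup events) throws Exception {
        String aggregateResponse = "";
        MuleEvent event = null;
        for (Iterator iterator = events.iterator(); iterator.hasNext();) {
            event = (MuleEvent) iterator.next();
            String payload = event.getMessage().getPayloadAsString();
            aggregateResponse += payload;
        }
        // The last event's message is passed along as the previous message.
        return new DefaultMuleMessage(aggregateResponse, event.getMessage());
    }
}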
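Since the listing above leaves out guards, here is a rough idea of what they might look like, written against plain strings so it can be run without Mule. It is only a sketch under my own assumptions: `GuardedAggregationSketch` and `aggregate` are hypothetical names, and how to treat an empty group or a missing payload is a design choice the real aggregator would have to make for itself.

```java
import java.util.List;
import java.util.Objects;

// Plain-Java sketch of the concatenation step with some guards added.
// Payloads are modelled as strings; hypothetical names, not Mule API.
class GuardedAggregationSketch {

    static String aggregate(List<String> payloads) {
        Objects.requireNonNull(payloads, "payloads");
        if (payloads.isEmpty()) {
            // With no events, the Mule listing would have no event left
            // to base the response message on.
            throw new IllegalArgumentException("no responses to aggregate");
        }
        StringBuilder aggregate = new StringBuilder();
        for (String payload : payloads) {
            if (payload != null) { // skip missing payloads rather than append "null"
                aggregate.append(payload);
            }
        }
        return aggregate.toString();
    }

    public static void main(String[] args) {
        System.out.println(aggregate(List.of("HELLO", "HELLO")));
    }
}
```

Failing fast on an empty group is one option; returning an empty payload would be another, depending on what the downstream business logic can tolerate.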
Hope this example helps anyone looking to do something similar. Any suggestions on how to simplify things would also be appreciated. Thanks!
I have an issue and was hoping you can help. I defined multicasting-router like so:
Both outbound endpoints are executed twice for some reason. If I comment one out or make one async, they work as expected.
Unfortunately I can't see your config (was the XML escaped in your comment?), but I don't think multi-casting routers are meant to be used on their own when you're expecting a response, so setting synchronous to true might not be the right approach here. That is why multi-casting routers require an async-reply when a response is expected.
The router is likely receiving the responses from the endpoints synchronously and routing them again.
Might I recommend you check out the models here and see which one yours fits: http://www.infoq.com/articles/mule-message-routing
I sent you an email with config.
I haven't seen an email (could very well have simply missed it), but I was thinking if the config was listed here it might be more likely to help others in a similar situation.
Hello,
We are trying to call a service running on 2 servers and aggregate the responses. We are unable to do the aggregation; any help is much appreciated.
Here is the config file
The error we get is
org.mule.api.routing.ResponseTimeoutException: Response timed out (10000ms) waiting for message response id "bfd612ac-3849-11df-87f5-c583c39e24b1" or this action was interrupted. Failed to route event via endpoint: null. Message payload is of type: NullPayload
at org.mule.routing.EventCorrelator.getResponse(EventCorrelator.java:500)
at org.mule.routing.EventCorrelator.getResponse(EventCorrelator.java:403)
at org.mule.routing.response.AbstractResponseAggregator.getResponse(serviceOverrides=null}, transformer=[], name='endpoint.vm.service.request', properties={}, transactionConfig=Transaction{factory=null, action=NEVER, timeout=0}, filter=null, deleteUnacceptedMessages=false, securityFilter=null, synchronous=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8}
Hi, is it possible to modify this to get a finite-volume aggregator, that is, an aggregator that collects, say, 2000 messages and then sends them on?
Hello, you should be able to override the EventCorrelatorCallback.createEventGroup(MuleEvent event, Object id) method where you can set the expectedSize property on the EventGroup.
Hope this helps a bit.
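As a rough illustration of the size-based release discussed in the reply above, the following plain-Java sketch buffers messages and hands them back as one batch when a fixed count is reached. It does not use the Mule API at all; `BatchSketch` and `add` are hypothetical names, and in Mule the equivalent decision would depend on the expected size of the event group.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Plain-Java sketch of a size-based aggregator: buffer messages and release
// them as one batch when a fixed count is reached. Hypothetical names.
class BatchSketch {

    private final int batchSize;
    private List<String> buffer = new ArrayList<>();

    BatchSketch(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        this.batchSize = batchSize;
    }

    // Adds a message. Returns the full batch when the count is reached,
    // otherwise an empty Optional.
    synchronized Optional<List<String>> add(String message) {
        buffer.add(message);
        if (buffer.size() < batchSize) {
            return Optional.empty();
        }
        List<String> batch = buffer;
        buffer = new ArrayList<>(); // start collecting the next batch
        return Optional.of(batch);
    }

    public static void main(String[] args) {
        BatchSketch aggregator = new BatchSketch(3);
        for (String message : List.of("a", "b", "c", "d")) {
            System.out.println(message + " -> " + aggregator.add(message));
        }
    }
}
```

A real aggregator would probably also want a timeout so that a partially filled batch is not held forever.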