Loan Broker Example

This example shows how to use Camel to implement the loan broker example from the Enterprise Integration Patterns (EIP) book.

The example comes in two versions: one based on JMS and one based on web services.
The JMS version uses message queues to connect the credit agency and the bank loan quote processors, and it uses the InOnly exchange pattern to handle messages asynchronously.
The web service version shows how to integrate the credit agency and bank web services synchronously by using the InOut exchange pattern.

Implementation with message queue (JMS)

The queue version of the loan broker is based on the camel-jms component. It shows how to use message queues to connect the different service modules (such as the credit agency and the banks).

The example should run if you type

mvn exec:java -PQueue.LoanBroker

mvn exec:java -PQueue.Client

To stop the example, press Ctrl+C.

Let's take a look at how these service modules are put together.

// Put the message from loanRequestQueue to the creditRequestQueue
from("jms:queue:loanRequestQueue").to("jms:queue:creditRequestQueue");

// Now we can let the CreditAgency process the request, then the message will be put into creditResponseQueue
from("jms:queue:creditRequestQueue").process(new CreditAgency()).to("jms:queue:creditResponseQueue");

// Here we use the multicast pattern to send the message to three different bank queues
from("jms:queue:creditResponseQueue").multicast().to("jms:queue:bank1", "jms:queue:bank2", "jms:queue:bank3");

// Each bank processor will process the message and put the response message into the bankReplyQueue
from("jms:queue:bank1").process(new Bank("bank1")).to("jms:queue:bankReplyQueue");
from("jms:queue:bank2").process(new Bank("bank2")).to("jms:queue:bankReplyQueue");
from("jms:queue:bank3").process(new Bank("bank3")).to("jms:queue:bankReplyQueue");

// Now we aggregate the response messages by using the Constants.PROPERTY_SSN header.
// The aggregation is complete when all three bank responses have been received.
// In Camel 2.0 we use AGGREGATED_SIZE instead of AGGREGATED_COUNT as the header
// name of the aggregated message size.
from("jms:queue:bankReplyQueue")
    .aggregate(header(Constants.PROPERTY_SSN), new BankResponseAggregationStrategy())
    .completionPredicate(header(Exchange.AGGREGATED_SIZE).isEqualTo(3))

// Here we do some translation and put the message back to loanReplyQueue
    .process(new Translator()).to("jms:queue:loanReplyQueue");

CreditAgency, Bank and Translator are all implementations of the Processor interface. We implement the business logic in the void process(Exchange exchange) method.

CreditAgency

public class CreditAgency implements Processor {
    private static final transient Log LOG = LogFactory.getLog(CreditAgency.class);

    public void process(Exchange exchange) throws Exception {
        LOG.info("Receiving credit agency request");
        String ssn = exchange.getIn().getHeader(Constants.PROPERTY_SSN, String.class);
        int score = (int) (Math.random() * 600 + 300);
        int hlength = (int) (Math.random() * 19 + 1);
        exchange.getOut().setHeader(Constants.PROPERTY_SCORE, new Integer(score));
        exchange.getOut().setHeader(Constants.PROPERTY_HISTORYLENGTH, new Integer(hlength));
        exchange.getOut().setHeader(Constants.PROPERTY_SSN, ssn);
        exchange.getOut().setBody("CreditAgency processed the request.");
    }

}

Bank

public class Bank implements Processor {
    private static final transient Log LOG = LogFactory.getLog(Bank.class);
    private String bankName;
    private double primeRate;

    public Bank(String name) {
        bankName = name;
        primeRate = 3.5;
    }

    public void process(Exchange exchange) throws Exception {
        String ssn = exchange.getIn().getHeader(Constants.PROPERTY_SSN, String.class);
        Integer historyLength = exchange.getIn().getHeader(Constants.PROPERTY_HISTORYLENGTH, Integer.class);
        double rate = primeRate + (double)(historyLength / 12) / 10 + (double)(Math.random() * 10) / 10;
        LOG.info("The bank: " + bankName + " for client: " + ssn + " 's rate " + rate);
        exchange.getOut().setHeader(Constants.PROPERTY_RATE, new Double(rate));
        exchange.getOut().setHeader(Constants.PROPERTY_BANK, bankName);
        exchange.getOut().setHeader(Constants.PROPERTY_SSN, ssn);
        exchange.getOut().setBody("Bank processed the request.");
        // Sleep some time
        try {
            Thread.sleep((int) (Math.random() * 10) * 100);
        } catch (InterruptedException e) {
            // Discard
        }
    }

}

Translator

public class Translator implements Processor {

    public void process(Exchange exchange) throws Exception {
        String bank = (String)exchange.getIn().getHeader(Constants.PROPERTY_BANK);
        Double rate = (Double)exchange.getIn().getHeader(Constants.PROPERTY_RATE);
        String ssn = (String)exchange.getIn().getHeader(Constants.PROPERTY_SSN);
        exchange.getOut().setBody("Loan quotation for Client " + ssn + "."
                                  + " The lowest rate bank is " + bank + ", the rate is " + rate);
    }

}

As you may have noticed, we set a custom aggregation strategy to find the lowest loan rate among the bank response messages.

public class BankResponseAggregationStrategy implements AggregationStrategy {    
    private static final transient Log LOG = LogFactory.getLog(BankResponseAggregationStrategy.class);
    private boolean aggregatingOutMessage;
    
    public BankResponseAggregationStrategy setAggregatingOutMessage(boolean flag) {
        aggregatingOutMessage = flag;
        return this;
    }
    
    // Here we put the bank response together
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        LOG.debug("Get the exchange to aggregate, older: " + oldExchange + " newer:" + newExchange);

        // the first time we only have the new exchange
        if (oldExchange == null) {
            return newExchange;
        }

        Message oldMessage;
        Message newMessage;
       
        oldMessage = oldExchange.getIn();
        newMessage = newExchange.getIn();

        Double oldRate = oldMessage.getHeader(Constants.PROPERTY_RATE, Double.class);
        Double newRate = newMessage.getHeader(Constants.PROPERTY_RATE, Double.class);

        Exchange result;
        if (newRate >= oldRate) {
            result = oldExchange;
        } else {
            result = newExchange;
        }

        LOG.debug("Get the lower rate exchange " + result);
        return result;
    }

}
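
The correlation and completion rules used in the route can also be sketched without Camel. The plain-Java sketch below groups replies by SSN, keeps the lowest rate (the older quote wins a tie, as in the strategy above), and releases the winner once the expected number of replies has arrived. LowestQuoteAggregator and Quote are hypothetical names used only for illustration; they are not part of the example's source, and Camel's aggregator works on exchanges rather than on plain objects.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical plain-Java sketch: correlate by SSN, keep the lowest rate,
// and complete after a fixed number of replies.
final class LowestQuoteAggregator {
    private final int expectedReplies;
    private final Map<String, Quote> best = new HashMap<>();
    private final Map<String, Integer> counts = new HashMap<>();

    LowestQuoteAggregator(int expectedReplies) {
        this.expectedReplies = expectedReplies;
    }

    // Returns the winning quote once all expected replies for that SSN have arrived,
    // otherwise an empty Optional.
    Optional<Quote> add(Quote reply) {
        // Same rule as the strategy above: keep the older quote unless the newer rate is lower.
        best.merge(reply.ssn, reply, (older, newer) -> newer.rate >= older.rate ? older : newer);
        int seen = counts.merge(reply.ssn, 1, Integer::sum);
        if (seen < expectedReplies) {
            return Optional.empty();
        }
        // The group is complete: forget its state and hand back the winner.
        counts.remove(reply.ssn);
        return Optional.of(best.remove(reply.ssn));
    }

    public static void main(String[] args) {
        LowestQuoteAggregator aggregator = new LowestQuoteAggregator(3);
        aggregator.add(new Quote("Client-A0", "bank1", 4.2));
        aggregator.add(new Quote("Client-A0", "bank2", 3.7));
        aggregator.add(new Quote("Client-A0", "bank3", 4.0))
            .ifPresent(q -> System.out.println(q.ssn + ": " + q.bank + " at " + q.rate));
    }
}

// Hypothetical stand-in for a bank reply message.
final class Quote {
    final String ssn;
    final String bank;
    final double rate;

    Quote(String ssn, String bank, double rate) {
        this.ssn = ssn;
        this.bank = bank;
        this.rate = rate;
    }
}
```

In this sketch the first two calls return an empty Optional, and only the third reply for the same SSN produces a result, which mirrors the completion predicate on the aggregated size.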

We start the loan broker after starting the ActiveMQ broker and setting up the connection factory for the camel-jms component.

public static void main(String... args) throws Exception {

    CamelContext context = new DefaultCamelContext();
    JmsBroker broker = new JmsBroker();
    broker.start();
    // Set up the ActiveMQ JMS Components
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:51616");

    // Note we can explicitly name the component
    context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

    context.addRoutes(new LoanBroker());
    // Start the loan broker
    context.start();
    System.out.println("Server is ready");

    Thread.sleep(5 * 60 * 1000);
    context.stop();
    Thread.sleep(1000);
    broker.stop();

}

Now we can send the request from the client and pull the response message back.

public class Client extends RouteBuilder {

    public static void main(String args[]) throws Exception {
        CamelContext context = new DefaultCamelContext();
        // Set up the ActiveMQ JMS Components
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:51616");
        // Note we can explicitly name the component
        context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

        context.addRoutes(new Client());

        ProducerTemplate template = context.createProducerTemplate();

        context.start();

        // send out the request message
        for (int i = 0; i < 2; i++) {
            template.sendBodyAndHeader("jms:queue:loanRequestQueue",
                                       "Quote for the lowest rate of loaning bank",
                                       Constants.PROPERTY_SSN, "Client-A" + i);
            Thread.sleep(100);
        }
        // wait for the response
        Thread.sleep(2000);
        
        // send the request and get the response from the same queue       
        Exchange exchange = template.send("jms:queue2:parallelLoanRequestQueue", new Processor() {
            public void process(Exchange exchange) throws Exception {
                exchange.setPattern(ExchangePattern.InOut);
                exchange.getIn().setBody("Quote for the lowest rate of loaning bank");
                exchange.getIn().setHeader(Constants.PROPERTY_SSN, "Client-B");
            }
        });
        
        String bank = (String)exchange.getOut().getHeader(Constants.PROPERTY_BANK);
        Double rate = (Double)exchange.getOut().getHeader(Constants.PROPERTY_RATE);
        String ssn = (String)exchange.getOut().getHeader(Constants.PROPERTY_SSN);
        System.out.println("Loan quotation for Client " + ssn + "."
                           + " The lowest rate bank is " + bank + ", the rate is " + rate);
        
        // Wait a while before stopping the context
        Thread.sleep(1000 * 5);
        context.stop();

    }

    /**
     * Let's configure the Camel routing rules using Java code to pull the response message
     */
    public void configure() {

        from("jms:queue:loanReplyQueue").process(new Processor() {

            public void process(Exchange exchange) throws Exception {
                // Print out the response message
                System.out.println(exchange.getIn().getBody());

            }

        });

    }

}

Implementation with web service

The web service version of the loan broker is based on the camel-cxf component, which can produce and consume SOAP messages on the wire. It uses the InOut message exchange pattern: when the client sends a message to the router, it gets the response message back from the same endpoint.
When we send the quote request to the three banks, we can choose to call the bank services one by one or to send the messages in parallel (one thread per request).
You can compare the response times after you run the sample.

The example should run if you type

mvn exec:java -PWS.LoanBroker

mvn exec:java -PWS.Client

To stop the example, press Ctrl+C.

First, let's go through the SEI (Service Endpoint Interface) for LoanBroker, CreditAgency and Bank.

LoanBroker

// This SEI has no @WebService annotation, so we use the simple frontend API to create the client and server
public interface LoanBrokerWS {

    String getLoanQuote(String ssn, Double loanAmount, Integer loanDuriation);

}

CreditAgency

@WebService
public interface CreditAgencyWS {

    int getCreditScore(String ssn);

    int getCreditHistoryLength(String ssn);

}

Bank

// Since we use @WebService here, make sure to use the JAX-WS frontend API to create the client and server
@WebService
public interface BankWS {

    String getBankName();


    BankQuote getQuote(String ssn, double loanAmount, int loanDuration, int creditHistory, int creditScore);

}

Here are two routing rules in the DSL: one routes the request to the banks sequentially, the other calls the bank services in parallel.

// Router 1 to call the bank endpoints sequentially
from(Constants.LOANBROKER_URI)
    // Using the CreditScoreProcessor to call the credit agency service
    .process(new CreditScoreProcessor(Constants.CREDITAGENCY_ADDRESS))
        // Set the aggregation strategy on the multicast pattern
        .multicast(new BankResponseAggregationStrategy())
            // Send out the request to three different banks sequentially
            .to(Constants.BANK1_URI, Constants.BANK2_URI, Constants.BANK3_URI);

// Router 2 to call the bank endpoints in parallel
from(Constants.PARALLEL_LOANBROKER_URI)
    .process(new CreditScoreProcessor(Constants.CREDITAGENCY_ADDRESS))
        // Using the thread pool to send out messages to three different banks in parallel                
        .multicast(new BankResponseAggregationStrategy())
            // Camel will create a thread pool with the default size (10) 
            // for sending the message in parallel
            .parallelProcessing()
            .to(Constants.BANK1_URI, Constants.BANK2_URI, Constants.BANK3_URI);
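
The difference between the two routes can be illustrated with a small plain-Java sketch that does not use Camel at all. It calls three simulated banks first one after another and then through a thread pool, keeping the lowest rate each time. MulticastTimingSketch, its methods, the fixed rates, and the 200 ms delay are all assumptions made for illustration; they only stand in for the bank services and for the thread pool that parallelProcessing() uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical sketch comparing sequential and parallel calls to three slow services.
final class MulticastTimingSketch {

    // Simulated bank call: sleeps to mimic service latency, then returns a fixed rate.
    static Supplier<Double> bank(double rate, long delayMillis) {
        return () -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return rate;
        };
    }

    // Calls each bank in turn on the caller's thread and keeps the lowest rate.
    static double sequential(List<Supplier<Double>> banks) {
        double lowest = Double.MAX_VALUE;
        for (Supplier<Double> b : banks) {
            lowest = Math.min(lowest, b.get());
        }
        return lowest;
    }

    // Submits every bank call to a thread pool, then waits for all replies and keeps the lowest rate.
    static double parallel(List<Supplier<Double>> banks) {
        ExecutorService pool = Executors.newFixedThreadPool(banks.size());
        try {
            List<CompletableFuture<Double>> futures = new ArrayList<>();
            for (Supplier<Double> b : banks) {
                futures.add(CompletableFuture.supplyAsync(b, pool));
            }
            double lowest = Double.MAX_VALUE;
            for (CompletableFuture<Double> f : futures) {
                lowest = Math.min(lowest, f.join());
            }
            return lowest;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Supplier<Double>> banks = Arrays.asList(bank(4.1, 200), bank(3.8, 200), bank(4.4, 200));

        long start = System.nanoTime();
        double seq = sequential(banks);
        long seqMillis = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        double par = parallel(banks);
        long parMillis = (System.nanoTime() - start) / 1_000_000;

        System.out.println("sequential: rate " + seq + " in " + seqMillis + " ms");
        System.out.println("parallel:   rate " + par + " in " + parMillis + " ms");
    }
}
```

Both variants pick the same lowest rate. The sequential variant waits for the banks one after another, so its elapsed time is roughly the sum of the delays, while the parallel variant takes roughly as long as the slowest bank.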

We use the CreditScoreProcessor to send two requests to the credit agency, one for the credit history length and one for the credit score, and then to prepare the request message for the banks.

class CreditScoreProcessor implements Processor {
    private String creditAgencyAddress;
    private CreditAgencyWS proxy;

    public CreditScoreProcessor(String address) {
        creditAgencyAddress = address;
        proxy = getProxy();
    }

    private CreditAgencyWS getProxy() {
        // Here we use JaxWs front end to create the proxy
        JaxWsProxyFactoryBean proxyFactory = new JaxWsProxyFactoryBean();
        ClientFactoryBean clientBean = proxyFactory.getClientFactoryBean();
        clientBean.setAddress(creditAgencyAddress);
        clientBean.setServiceClass(CreditAgencyWS.class);
        clientBean.setBus(BusFactory.getDefaultBus());
        return (CreditAgencyWS)proxyFactory.create();
    }

    @SuppressWarnings("unchecked")
    public void process(Exchange exchange) throws Exception {
        Message requestMessage = exchange.getIn();
        List<Object> request = (List<Object>) requestMessage.getBody();

        String ssn = (String)request.get(0);
        Double amount = (Double) request.get(1);
        Integer loanDuriation = (Integer)request.get(2);
        int historyLength = proxy.getCreditHistoryLength(ssn);
        int score = proxy.getCreditScore(ssn);

        // create the invocation message for Bank client
        List<Object> bankRequest = new ArrayList<Object>();
        bankRequest.add(ssn);
        bankRequest.add(amount);
        bankRequest.add(loanDuriation);
        bankRequest.add(historyLength);
        bankRequest.add(score);
        exchange.getOut().setBody(bankRequest);
        exchange.getOut().setHeader(CxfConstants.OPERATION_NAME, "getQuote");
    }

}

Now we implement the Bank and CreditAgency SEIs with the business logic.

Bank

public class Bank implements BankWS {
    private String bankName;
    private double primeRate;

    public Bank(String name) {
        bankName = name;
        primeRate = 3.5;
    }

    public String getBankName() {
        return bankName;
    }

    public BankQuote getQuote(String ssn, double loanAmount, int loanDuration, int creditHistory, int creditScore) {
        Double rate = primeRate + (double)(loanDuration / 12) / 10 + (double)(Math.random() * 10) / 10;
        // Wait for a while
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // do nothing here
        }
        BankQuote result = new BankQuote(bankName, ssn, rate);
        return result;
    }

}
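
One detail of the rate formula is easy to miss: loanDuration / 12 is integer division in Java, so the duration only adds to the rate in whole-year steps. The sketch below repeats the expression from getQuote with the random term passed in as a parameter so that the result is repeatable. RateFormulaSketch and its rate method are hypothetical names used only for illustration.

```java
// Hypothetical sketch of the rate expression used in Bank.getQuote.
final class RateFormulaSketch {

    // Same arithmetic as getQuote; randomTerm replaces (Math.random() * 10) / 10,
    // which lies in the range [0, 1).
    static double rate(double primeRate, int loanDuration, double randomTerm) {
        // loanDuration / 12 is evaluated with int operands, so any remainder is dropped
        // before the cast to double.
        return primeRate + (double) (loanDuration / 12) / 10 + randomTerm;
    }

    public static void main(String[] args) {
        // 10 / 12 == 0, so a 10-month loan adds nothing on top of the prime rate.
        System.out.println(rate(3.5, 10, 0.0));
        // 24 / 12 == 2, so a 24-month loan adds 2 / 10.0 to the prime rate.
        System.out.println(rate(3.5, 24, 0.0));
    }
}
```

With the duration of 10 that the client in this example sends, the duration term is zero, so each quote is the prime rate plus the random term.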

CreditAgency

public class CreditAgency implements CreditAgencyWS {

    public int getCreditHistoryLength(String ssn) {
        // Random credit history length between 1 and 19
        int creditHistoryLength = (int)(Math.random() * 19 + 1);
        return creditHistoryLength;
    }

    public int getCreditScore(String ssn) {
        // Random credit score between 300 and 899
        int creditScore = (int)(Math.random() * 600 + 300);
        return creditScore;
    }

}

The code below shows how to start the loan broker.

public static void main(String... args) throws Exception {
    CamelContext context = new DefaultCamelContext();
    CreditAgencyServer creditAgencyServer = new CreditAgencyServer();
    // Start the credit server
    creditAgencyServer.start();

    // Start the bank server
    BankServer bankServer = new BankServer();
    bankServer.start();

    // Add the loan broker routes and start the Camel context
    context.addRoutes(new LoanBroker());
    context.start();

    // Keep the loan broker running for five minutes before shutting down
    Thread.sleep(5 * 60 * 1000);
    context.stop();
    Thread.sleep(1000);
    bankServer.stop();
    creditAgencyServer.stop();
}

We can send the request by creating a client proxy with the LoanBroker SEI in the client code. You can also compare the performance of the two routing rules by running the client.

public class Client {

    public LoanBrokerWS getProxy(String address) {
        // Now we use the simple frontend API to create the client proxy
        ClientProxyFactoryBean proxyFactory = new ClientProxyFactoryBean();
        ClientFactoryBean clientBean = proxyFactory.getClientFactoryBean();
        clientBean.setAddress(address);
        clientBean.setServiceClass(LoanBrokerWS.class);
        clientBean.setBus(BusFactory.getDefaultBus());
        return (LoanBrokerWS) proxyFactory.create();
    }

    public static void main(String[] args) {
        Client client = new Client();
        String result = null;
        LoanBrokerWS loanBroker = client.getProxy(Constants.LOANBROKER_ADDRESS);
        long startTime = System.currentTimeMillis();
        result = loanBroker.getLoanQuote("Sequential SSN", 1000.54, 10);
        long endTime = System.currentTimeMillis();
        System.out.println("It takes " + (endTime - startTime) + " milliseconds to call the sequential loan broker service");
        System.out.println(result);

        LoanBrokerWS paralleLoanBroker = client.getProxy(Constants.PARALLEL_LOANBROKER_ADDRESS);
        startTime = System.currentTimeMillis();
        result = paralleLoanBroker.getLoanQuote("Parallel SSN", 1000.54, 10);
        endTime = System.currentTimeMillis();
        System.out.println("It takes " + (endTime - startTime) + " milliseconds to call the parallel loan broker service");
        System.out.println(result);
    }

}