How-To: Using Coherence Transactions and Continuous Queries - Order Booking Demonstration

Date: 5/01/07
Author: Adam Leftik

Introduction

This example application demonstrates how Coherence can help scale an application. It shows three capabilities: data grids that grow as cache servers are added, massively parallel processing, and fault tolerance. The trade data is loaded via a CacheStore and indexed. The Web application displays aggregated trade information that is calculated in parallel across multiple cache servers. As additional cache servers are started, query times should drop roughly in proportion to the added hardware. Furthermore, the loss of a cache server does not result in user downtime. The sample application performs three calculations: it finds all the distinct stock symbols that have been traded, counts the trades for each symbol, and finally totals all the trades in the market.

Coherence provides a transactional cache capability that brings atomicity to the data grid infrastructure. A sample servlet is included that demonstrates Java Transaction API (JTA) integration through the J2EE Connector Architecture (JCA) adapter provided with Coherence. Finally, Continuous Queries support event-driven architectures by providing up-to-date query results without the overhead of repeatedly reissuing queries.

Aggregating Data Using Coherence Invocable Maps

 

The following code snippet shows the aggregation of cached data using invocable maps.

 
  NamedCache cache = CacheFactory.getCache("demo");
  // find all the distinct stock symbols that have been traded...this will be done in parallel
  Set setDistinct = (Set) cache.aggregate((Filter) null, new DistinctValues("getSymbol"));
  int iTotal = 0;
  // count the trades for each symbol...this will be done in parallel
  for (Iterator iter = setDistinct.iterator(); iter.hasNext();) {
      String sSymbol = (String) iter.next();
      int iCount = ((Integer) cache.aggregate(new EqualsFilter("getSymbol", sSymbol), new Count())).intValue();
      // add up all the trades in the market
      iTotal += iCount;
  }
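
To make the three calculations concrete, the following plain-Java sketch computes the same kind of results over an in-memory list using parallel streams. It does not use the Coherence API: the TradeSketch and AggregationSketch classes are hypothetical stand-ins, and in the real application the work is spread across cache servers rather than across threads in a single JVM.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical stand-in for the demo's Trade class; only the symbol is needed here.
final class TradeSketch {
    private final String symbol;

    TradeSketch(String symbol) {
        this.symbol = symbol;
    }

    String getSymbol() {
        return symbol;
    }
}

final class AggregationSketch {
    // Step 1: the distinct symbols that have been traded (compare DistinctValues).
    static Set<String> distinctSymbols(List<TradeSketch> trades) {
        return trades.parallelStream()
                .map(TradeSketch::getSymbol)
                .collect(Collectors.toCollection(TreeSet::new));
    }

    // Step 2: the number of trades per symbol (compare EqualsFilter plus Count).
    static Map<String, Long> countBySymbol(List<TradeSketch> trades) {
        return trades.parallelStream()
                .collect(Collectors.groupingBy(
                        TradeSketch::getSymbol, TreeMap::new, Collectors.counting()));
    }

    // Step 3: the market-wide total, summed from the per-symbol counts.
    static long total(Map<String, Long> counts) {
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }
}
```

The sketch groups all symbols in one pass, whereas the listing above issues one filtered aggregation per symbol; both approaches arrive at per-symbol counts and a grand total.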
 

Implementing a Simple CacheStore

This code snippet shows how to implement a simple CacheStore:

/**
 * Construct the dummy store.
 */
public DummyTradingCacheStore() {
    m_mapStorage = new SafeHashMap();
}

/**
 * Store the key/value pair in the dummy store.
 *
 * @param oKey   the key
 * @param oValue the value
 */
public void store(Object oKey, Object oValue) {
    m_mapStorage.put(oKey, oValue);
}

/**
 * Remove a value from the dummy store.
 *
 * @param oKey the key that corresponds to the value to remove
 */
public void erase(Object oKey) {
    m_mapStorage.remove(oKey);
}

/**
 * Load the trade for the given key, creating a random trade if none exists yet.
 *
 * @param oKey the key
 * @return the stored trade, or a newly randomized trade
 */
public Object load(Object oKey) {
    Map mapStorage = m_mapStorage;
    Object oValue = mapStorage.get(oKey);
    if (oValue == null) {
        synchronized (mapStorage) {
            if (mapStorage.containsKey(oKey)) {
                oValue = mapStorage.get(oKey);
            } else {
                // create a random Trade object for this key
                oValue = Trade.makeRandomTrade(((Integer) oKey).intValue());
                mapStorage.put(oKey, oValue);
            }
        }
    }
    return oValue;
}

// ---- data members ----------------------------------------------------

/**
 * The storage for the dummy trades.
 */
protected Map m_mapStorage;
}
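
The load method above is only consulted when the cache does not already hold the requested key. As a rough illustration of that read-through idea, here is a minimal plain-Java sketch. ReadThroughSketch is a hypothetical class, not part of Coherence, and it stands in for the cache that sits in front of the store.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-in for a cache backed by a loader; not a Coherence class.
final class ReadThroughSketch<K, V> {
    private final Map<K, V> entries = new ConcurrentHashMap<>();
    private final Function<K, V> loader;
    private final AtomicInteger loadCount = new AtomicInteger();

    ReadThroughSketch(Function<K, V> loader) {
        this.loader = loader;
    }

    // On a miss, ask the loader for the value and remember it; on a hit, skip the loader.
    V get(K key) {
        return entries.computeIfAbsent(key, k -> {
            loadCount.incrementAndGet();
            return loader.apply(k);
        });
    }

    // Number of times the loader was actually consulted.
    int loads() {
        return loadCount.get();
    }
}
```

Requesting the same key twice consults the loader only once, which is why a store that fabricates a trade on first access, as the dummy store does, still returns a stable value afterwards.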

Using JTA Transactions

Acquiring a Reference to the Coherence JTA Cache Adapter:

/**
* Get the Coherence Cache Adapter
* @param ctx the jndi context
* @return the coherence cache adapter via J2CA
* @throws NamingException when the J2CA Adapter cannot be found
*/

private CacheAdapter getCacheAdapter(Context ctx) throws NamingException {

	/* eis/coherence-tx is the JCA adapter's JNDI path */
	return new CacheAdapter(ctx, "eis/coherence-tx",CacheAdapter.CONCUR_OPTIMISTIC, CacheAdapter.TRANSACTION_REPEATABLE_GET, -1);
}

 

Performing the actual transaction using the CacheAdapter:

private void performTradeTx(Trade trade, List<String> errors) {
    try {
        Context ctx = getContext();
        CacheAdapter cacheFactory = getCacheAdapter(ctx);
        UserTransaction trans = getUserTransaction(ctx);

        // start a JTA transaction
        trans.begin();

        // grab the cache
        NamedCache cache = cacheFactory.getDistributedCache("demo", Thread.currentThread().getContextClassLoader());

        // now update the cache
        // (nextInt needs a positive bound, so this assumes the cache holds at least two entries)
        Random random = new Random();
        cache.put(random.nextInt(cache.size() - 1), trade);

        // commit the transaction
        trans.commit();
    } catch (Exception e) {
        e.printStackTrace(System.out);
        errors.add(e.getMessage());
    }
}
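
The method above records the error when something fails but does not explicitly roll the transaction back. A common shape for this kind of code is begin, do the work, commit, and roll back on any failure. The sketch below shows that shape in plain Java under stated assumptions: TxSketch is a hypothetical interface that only mirrors the begin, commit, and rollback methods of UserTransaction, and TxTemplateSketch is a hypothetical helper, so nothing here is the sample's actual code.

```java
import java.util.List;

// Hypothetical stand-in exposing the three UserTransaction methods used here.
interface TxSketch {
    void begin() throws Exception;
    void commit() throws Exception;
    void rollback() throws Exception;
}

final class TxTemplateSketch {
    // Runs the work inside a transaction: commit on success, roll back on failure.
    // Returns true if the work committed; failures are appended to errors.
    static boolean runInTx(TxSketch tx, Runnable work, List<String> errors) {
        boolean began = false;
        try {
            tx.begin();
            began = true;
            work.run();
            tx.commit();
            return true;
        } catch (Exception e) {
            errors.add(String.valueOf(e.getMessage()));
            if (began) {
                try {
                    tx.rollback();
                } catch (Exception rollbackFailure) {
                    // Record the rollback failure as well, without hiding the original error.
                    errors.add(String.valueOf(rollbackFailure.getMessage()));
                }
            }
            return false;
        }
    }
}
```

Whether an explicit rollback is needed in the servlet depends on how the container handles abandoned transactions, so treat this as one possible hardening rather than a required change.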

Creating Continuous Queries

 

private ContinuousQueryCache query = null;
...
/**
 * Find all the high-end trades.
 *
 * @return a collection of trades
 */
public synchronized Collection<Trade> getAllBigTrades() {
    // make sure the cluster is up
    CacheFactory.ensureCluster();
    // grab the cache
    NamedCache cache = CacheFactory.getCache("demo");
    // make sure no other thread created the query while we were blocked
    if (query == null) {
        // find all trades for the symbol that have more than 900 lots and
        // a price higher than $9
        Filter filter = new AndFilter(new EqualsFilter("getSymbol", symbol), new AndFilter(new GreaterFilter("getLot", 900), new GreaterFilter("getPrice", 9.0)));
        query = new ContinuousQueryCache(cache, filter, false);
    }
    return (Collection<Trade>) query.values();
}
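
Conceptually, a continuous query keeps its result set current by reacting to changes in the underlying cache instead of re-running the filter on every read. The plain-Java sketch below illustrates that idea only. LiveViewSketch is a hypothetical class, not a Coherence class, and the onPut and onRemove methods are assumed to be called by whatever delivers change events.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical stand-in for a continuously maintained query result; not a Coherence class.
final class LiveViewSketch<K, V> {
    private final Map<K, V> matches = new ConcurrentHashMap<>();
    private final Predicate<V> filter;

    LiveViewSketch(Predicate<V> filter) {
        this.filter = filter;
    }

    // Called for every insert or update in the underlying cache.
    void onPut(K key, V value) {
        if (filter.test(value)) {
            matches.put(key, value);   // entry now satisfies the filter
        } else {
            matches.remove(key);       // entry no longer satisfies the filter
        }
    }

    // Called for every removal from the underlying cache.
    void onRemove(K key) {
        matches.remove(key);
    }

    // Current result set, available without re-running the query.
    Collection<V> values() {
        return matches.values();
    }
}
```

With a filter such as "lot size greater than 900", a newly inserted large trade shows up in values() as soon as its event is processed, which is the behavior the web page relies on to display big trades.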

Configuring the Coherence Cache

Here we define the distributed cache and identify the CacheStore class implementation.

 

<cache-config>
    <caching-scheme-mapping>
        <cache-mapping>
            <cache-name>*</cache-name>
            <scheme-name>distributed</scheme-name>
        </cache-mapping>
        <cache-mapping>
            <cache-name>dist-*</cache-name>
            <scheme-name>distributed</scheme-name>
        </cache-mapping>
        <cache-mapping>
            <cache-name>history-*</cache-name>
            <scheme-name>replicated</scheme-name>
        </cache-mapping>
    </caching-scheme-mapping>

    <caching-schemes>
        <read-write-backing-map-scheme>
            <scheme-name>trade-store</scheme-name>
            <internal-cache-scheme>
                <local-scheme>
                    <scheme-ref>backing-map</scheme-ref>
                </local-scheme>
            </internal-cache-scheme>
            <cachestore-scheme>
                <class-scheme>
                    <class-name>com.tangosol.examples.trading.DummyTradingCacheStore</class-name>
                </class-scheme>
            </cachestore-scheme>
            <read-only>true</read-only>
        </read-write-backing-map-scheme>

        <distributed-scheme>
            <scheme-name>distributed</scheme-name>
            <service-name>DistributedCache</service-name>
            <backing-map-scheme>
                <read-write-backing-map-scheme>
                    <scheme-ref>trade-store</scheme-ref>
                </read-write-backing-map-scheme>
            </backing-map-scheme>
            <autostart>true</autostart>
        </distributed-scheme>

        <local-scheme>
            <scheme-name>backing-map</scheme-name>
            <eviction-policy>HYBRID</eviction-policy>
            <high-units>{back-size-limit 0}</high-units>
            <expiry-delay>{back-expiry 0}</expiry-delay>
            <flush-delay>1m</flush-delay>
        </local-scheme>

        <replicated-scheme>
            <scheme-name>replicated</scheme-name>
            <service-name>ReplicatedCache</service-name>
            <backing-map-scheme>
                <class-scheme>
                    <scheme-ref>default-backing-map</scheme-ref>
                </class-scheme>
            </backing-map-scheme>
        </replicated-scheme>

        <class-scheme>
            <scheme-name>default-backing-map</scheme-name>
            <class-name>com.tangosol.util.SafeHashMap</class-name>
        </class-scheme>

        <invocation-scheme>
            <scheme-name>example-invocation</scheme-name>
            <service-name>InvocationService</service-name>
            <autostart>true</autostart>
        </invocation-scheme>
    </caching-schemes>
</cache-config>

Prerequisites

What you need to know

In order to complete the example application, you should be familiar with the following:

Software Requirements

This demonstration requires that the following software components are installed and configured correctly:

Notations

Building the Application

The configuration files, including deployment descriptor files such as application.xml, are located in the %HOWTO_HOME%/etc directory. The application ear and jar can be built and deployed by issuing the following command from the console:

>ant deploy

Running the Application

To run the sample application on a standalone instance of Oracle Application Server 10g 10.1.3.2 or above, follow these steps:

1. Examine the Sample File Directories

2. Configure the Environment

Ensure the following environment variables are defined:

3. Start a Cache Server

This example requires that you start one cache server so that the sample data can be loaded into the cache. You must have all the environment variables set in the console before issuing:

>cd %HOWTO_HOME%
>ant run-cache-server

 

4. Load the sample data

This example requires that you load the sample data into the cache server. You must have all the environment variables set in the console before issuing:

>cd %HOWTO_HOME%
>ant run-load-data

5. Generate, Compile, and Deploy the Application

Ant 1.6.2 is shipped with OC4J, and you have to add $ORACLE_HOME/ant/bin to your PATH environment variable. On some operating systems, Ant does not currently support the use of environment variables. If this is the case for your operating system, modify the ant-oracle.xml file located in the %HOWTO_HOME% directory.

Edit ant-oracle.properties (in the demo directory) and ensure the following properties are set to the correct values, as indicated below for OC4J standalone:

If you are using an OracleAS managed install, you have to change the following properties appropriately, in addition to changing oc4j.admin.user and oc4j.admin.password for your managed OC4J instance in the OracleAS install.

You have to uncomment the appropriate deployer.uri in ant-oracle.properties based on your environment, i.e. a single-instance OC4J or a clustered OC4J instance/group managed by OPMN.

 

To build the application, type the following command from the %HOWTO_HOME% directory:

>ant

You should now have the newly created coherencedemo.ear in your %HOWTO_HOME%/lib directory.

This command will attempt to deploy the application archive if the build is successful. It will first test whether OC4J is running before attempting the deployment operation.

Note that you can also deploy the application separately. Ensure the %ORACLE_HOME% environment variable is defined, and from the %HOWTO_HOME% directory, type the command:

>ant deploy

6. Run the Application

Run the sample by opening the following URL in your favorite browser:

http://localhost:8888/coherencequeries

On this page, you will see how trade information is aggregated using Coherence Invocable Maps and Partitioned Caches. This is the Order Booking application running with just one Coherence Cache Server:

Trading screenshot with one cache server.

Here is the application running with two instances of the Coherence Cache Server. Notice how the data is automatically redistributed between the servers without interrupting the operation of the application. Additionally, notice how the query time is reduced significantly because the queries run in parallel across the cache:

Trading screenshot with two cache servers.

Execute a big trade (over 1000 shares and a price over 9 dollars) for any symbol, and the web page will be updated with the notification via the Continuous Query capability.

Trading screen executing a trade.

Summary

In this document, you should have learned how to:

Troubleshooting