Date: 5/01/07
Author: Adam Leftik
This example application demonstrates using Coherence to scale your application. The sample uses Coherence to demonstrate the ability to create data grids of unlimited size, massively parallel processing, and fault tolerance. The trade data is loaded via a CacheStore and indexed. The Web application is used to view the aggregated trade information, which is calculated in parallel using multiple cache servers. As additional cache servers are started, note that query times drop in proportion to the added hardware; furthermore, the loss of a cache server does not result in user downtime. The sample application performs three calculations: it finds all the distinct stock symbols that have been traded, counts the trades for each symbol, and finally totals all the trades in the market.
Coherence provides a transactional cache store capability that brings atomicity to the data grid infrastructure. A sample servlet is included that demonstrates Java Transaction API (JTA) integration through the J2EE Connector Architecture (JCA) adapter provided with Coherence. Finally, Continuous Queries support event-driven architectures by providing up-to-date query results without the overhead and semantics of reissuing queries.
The following code snippet shows the aggregation of cached data using invocable maps:
NamedCache cache = CacheFactory.getCache("demo");

// find all the distinct stock symbols that have been traded...this will be done in parallel
Set setDistinct = (Set) cache.aggregate((Filter) null, new DistinctValues("getSymbol"));

int iTotal = 0;
// count the trades for each symbol...this will be done in parallel
for (Iterator iter = setDistinct.iterator(); iter.hasNext();) {
    String sSymbol = (String) iter.next();
    int iCount = ((Integer) cache.aggregate(
            new EqualsFilter("getSymbol", sSymbol), new Count())).intValue();
    // add up all the trades in the market
    iTotal += iCount;
}
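As a minimal sketch (assuming the com.tangosol.util.aggregator.GroupAggregator class from Coherence 3.x is available, which the sample itself does not use), the per-symbol loop above could also be collapsed into a single parallel call that returns a Map of symbol to trade count:

// sketch only: GroupAggregator runs the Count aggregator once per distinct
// getSymbol() value, in parallel across the cache servers
Map mapCounts = (Map) cache.aggregate(
        (Filter) null,
        GroupAggregator.createInstance("getSymbol", new Count()));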
This code snippet shows how to implement a simple CacheStore:
/**
 * Construct the dummy store.
 */
public DummyTradingCacheStore() {
    m_mapStorage = new SafeHashMap();
}

/**
 * Store the key/value pair in the store.
 *
 * @param oKey   the key to store
 * @param oValue the value to store
 */
public void store(Object oKey, Object oValue) {
    m_mapStorage.put(oKey, oValue);
}

/**
 * Remove a value from the store.
 *
 * @param oKey the key that corresponds to the value to remove
 */
public void erase(Object oKey) {
    m_mapStorage.remove(oKey);
}

/**
 * Load a trade for the specified key, creating a random trade if none exists.
 *
 * @param oKey the key to load
 * @return a randomized Trade
 */
public Object load(Object oKey) {
    Map mapStorage = m_mapStorage;
    Object oValue = mapStorage.get(oKey);
    if (oValue == null) {
        synchronized (mapStorage) {
            if (mapStorage.containsKey(oKey)) {
                oValue = mapStorage.get(oKey);
            } else {
                // create a random Trade object for this key
                oValue = Trade.makeRandomTrade(((Integer) oKey).intValue());
                mapStorage.put(oKey, oValue);
            }
        }
    }
    return oValue;
}

// ---- data members ----------------------------------------------------

/**
 * The storage for the dummy trades.
 */
protected Map m_mapStorage;
}
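Because load() fabricates a Trade on a cache miss, the sample data can be preloaded simply by touching each key. A minimal sketch (the key count cTrades is illustrative, not taken from the sample):

NamedCache cache = CacheFactory.getCache("demo");
int cTrades = 1000; // illustrative size, not the sample's actual value
for (int i = 0; i < cTrades; i++) {
    // a get() on a missing key falls through to DummyTradingCacheStore.load()
    cache.get(new Integer(i));
}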
Acquiring a Reference to the Coherence JTA Cache Adapter:
/**
 * Get the Coherence cache adapter.
 *
 * @param ctx the JNDI context
 * @return the Coherence cache adapter, obtained via the JCA adapter
 * @throws NamingException when the JCA adapter cannot be found
 */
private CacheAdapter getCacheAdapter(Context ctx) throws NamingException {
    // "eis/coherence-tx" is the JCA adapter's JNDI path
    return new CacheAdapter(ctx, "eis/coherence-tx",
            CacheAdapter.CONCUR_OPTIMISTIC,
            CacheAdapter.TRANSACTION_REPEATABLE_GET, -1);
}
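Here CacheAdapter.CONCUR_OPTIMISTIC selects optimistic concurrency and CacheAdapter.TRANSACTION_REPEATABLE_GET selects a repeatable-read style isolation level; the final -1 presumably accepts the default transaction timeout.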
Performing the actual transaction using the CacheAdapter:
private void performTradeTx(Trade trade, List<String> errors) {
    try {
        Context ctx = getContext();
        CacheAdapter cacheFactory = getCacheAdapter(ctx);
        UserTransaction trans = getUserTransaction(ctx);
        // start a JTA transaction
        trans.begin();
        // grab the cache
        NamedCache cache = cacheFactory.getDistributedCache("demo",
                Thread.currentThread().getContextClassLoader());
        // now update the cache
        Random random = new Random();
        cache.put(random.nextInt(cache.size() - 1), trade);
        // commit the transaction
        trans.commit();
    } catch (Exception e) {
        e.printStackTrace(System.out);
        errors.add(e.getMessage());
    }
}
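The getUserTransaction() helper is not shown in the excerpt above; a minimal sketch, assuming the standard J2EE JNDI binding for the container-managed UserTransaction:

private UserTransaction getUserTransaction(Context ctx) throws NamingException {
    // "java:comp/UserTransaction" is the standard J2EE binding
    return (UserTransaction) ctx.lookup("java:comp/UserTransaction");
}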
private ContinuousQueryCache query = null;
...
/**
 * Find all the high-end trades.
 *
 * @return a collection of trades
 */
public synchronized Collection<Trade> getAllBigTrades() {
    // make sure the cluster is up
    CacheFactory.ensureCluster();
    // grab the cache
    NamedCache cache = CacheFactory.getCache("demo");
    // make sure no other thread created the query while we were blocked
    if (query == null) {
        // find all trades for the symbol that have more than 900 lots and
        // a price higher than $9
        Filter filter = new AndFilter(
                new EqualsFilter("getSymbol", symbol),
                new AndFilter(new GreaterFilter("getLot", 900),
                              new GreaterFilter("getPrice", 9.0)));
        query = new ContinuousQueryCache(cache, filter, false);
    }
    return (Collection<Trade>) query.values();
}
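A ContinuousQueryCache is itself observable, so rather than polling, a consumer can be notified as new big trades arrive. A hedged sketch using the standard com.tangosol.util.MapListener interface (the listener body is illustrative, not part of the sample):

query.addMapListener(new MapListener() {
    public void entryInserted(MapEvent evt) {
        // a newly inserted trade just matched the "big trade" filter
        System.out.println("New big trade: " + evt.getNewValue());
    }
    public void entryUpdated(MapEvent evt) { }
    public void entryDeleted(MapEvent evt) { }
});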
Here we define the distributed cache and identify the CacheStore class implementation:
<cache-config>
  <caching-scheme-mapping>
    <cache-mapping>
      <cache-name>*</cache-name>
      <scheme-name>distributed</scheme-name>
    </cache-mapping>
    <cache-mapping>
      <cache-name>dist-*</cache-name>
      <scheme-name>distributed</scheme-name>
    </cache-mapping>
    <cache-mapping>
      <cache-name>history-*</cache-name>
      <scheme-name>replicated</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>

  <caching-schemes>
    <read-write-backing-map-scheme>
      <scheme-name>trade-store</scheme-name>
      <internal-cache-scheme>
        <local-scheme>
          <scheme-ref>backing-map</scheme-ref>
        </local-scheme>
      </internal-cache-scheme>
      <cachestore-scheme>
        <class-scheme>
          <class-name>com.tangosol.examples.trading.DummyTradingCacheStore</class-name>
        </class-scheme>
      </cachestore-scheme>
      <read-only>true</read-only>
    </read-write-backing-map-scheme>

    <distributed-scheme>
      <scheme-name>distributed</scheme-name>
      <service-name>DistributedCache</service-name>
      <backing-map-scheme>
        <read-write-backing-map-scheme>
          <scheme-ref>trade-store</scheme-ref>
        </read-write-backing-map-scheme>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>

    <local-scheme>
      <scheme-name>backing-map</scheme-name>
      <eviction-policy>HYBRID</eviction-policy>
      <high-units>{back-size-limit 0}</high-units>
      <expiry-delay>{back-expiry 0}</expiry-delay>
      <flush-delay>1m</flush-delay>
    </local-scheme>

    <replicated-scheme>
      <scheme-name>replicated</scheme-name>
      <service-name>ReplicatedCache</service-name>
      <backing-map-scheme>
        <class-scheme>
          <scheme-ref>default-backing-map</scheme-ref>
        </class-scheme>
      </backing-map-scheme>
    </replicated-scheme>

    <class-scheme>
      <scheme-name>default-backing-map</scheme-name>
      <class-name>com.tangosol.util.SafeHashMap</class-name>
    </class-scheme>

    <invocation-scheme>
      <scheme-name>example-invocation</scheme-name>
      <service-name>InvocationService</service-name>
      <autostart>true</autostart>
    </invocation-scheme>
  </caching-schemes>
</cache-config>
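As a hedged illustration (the configuration file and JAR names here are assumptions, not taken from the sample; the documented way to start a server is the ant target shown later), a standalone cache server can be pointed at this file via the standard Coherence system property:
>java -cp coherence.jar;tangosol.jar;coherencedemo.jar -Dtangosol.coherence.cacheconfig=coherence-cache-config.xml com.tangosol.net.DefaultCacheServer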
This demonstration requires that the following software components are installed and configured correctly: Oracle Application Server 10g (10.1.3.2) or above, Oracle Coherence, and Apache Ant 1.6.2 (shipped with OC4J).
The configuration files are located in the %HOWTO_HOME%/etc directory, including deployment descriptor files such as application.xml. The application EAR and JAR can be built and deployed by issuing the following command from the console:
>ant deploy
To run the sample application on a standalone instance of Oracle Application Server 10g 10.1.3.2 or above, follow these steps:
Ensure the following environment variables are defined: %ORACLE_HOME% (the OC4J installation root), %HOWTO_HOME% (the root directory of this example), and a PATH that includes the Ant distribution (see the note on Ant below).
This example requires that you start one cache server so that the sample data can be loaded into the cache. You must have all the environment variables set in the console before issuing:
>cd %HOWTO_HOME%
>ant run-cache-server
This example requires that you load the sample data into the cache server. You must have all the environment variables set in the console before issuing:
>cd %HOWTO_HOME%
>ant run-load-data
Ant 1.6.2 is shipped with OC4J, and you must add %ORACLE_HOME%\ant\bin to your PATH environment variable. On some operating systems, Ant does not currently support the use of environment variables. If this is the case for your operating system, please modify the ant-oracle.xml file located in the %HOWTO_HOME% directory.
Edit ant-oracle.properties (in the demo directory) and ensure the following properties are set to the correct values, as indicated below for OC4J standalone:
If you are using an OracleAS managed install, you must also change the following properties appropriately, besides changing oc4j.admin.user and oc4j.admin.password for your managed OC4J instance in the OracleAS install.
You must uncomment the appropriate deployer.uri entry in ant-oracle.properties based on your environment, that is, a single-instance OC4J or a clustered OC4J instance/group managed by OPMN.
To build the application, type the following command from the %HOWTO_HOME% directory:
>ant
You should now have the newly created coherencedemo.ear in your %HOWTO_HOME%/lib directory.
This command will attempt to deploy the application archive if the build is successful. It will first test whether OC4J is running before attempting the deployment operation.
Note that you can also deploy the application separately. Ensure the %ORACLE_HOME% environment variable is defined, and from the %HOWTO_HOME% directory, type the command:
>ant deploy
Run the sample by invoking the following URL from your favorite browser:
http://localhost:8888/coherencequeries
On this page, you will see how trade information is aggregated using Coherence Invocable Maps and Partitioned Caches. This is the sample application running with just one Coherence Cache Server:
Here is the application running with two instances of the Coherence Cache Server. Notice how the data in the server is automatically redistributed between the servers without interrupting the operation of the application. Additionally, notice how the query time is reduced significantly due to the parallelization of the queries across the cache:
Execute a big trade (over 1000 shares at a price over 9 dollars) for any symbol, and the web page will be updated with the notification via the Continuous Query capability.
In this document, you should have learned how to:
- aggregate trade data in parallel using Coherence invocable maps
- implement a simple CacheStore
- perform JTA transactions through the Coherence JCA adapter
- receive up-to-date query results using Continuous Query Caching
- configure, build, and deploy the sample application to OC4J
You can use the following link http://localhost:8888/env-check.jsp to validate that the OC4J environment is configured correctly.