Skip Headers
Oracle® Fusion Middleware Programming Advanced Features of JAX-WS Web Services for Oracle WebLogic Server
11g Release 1 (10.3.4)

Part Number E13734-03
Go to Documentation Home
Home
Go to Book List
Book List
Go to Table of Contents
Contents
Go to Master Index
Master Index
Go to Feedback page
Contact Us

Go to previous page
Previous
View PDF

B Example Client Wrapper Class for Batching Reliable Messages

The following code provides an example client wrapper class that can be used for batching reliable messaging. For more information about batching reliable messages, see Grouping Messages into Business Units of Work (Batching).

Note:

This client wrapper class is example code only; it is not an officially supported production class.

Example B-1 Example Client Wrapper Class for Batching Reliable Messages

import java.io.*;
import java.lang.*;
import java.util.*;
import javax.xml.*;
 
import weblogic.wsee.jaxws.JAXWSProperties;
import weblogic.wsee.jaxws.spi.ClientInstance;
import weblogic.wsee.reliability.MessageRange;
import weblogic.wsee.reliability2.api.WsrmClient;
import weblogic.wsee.reliability2.api.WsrmClientFactory;
import weblogic.wsee.reliability2.property.WsrmInvocationPropertyBag;
import weblogic.wsee.reliability2.tube.WsrmClientImpl;
 
/**
 * Example wrapper class to batch reliable requests into fixed size 'batches'
 * that can be sent using a single RM sequence. This class allows a client to
 * send requests that have no natural common grouping or
 * 'business unit of work' and not pay the costs associated with creating and
 * terminating an RM sequence for every message.
 * NOTE: This class is an *example* of how batching might be performed. To do
 *       batching correctly, you should consider error recovery and how to
 *       report sequence errors (reported via ReliabilityErrorListener) back
 *       to the clients that made the original requests.
 * <p>
 * If your Web service client code knows of some natural business-oriented
 * grouping of requests (called a 'business unit of work'), it should make the
 * RM subsystem aware of this unit of work by using the
 * WsrmClient.setFinalMessage() method to demarcate the end of a unit (just
 * before sending the actual final request via an invocation on
 * the client instance). In some cases, notably when the client code represents
 * an intermediary in the processing of messages, the client code may not be
 * aware of any natural unit of work. In the past, if no business unit of work
 * could be determined, clients often just created the client instance, sent the
 * single current message they had, and then allowed the sequence to terminate.
 * This is functionally workable, but very inefficient. These clients pay the
 * cost of an RM sequence handshake and termination for every message they send.
 * The BatchingRmClientWrapper class can be used to introduce an artificial
 * unit of work (a batch) when no natural business unit of work is available.
 * <p>
 * Each instance of BatchingRmClientWrapper is a wrapper instance around a
 * client instance (port or Dispatch instance). This wrapper can be used to
 * obtain a Proxy instance that can be used in place of the original client
 * instance. This allows this class to perform batching operations completely
 * invisibly from the perspective of the client code.
 * <p>
 * This class is used for batching reliable requests into
 * batches of a given max size that will survive for a given maximum
 * duration. If a batch fills up or times out, it is ended, causing the
 * RM sequence it represents to be ended/terminated. The timeout ensures that
 * if the flow of incoming requests stops the batch/sequence will still
 * end in a timely manner.
 */
public class BatchingRmClientWrapper<T>
  implements InvocationHandler {
 
  private Class<T> _clazz;
  private int _batchSize;
  private long _maxBatchLifetimeMillis;
  private T _clientInstance;
  private PrintWriter _out;
  private WsrmClient _rmClient;
  private int _numInCurrentBatch;
  private int _batchNum;
  private Timer _timer;
  private boolean _closed;
  private boolean _proxyCreated;
 
  /**
   * Create a wrapper instance for batching reliable requests into
   * batches of the given max size that will survive for the given maximum
   * duration. If a batch fills up or times out, it is ended, causing the
   * RM sequence it represents to be ended/terminated.
   * @param clientInstance The client instance that acts as the source object
   *        for the batching proxy created by the createProxy() method. This
   *        is the port/Dispatch instance returned from the call to
   *        getPort/createDispatch. The BatchingRmClientWrapper will take over
   *        responsibility for managing the interaction with and cleanup of
   *        the client instance via the proxy created from createProxy.
   * @param clazz of the proxy instance created from createProxy.
   *        This should be the class of the port/Dispatch instance you would
   *        use to invoke operations on the service. BatchingRmClientWrapper will
   *        create (via createProxy) a proxy of the given type that can be
   *        used in place of the original client instance.
   * @param batchSize Max number of requests to put into a batch. If the
   *        max number of requests are sent for a given batch, that batch is
   *        ended (ending/terminating the sequence it represents) and a new
   *        batch is started.
   * @param maxBatchLifetime A duration value (in the lexical form supported
   *        by java.util.Duration, e.g. PT30S for 30 seconds) representing
   *        the maximum time a batch should exist. If the batch exists longer
   *        than this time, it is ended and a new batch is begun.
   * @param out A print stream that can be used to print diagnostic and
   *        status messages.
   */
  public BatchingRmClientWrapper(T clientInstance, Class<T> clazz,
                                 int batchSize, String maxBatchLifetime,
                                 PrintStream out) {
    _clazz = clazz;
    _batchSize = batchSize;
    try {
      if (maxBatchLifetime == null) {
        maxBatchLifetime = "PT5M";
      }
      Duration duration =
        DatatypeFactory.newInstance().newDuration(maxBatchLifetime);
      _maxBatchLifetimeMillis = duration.getTimeInMillis(new Date());
    } catch (Exception e) {
      throw new RuntimeException(e.toString(), e);
    }
    _clientInstance = clientInstance;
    _out = new PrintWriter(out, true);
    _rmClient = WsrmClientFactory.getWsrmClientFromPort(_clientInstance);
    _closed = false;
    _proxyCreated = false;
    _timer = new Timer(true);
    _timer.schedule(new TimerTask() {
      @Override
      public void run() {
        terminateOrEndBatch();
      }
    }, _maxBatchLifetimeMillis);
  }
 
  /**
   * Creates the dynamic proxy that should be used in place of the client
   * instance used to create this BatchingRmClientWrapper instance. This method
   * should be called only once per BatchingRmClientWrapper.
   */
  public T createProxy() {
    if (_proxyCreated) {
      throw new IllegalStateException("Already created the proxy for this BatchingRmClientWrapper instance which wraps the client instance: " + _clientInstance);
    }
    _proxyCreated = true;
    return (T) Proxy.newProxyInstance(getClass().getClassLoader(),
                                     new Class[] {
                                       _clazz,
                                       BindingProvider.class,
                                       java.io.Closeable.class
                                     }, this);
  }
 
  private void terminateOrEndBatch() {
    synchronized(_clientInstance) {
      if (_rmClient.getSequenceId() != null) {
        if (terminateBatchAllowed()) {
          _out.println("Terminating batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") for " + _clientInstance);
          try {
            _rmClient.terminateSequence();
          } catch (Exception e) {
            e.printStackTrace(_out);
          }
        } else {
          _out.println("Batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") for " + _clientInstance + " timed out but has outstanding requests to send and cannot be terminated now");
        }
      }
      endBatch();
    }
  }
 
  /**
   * Check to see if there are acknowledgements for all requests sent. If so,
   * terminate.
   */
  private boolean terminateBatchAllowed() {
    try {
      synchronized(_clientInstance) {
        if (_rmClient.getSequenceId() != null) {
 
          // TODO: Remove this workaround when getMostRecentMessageNumber is
          //       fixed.
          //       The following BUG is targeted for the next release, at which time
          //       the workaround can be removed.
          //       Bug 10382605 - WSRMCLIENT.GETMOSTRECENTMESSAGENUMBER() ALWAYS RETURNS 0
          //long maxMsgNum = _rmClient.getMostRecentMessageNumber();
 
          // --------------------------------------------------------
          // Workaround: Start
          //   *** Do *not* use this API in your own code. It is not
          //       supported for customer use ***
          WsrmInvocationPropertyBag rmProps =
            (WsrmInvocationPropertyBag)((BindingProvider)_clientInstance).
              getRequestContext().get(WsrmInvocationPropertyBag.key);
          long maxMsgNum = rmProps != null ? rmProps.getMostRecentMsgNum() : 0;
          // Workaround: End
          // --------------------------------------------------------
 
          if (maxMsgNum < 1) {
            // No messages sent, go ahead and terminate.
            return true;
          }
          SortedSet<MessageRange> ranges = _rmClient.getAckRanges();
          long maxAck = -1;
          boolean hasGaps = false;
          long lastRangeUpper = -1;
          for (MessageRange range: ranges) {
            if (lastRangeUpper > 0) {
              if (range.lowerBounds != lastRangeUpper + 1) {
                hasGaps = true;
              }
            } else {
              lastRangeUpper = range.upperBounds;
            }
            maxAck = range.upperBounds;
          }
          return !(hasGaps || maxAck < maxMsgNum);
        }
      }
    } catch (Exception e) {
      e.printStackTrace(_out);
    }
    return true;
  }
 
  private void endBatch() {
    synchronized(_clientInstance) {
      if (_numInCurrentBatch > 0) {
        _out.println("Ending batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") for " + _clientInstance + "...");
      }
      resetWsrmClient(_rmClient, _clientInstance);
      _numInCurrentBatch = 0;
      if (!_closed) {
        _timer.schedule(new TimerTask() {
          @Override
          public void run() {
            terminateOrEndBatch();
          }
        }, _maxBatchLifetimeMillis);
      }
    }
  }
 
  /**
   * Resets a WsrmClient instance (and the client instance it represents)
   * so it can track a new WS-RM sequence for the next invoke on the client
   * instance. This method effectively *disconnects* the RM sequence from the
   * client instance and lets them continue/complete separately.
   * NOTE: You should use this method instead of WsrmClient.reset() alone due
   *       to a bug in WsrmClient.reset that does not completely reset all state
   *       stored on the client instance from the old sequence.
   */
  public static void resetWsrmClient(WsrmClient rmClient, Object clientInstance) {
    rmClient.reset();
    // TODO: Shouldn't have to do this, as _rmClient.reset should do it for
    //       The following BUG is targeted for the next release, at which time
    //       the workaround can be removed.
    //       Bug 10382543 - WSRMCLIENT.RESET() FAILS TO RESET ALL NECESSARY STATE
    // --------------------------------------------------------
    // Workaround: Start
    //   *** Do *not* use this API in your own code. It is not
    //       supported for customer use ***
    WeakReference<ClientInstance> clientInstanceRef =
      (WeakReference<ClientInstance>)
        ((BindingProvider)clientInstance).
          getRequestContext().get(JAXWSProperties.CLIENT_INSTANCE_WEAK_REF);
    ClientInstance instance = clientInstanceRef != null ?
      clientInstanceRef.get() : null;
    if (instance != null) {
      instance.getProps().
        remove(WsrmClientImpl.CLIENT_CURRENT_SEQUENCE_ID_PROP_NAME);
    }
    // Workaround: End
    // --------------------------------------------------------
  }
 
  public Object invoke(Object proxy, Method method, Object[] args)
    throws Throwable {
    boolean operationInvoke = method.getDeclaringClass() == _clazz;
    boolean closeableInvoke = method.getDeclaringClass() ==
                              java.io.Closeable.class;
    boolean endOfBatch = false;
    if (operationInvoke) {
      synchronized(_clientInstance) {
        // Check our batch size
        if (_numInCurrentBatch == 0) {
          _batchNum++;
        }
        endOfBatch = _numInCurrentBatch >= _batchSize - 1;
        if (endOfBatch) {
          _rmClient.setFinalMessage();
        }
        _out.println("Making " + (endOfBatch ? "final " : "") + "invoke " + (_numInCurrentBatch+1) + " of batch " + _batchNum + " sequence (" + _rmClient.getSequenceId() + ") with operation: " + method.getName());
      }
    } else if (closeableInvoke && method.getName().equals("close")) {
      synchronized(_clientInstance) {
        // Make sure we don't try to schedule the timer anymore
        _closed = true;
        _timer.cancel();
      }
    }
    Object ret = method.invoke(_clientInstance, args);
    if (operationInvoke) {
      synchronized(_clientInstance) {
        _numInCurrentBatch++;
        if (endOfBatch) {
          endBatch();
        }
      }
    }
    return ret;
  }
}