Oracle® Streams Advanced Queuing User's Guide and Reference Release 10.1 Part Number B10785-01
This chapter illustrates a messaging environment that can be constructed using Oracle Streams.
This chapter contains these topics:
Enqueue and Dequeue Events Using JMS
See Also: Oracle Streams Concepts and Administration for more information about messaging and SYS.AnyData queues
This example uses a single SYS.AnyData queue at a database called oedb.net to create an Oracle Streams messaging environment in which events containing message payloads of different types are stored in the same queue. Specifically, this example illustrates the following messaging features of Oracle Streams:
Enqueuing messages containing order payload and customer payload as SYS.AnyData events into the queue
Enqueuing messages containing row LCR payload as SYS.AnyData events into the queue
Creating a rule set for applying the events
Creating an evaluation context used by the rule set
Creating an Oracle Streams apply process to dequeue and process the events based on rules
Creating a message handler and associating it with the apply process
Explicitly dequeuing and processing events based on rules without using the apply process
Figure 24-1 provides an overview of this environment.
Figure 24-1 Example Oracle Streams Messaging Environment

The following tasks must be completed before you begin the example in this section.
Set initialization parameter COMPATIBLE to 9.2.0 or higher.
Configure your network and Oracle Net so that you can access the oedb.net database from the client where you run these scripts.
This example creates a new user to function as the Oracle Streams administrator (strmadmin) and prompts you for the tablespace you want to use for this user's data. Before you start this example, either create a new tablespace or identify an existing tablespace for the Oracle Streams administrator to use. The Oracle Streams administrator should not use the SYSTEM tablespace.
Complete the following steps to set up users and create a SYS.AnyData queue for an Oracle Streams messaging environment.
Note: If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line on this page to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to the database.
/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results
Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.
*/

SET ECHO ON
SPOOL streams_setup_message.out

/*
Step 2 Set Up Users
Connect to oedb.net as SYS user.
*/

CONNECT SYS/CHANGE_ON_INSTALL@oedb.net AS SYSDBA

/*
This example uses the oe sample schema. For this example to work properly, the oe user must have privileges to run the subprograms in the DBMS_AQ package. The oe user is specified as the queue user when the SYS.AnyData queue is created in Step 3. The SET_UP_QUEUE procedure grants the oe user enqueue and dequeue privileges on the queue, but the oe user also needs EXECUTE privilege on the DBMS_AQ package to enqueue events into and dequeue events from the queue.
Also, most of the configuration and administration actions illustrated in this example are performed by the Oracle Streams administrator. In this step, create the Oracle Streams administrator named strmadmin and grant this user the necessary privileges. These privileges enable the user to run subprograms in packages related to Oracle Streams, create rule sets, create rules, and monitor the Oracle Streams environment by querying data dictionary views. You can choose a different name for this user.
*/
GRANT EXECUTE ON DBMS_AQ TO oe;
GRANT CONNECT, RESOURCE, SELECT_CATALOG_ROLE
TO strmadmin IDENTIFIED BY strmadminpw;
ACCEPT streams_tbs PROMPT 'Enter the tablespace for the Oracle Streams administrator: '
ALTER USER strmadmin DEFAULT TABLESPACE &streams_tbs
QUOTA UNLIMITED ON &streams_tbs;
GRANT EXECUTE ON DBMS_APPLY_ADM TO strmadmin;
GRANT EXECUTE ON DBMS_AQ TO strmadmin;
GRANT EXECUTE ON DBMS_AQADM TO strmadmin;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO strmadmin;
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
grantee => 'strmadmin',
grant_option => FALSE);
END;
/
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ,
grantee => 'strmadmin',
grant_option => FALSE);
END;
/
BEGIN
DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
privilege => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ,
grantee => 'strmadmin',
grant_option => FALSE);
END;
/
/*
Step 3 Create the SYS.AnyData Queue
Connect as the Oracle Streams administrator.
*/

CONNECT strmadmin/strmadminpw@oedb.net

/*
Run the SET_UP_QUEUE procedure to create a queue named oe_queue at oedb.net. This queue functions as the SYS.AnyData queue by holding events used in the messaging environment.
Running the SET_UP_QUEUE procedure performs the following actions:
Creates a queue table named oe_queue_table. This queue table is owned by the Oracle Streams administrator (strmadmin) and uses the default storage of this user.
Creates a queue named oe_queue owned by the Oracle Streams administrator (strmadmin)
Starts the queue
*/
BEGIN
DBMS_STREAMS_ADM.SET_UP_QUEUE(
queue_table => 'oe_queue_table',
queue_name => 'oe_queue');
END;
/
/*
Step 4 Grant the oe User Privileges on the Queue
*/
BEGIN
SYS.DBMS_AQADM.GRANT_QUEUE_PRIVILEGE(
privilege => 'ALL',
queue_name => 'strmadmin.oe_queue',
grantee => 'oe');
END;
/
/*
Step 5 Create an Agent for Explicit Enqueue
Create an agent that will be used to perform explicit enqueue operations on the oe_queue queue.
*/
BEGIN
SYS.DBMS_AQADM.CREATE_AQ_AGENT(
agent_name => 'explicit_enq');
END;
/
/*
Step 6 Associate the oe User with the explicit_enq Agent
For a user to perform queue operations, such as enqueue and dequeue, on a secure queue, the user must be configured as a secure queue user of the queue. The oe_queue queue is a secure queue because it was created using SET_UP_QUEUE. This step enables the oe user to perform enqueue operations on this queue.
*/
BEGIN
DBMS_AQADM.ENABLE_DB_ACCESS(
agent_name => 'explicit_enq',
db_username => 'oe');
END;
/
/*
Step 7 Check the Spool Results
Check the streams_setup_message.out spool file to ensure that all actions completed successfully after this script completes.
*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/
Complete the following steps to create one PL/SQL procedure that enqueues non-LCR events into the SYS.AnyData queue and one PL/SQL procedure that enqueues row LCR events into the SYS.AnyData queue.
Note: If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line on this page to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to the database.
/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results
Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.
*/

SET ECHO ON
SPOOL streams_enqprocs_message.out

/*
Step 2 Create a Type to Represent Orders
Connect as oe.
*/

CONNECT oe/oe@oedb.net

/*
Create a type to represent orders based on the columns in the oe.orders table. The type attributes include the columns in the oe.orders table, along with one extra attribute named action. The value of the action attribute for instances of this type determines the correct action to perform on the instance (either apply process dequeue or explicit dequeue). This type is used for events that are enqueued into the SYS.AnyData queue.
*/

CREATE OR REPLACE TYPE order_event_typ AS OBJECT (
  order_id      NUMBER(12),
  order_date    TIMESTAMP(6) WITH LOCAL TIME ZONE,
  order_mode    VARCHAR2(8),
  customer_id   NUMBER(6),
  order_status  NUMBER(2),
  order_total   NUMBER(8,2),
  sales_rep_id  NUMBER(6),
  promotion_id  NUMBER(6),
  action        VARCHAR(7));
/

/*
Step 3 Create a Type to Represent Customers
Create a type to represent customers based on the columns in the oe.customers table. The type attributes include the columns in the oe.customers table, along with one extra attribute named action. The value of the action attribute for instances of this type determines the correct action to perform on the instance (either apply process dequeue or explicit dequeue). This type is used for events that are enqueued into the SYS.AnyData queue.
*/

CREATE OR REPLACE TYPE customer_event_typ AS OBJECT (
  customer_id        NUMBER(6),
  cust_first_name    VARCHAR2(20),
  cust_last_name     VARCHAR2(20),
  cust_address       CUST_ADDRESS_TYP,
  phone_numbers      PHONE_LIST_TYP,
  nls_language       VARCHAR2(3),
  nls_territory      VARCHAR2(30),
  credit_limit       NUMBER(9,2),
  cust_email         VARCHAR2(30),
  account_mgr_id     NUMBER(6),
  cust_geo_location  MDSYS.SDO_GEOMETRY,
  action             VARCHAR(7));
/

/*
Step 4 Create the Procedure to Enqueue Non-LCR Events
Create a PL/SQL procedure called enq_proc to enqueue events into the SYS.AnyData queue.
Note: A single enqueued message can be dequeued by an apply process and by an explicit dequeue, but this example does not illustrate this capability.
*/
CREATE OR REPLACE PROCEDURE oe.enq_proc (event IN SYS.Anydata) IS
enqopt DBMS_AQ.ENQUEUE_OPTIONS_T;
mprop DBMS_AQ.MESSAGE_PROPERTIES_T;
enq_eventid RAW(16);
BEGIN
mprop.SENDER_ID := SYS.AQ$_AGENT('explicit_enq', NULL, NULL);
DBMS_AQ.ENQUEUE(
queue_name => 'strmadmin.oe_queue',
enqueue_options => enqopt,
message_properties => mprop,
payload => event,
msgid => enq_eventid);
END;
/
/*
Step 5 Create a Procedure to Construct and Enqueue Row LCR Events
Create a procedure called enq_row_lcr that constructs a row LCR and then enqueues the row LCR into the queue.
*/
CREATE OR REPLACE PROCEDURE oe.enq_row_lcr(
source_dbname VARCHAR2,
cmd_type VARCHAR2,
obj_owner VARCHAR2,
obj_name VARCHAR2,
old_vals SYS.LCR$_ROW_LIST,
new_vals SYS.LCR$_ROW_LIST) AS
eopt DBMS_AQ.ENQUEUE_OPTIONS_T;
mprop DBMS_AQ.MESSAGE_PROPERTIES_T;
enq_msgid RAW(16);
row_lcr SYS.LCR$_ROW_RECORD;
BEGIN
mprop.SENDER_ID := SYS.AQ$_AGENT('explicit_enq', NULL, NULL);
-- Construct the LCR based on information passed to procedure
row_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT(
source_database_name => source_dbname,
command_type => cmd_type,
object_owner => obj_owner,
object_name => obj_name,
old_values => old_vals,
new_values => new_vals);
-- Enqueue the created row LCR
DBMS_AQ.ENQUEUE(
queue_name => 'strmadmin.oe_queue',
enqueue_options => eopt,
message_properties => mprop,
payload => SYS.AnyData.ConvertObject(row_lcr),
msgid => enq_msgid);
END enq_row_lcr;
/
/*
Step 6 Check the Spool Results
Check the streams_enqprocs_message.out spool file to ensure that all actions completed successfully after this script completes.
*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/
Complete the following steps to configure an apply process to apply the user-enqueued events in the SYS.AnyData queue.
Note: If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line on this page to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to the database.
/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results
Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.
*/

SET ECHO ON
SPOOL streams_apply_message.out

/*
Step 2 Create a Function to Determine the Value of the action Attribute
Connect as oe.
*/

CONNECT oe/oe@oedb.net

/*
Create a function called get_oe_action to determine the value of the action attribute in the events in the queue. This function is used in rules later in this example to determine the value of the action attribute for an event. Then, the clients of the rules engine perform the appropriate action for the event (either dequeue by apply process or explicit dequeue). In this example, the clients of the rules engine are the apply process and the oe.explicit_dq PL/SQL procedure.
*/
CREATE OR REPLACE FUNCTION oe.get_oe_action (event IN SYS.Anydata)
RETURN VARCHAR2
IS
ord oe.order_event_typ;
cust oe.customer_event_typ;
num NUMBER;
type_name VARCHAR2(61);
BEGIN
type_name := event.GETTYPENAME;
IF type_name = 'OE.ORDER_EVENT_TYP' THEN
num := event.GETOBJECT(ord);
RETURN ord.action;
ELSIF type_name = 'OE.CUSTOMER_EVENT_TYP' THEN
num := event.GETOBJECT(cust);
RETURN cust.action;
ELSE
RETURN NULL;
END IF;
END;
/
/*
Step 3 Create a Message Handler
Create a message handler called mes_handler that will be used as a message handler by the apply process. This procedure takes the payload in a user-enqueued event of type oe.order_event_typ or oe.customer_event_typ and inserts it as a row in the oe.orders table and oe.customers table, respectively.
*/
CREATE OR REPLACE PROCEDURE oe.mes_handler (event SYS.AnyData)
IS
ord oe.order_event_typ;
cust oe.customer_event_typ;
num NUMBER;
type_name VARCHAR2(61);
BEGIN
type_name := event.GETTYPENAME;
IF type_name = 'OE.ORDER_EVENT_TYP' THEN
num := event.GETOBJECT(ord);
INSERT INTO oe.orders VALUES (ord.order_id, ord.order_date,
ord.order_mode, ord.customer_id, ord.order_status, ord.order_total,
ord.sales_rep_id, ord.promotion_id);
ELSIF type_name = 'OE.CUSTOMER_EVENT_TYP' THEN
num := event.GETOBJECT(cust);
INSERT INTO oe.customers VALUES (cust.customer_id, cust.cust_first_name,
cust.cust_last_name, cust.cust_address, cust.phone_numbers,
cust.nls_language, cust.nls_territory, cust.credit_limit, cust.cust_email,
cust.account_mgr_id, cust.cust_geo_location);
END IF;
END;
/
/*
Step 4 Grant strmadmin EXECUTE Privilege on the Procedures
*/
GRANT EXECUTE ON get_oe_action TO strmadmin;
GRANT EXECUTE ON mes_handler TO strmadmin;

/*
Step 5 Create the Evaluation Context for the Rule Set
Connect as the Oracle Streams administrator.
*/

CONNECT strmadmin/strmadminpw@oedb.net

/*
Create the evaluation context for the rule set. The table alias is tab in this example, but you can use a different table alias name if you wish.
*/
DECLARE
table_alias SYS.RE$TABLE_ALIAS_LIST;
BEGIN
table_alias := SYS.RE$TABLE_ALIAS_LIST(SYS.RE$TABLE_ALIAS(
'tab',
'strmadmin.oe_queue_table'));
DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT(
evaluation_context_name => 'oe_eval_context',
table_aliases => table_alias);
END;
/
/*
Step 6 Create a Rule Set for the Apply Process
Create the rule set for the apply process.
*/
BEGIN
DBMS_RULE_ADM.CREATE_RULE_SET(
rule_set_name => 'apply_oe_rs',
evaluation_context => 'strmadmin.oe_eval_context');
END;
/
/*
Step 7 Create a Rule that Evaluates to TRUE if the Event Action Is apply
Create a rule that evaluates to TRUE if the action value of an event is apply. Notice that tab.user_data is passed to the oe.get_oe_action function. The tab.user_data column holds the event payload in a queue table. The table alias for the queue table was specified as tab in Step 5.
*/
BEGIN
DBMS_RULE_ADM.CREATE_RULE(
rule_name => 'strmadmin.apply_action',
condition => ' oe.get_oe_action(tab.user_data) = ''APPLY'' ');
END;
/
/*
Step 8 Create a Rule that Evaluates to TRUE for the Row LCR Events
Create a rule that evaluates to TRUE if the event in the queue is a row LCR that changes either the oe.orders table or the oe.customers table. This rule enables the apply process to apply user-enqueued changes to the tables directly. For convenience, this rule uses the Oracle-supplied evaluation context SYS.STREAMS$_EVALUATION_CONTEXT because the rule is used to evaluate LCRs. When this rule is added to the rule set, this evaluation context is used for the rule during evaluation instead of the rule set's evaluation context.
*/
BEGIN
DBMS_RULE_ADM.CREATE_RULE(
rule_name => 'apply_lcrs',
condition => ':dml.GET_OBJECT_OWNER() = ''OE'' AND ' ||
' (:dml.GET_OBJECT_NAME() = ''ORDERS'' OR ' ||
':dml.GET_OBJECT_NAME() = ''CUSTOMERS'') ',
evaluation_context => 'SYS.STREAMS$_EVALUATION_CONTEXT');
END;
/
/*
Step 9 Add the Rules to the Rule Set
Add the rules created in Step 7 and Step 8 to the rule set created in Step 6.
*/
BEGIN
DBMS_RULE_ADM.ADD_RULE(
rule_name => 'apply_action',
rule_set_name => 'apply_oe_rs');
DBMS_RULE_ADM.ADD_RULE(
rule_name => 'apply_lcrs',
rule_set_name => 'apply_oe_rs');
END;
/
/*
Step 10 Create an Apply Process
Create an apply process that is associated with the oe_queue, that uses the apply_oe_rs rule set, and that uses the mes_handler procedure as a message handler.
*/
BEGIN
DBMS_APPLY_ADM.CREATE_APPLY(
queue_name => 'strmadmin.oe_queue',
apply_name => 'apply_oe',
rule_set_name => 'strmadmin.apply_oe_rs',
message_handler => 'oe.mes_handler',
apply_user => 'oe',
apply_captured => false);
END;
/
/*
Step 11 Grant EXECUTE Privilege on the Rule Set To oe User
Grant EXECUTE privilege on the strmadmin.apply_oe_rs rule set. Because oe was specified as the apply user when the apply process was created in Step 10, oe needs EXECUTE privilege on the rule set used by the apply process.
*/
BEGIN
DBMS_RULE_ADM.GRANT_OBJECT_PRIVILEGE(
privilege => DBMS_RULE_ADM.EXECUTE_ON_RULE_SET,
object_name => 'strmadmin.apply_oe_rs',
grantee => 'oe',
grant_option => FALSE);
END;
/
/*
Step 12 Start the Apply Process
Set the disable_on_error parameter to n so that the apply process is not disabled if it encounters an error, and start the apply process at oedb.net.
*/
BEGIN
DBMS_APPLY_ADM.SET_PARAMETER(
apply_name => 'apply_oe',
parameter => 'disable_on_error',
value => 'n');
END;
/
BEGIN
DBMS_APPLY_ADM.START_APPLY(
apply_name => 'apply_oe');
END;
/
/*
Step 13 Check the Spool Results
Check the streams_apply_message.out spool file to ensure that all actions completed successfully after this script completes.
*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/
Complete the following steps to configure explicit dequeue of messages based on message contents.
Note: If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line on this page to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to the database.
/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results
Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.
*/

SET ECHO ON
SPOOL streams_explicit_dq.out

/*
Step 2 Create an Agent for Explicit Dequeue
Connect as the Oracle Streams administrator.
*/

CONNECT strmadmin/strmadminpw@oedb.net

/*
Create an agent that will be used to perform explicit dequeue operations on the oe_queue queue.
*/
BEGIN
SYS.DBMS_AQADM.CREATE_AQ_AGENT(
agent_name => 'explicit_dq');
END;
/
/*
Step 3 Associate the oe User with the explicit_dq Agent
For a user to perform queue operations, such as enqueue and dequeue, on a secure queue, the user must be configured as a secure queue user of the queue. The oe_queue queue is a secure queue because it was created using SET_UP_QUEUE. The oe user is able to perform dequeue operations on this queue when the agent is used to create a subscriber to the queue in the next step.
*/
BEGIN
DBMS_AQADM.ENABLE_DB_ACCESS(
agent_name => 'explicit_dq',
db_username => 'oe');
END;
/
/*
Step 4 Add a Subscriber to the oe_queue Queue
Add a subscriber to the oe_queue queue. This subscriber will perform explicit dequeues of events. A subscriber rule is used to dequeue any events where the action value is not apply. If the action value is apply for an event, then the event is ignored by the subscriber. Such events are dequeued and processed by the apply process.
*/
DECLARE
subscriber SYS.AQ$_AGENT;
BEGIN
subscriber := SYS.AQ$_AGENT('explicit_dq', NULL, NULL);
SYS.DBMS_AQADM.ADD_SUBSCRIBER(
queue_name => 'strmadmin.oe_queue',
subscriber => subscriber,
rule => 'oe.get_oe_action(tab.user_data) != ''APPLY''');
END;
/
/*
Step 5 Create a Procedure to Dequeue Events Explicitly
Connect as oe.
*/

CONNECT oe/oe@oedb.net

/*
Create a PL/SQL procedure called explicit_dq to dequeue events explicitly using the subscriber created in Step 4.
*/
CREATE OR REPLACE PROCEDURE oe.explicit_dq (consumer IN VARCHAR2) AS
deqopt DBMS_AQ.DEQUEUE_OPTIONS_T;
mprop DBMS_AQ.MESSAGE_PROPERTIES_T;
msgid RAW(16);
payload SYS.AnyData;
new_messages BOOLEAN := TRUE;
ord oe.order_event_typ;
cust oe.customer_event_typ;
tc pls_integer;
next_trans EXCEPTION;
no_messages EXCEPTION;
pragma exception_init (next_trans, -25235);
pragma exception_init (no_messages, -25228);
BEGIN
deqopt.consumer_name := consumer;
deqopt.wait := 1;
WHILE (new_messages) LOOP
BEGIN
DBMS_AQ.DEQUEUE(
queue_name => 'strmadmin.oe_queue',
dequeue_options => deqopt,
message_properties => mprop,
payload => payload,
msgid => msgid);
COMMIT;
deqopt.navigation := DBMS_AQ.NEXT;
DBMS_OUTPUT.PUT_LINE('Event Dequeued');
DBMS_OUTPUT.PUT_LINE('Type Name := ' || payload.GetTypeName);
IF (payload.GetTypeName = 'OE.ORDER_EVENT_TYP') THEN
tc := payload.GetObject(ord);
DBMS_OUTPUT.PUT_LINE('order_id - ' || ord.order_id);
DBMS_OUTPUT.PUT_LINE('order_date - ' || ord.order_date);
DBMS_OUTPUT.PUT_LINE('order_mode - ' || ord.order_mode);
DBMS_OUTPUT.PUT_LINE('customer_id - ' || ord.customer_id);
DBMS_OUTPUT.PUT_LINE('order_status - ' || ord.order_status);
DBMS_OUTPUT.PUT_LINE('order_total - ' || ord.order_total);
DBMS_OUTPUT.PUT_LINE('sales_rep_id - ' || ord.sales_rep_id);
DBMS_OUTPUT.PUT_LINE('promotion_id - ' || ord.promotion_id);
END IF;
IF (payload.GetTypeName = 'OE.CUSTOMER_EVENT_TYP') THEN
tc := payload.GetObject(cust);
DBMS_OUTPUT.PUT_LINE('customer_id - ' || cust.customer_id);
DBMS_OUTPUT.PUT_LINE('cust_first_name - ' || cust.cust_first_name);
DBMS_OUTPUT.PUT_LINE('cust_last_name - ' || cust.cust_last_name);
DBMS_OUTPUT.PUT_LINE('street_address - ' ||
cust.cust_address.street_address);
DBMS_OUTPUT.PUT_LINE('postal_code - ' ||
cust.cust_address.postal_code);
DBMS_OUTPUT.PUT_LINE('city - ' || cust.cust_address.city);
DBMS_OUTPUT.PUT_LINE('state_province - ' ||
cust.cust_address.state_province);
DBMS_OUTPUT.PUT_LINE('country_id - ' ||
cust.cust_address.country_id);
DBMS_OUTPUT.PUT_LINE('phone_number1 - ' || cust.phone_numbers(1));
DBMS_OUTPUT.PUT_LINE('phone_number2 - ' || cust.phone_numbers(2));
DBMS_OUTPUT.PUT_LINE('phone_number3 - ' || cust.phone_numbers(3));
DBMS_OUTPUT.PUT_LINE('nls_language - ' || cust.nls_language);
DBMS_OUTPUT.PUT_LINE('nls_territory - ' || cust.nls_territory);
DBMS_OUTPUT.PUT_LINE('credit_limit - ' || cust.credit_limit);
DBMS_OUTPUT.PUT_LINE('cust_email - ' || cust.cust_email);
DBMS_OUTPUT.PUT_LINE('account_mgr_id - ' || cust.account_mgr_id);
END IF;
EXCEPTION
WHEN next_trans THEN
deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
WHEN no_messages THEN
new_messages := FALSE;
DBMS_OUTPUT.PUT_LINE('No more events');
END;
END LOOP;
END;
/
/*
Step 6 Check Spool Results
Check the streams_explicit_dq.out spool file to ensure that all actions completed successfully after this script completes.
*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/
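The way the apply_action rule and the explicit_dq subscriber rule divide non-LCR events between them can be modeled outside the database. The following is a minimal Java sketch, not Oracle API code: the class RoutingModel and its methods are hypothetical, and the sketch only mimics standard SQL three-valued logic, under which a comparison with NULL is neither TRUE nor FALSE. Row LCRs, which are selected by the separate apply_lcrs rule, are not modeled. Under these assumptions, an event whose action is NULL (for example, a payload type that oe.get_oe_action does not recognize) satisfies neither condition.

```java
// Hypothetical model of the two action-based conditions; it does not call Oracle.
class RoutingModel {

    // Mimics SQL "action = 'APPLY'": null (unknown) when action is NULL.
    static Boolean applyActionRule(String action) {
        return action == null ? null : action.equals("APPLY");
    }

    // Mimics SQL "action != 'APPLY'": null (unknown) when action is NULL.
    static Boolean subscriberRule(String action) {
        return action == null ? null : !action.equals("APPLY");
    }

    // A condition selects an event only when it is TRUE, not when it is unknown.
    static String route(String action) {
        if (Boolean.TRUE.equals(applyActionRule(action))) {
            return "apply process";
        }
        if (Boolean.TRUE.equals(subscriberRule(action))) {
            return "explicit dequeue";
        }
        return "neither";
    }

    public static void main(String[] args) {
        for (String action : new String[] {"APPLY", "DEQUEUE", null}) {
            System.out.println(action + " -> " + route(action));
        }
    }
}
```

In this model, the action values used later in this example (APPLY and DEQUEUE) each reach exactly one consumer.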
Complete the following steps to enqueue non-LCR events and row LCR events into the queue.
/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results
Run SET ECHO ON and specify the spool file for the script. Check the spool file for errors after you run this script.
*/

SET ECHO ON
SPOOL streams_enq_deq.out

/*
Step 2 Enqueue Non-LCR Events to be Dequeued by the Apply Process
Connect as oe.
*/

CONNECT oe/oe@oedb.net

/*
Enqueue events with apply for the action value. Based on the apply process rules, the apply process dequeues and processes these events with the oe.mes_handler message handler procedure created in "Create a Message Handler". The COMMIT after the enqueues makes these two enqueues part of the same transaction. An enqueued message is not visible until the session that enqueued it commits the enqueue.
*/
BEGIN
oe.enq_proc(SYS.AnyData.convertobject(oe.order_event_typ(
2500,'05-MAY-01','online',117,3,44699,161,NULL,'APPLY')));
END;
/
BEGIN
oe.enq_proc(SYS.AnyData.convertobject(oe.customer_event_typ(
990,'Hester','Prynne',oe.cust_address_typ('555 Beacon Street','02109',
'Boston','MA','US'),oe.phone_list_typ('+1 617 123 4104', '+1 617 083 4381',
'+1 617 742 5813'),'i','AMERICA',5000,'a@scarlet_letter.com',145,
NULL,'APPLY')));
END;
/
COMMIT;
/*
Step 3 Enqueue Non-LCR Events to be Dequeued Explicitly
Enqueue events with dequeue for the action value. The oe.explicit_dq procedure created in "Create a Procedure to Dequeue Events Explicitly" dequeues these events because the action is not apply. Based on the apply process rules, the apply process ignores these events. The COMMIT after the enqueues makes these two enqueues part of the same transaction.
*/
BEGIN
oe.enq_proc(SYS.AnyData.convertobject(oe.order_event_typ(
2501,'22-JAN-00','direct',117,3,22788,161,NULL,'DEQUEUE')));
END;
/
BEGIN
oe.enq_proc(SYS.AnyData.convertobject(oe.customer_event_typ(
991,'Nick','Carraway',oe.cust_address_typ('10th Street',
11101,'Long Island','NY','US'),oe.phone_list_typ('+1 718 786 2287',
'+1 718 511 9114', '+1 718 888 4832'),'i','AMERICA',3000,
'nick@great_gatsby.com',149,NULL,'DEQUEUE')));
END;
/
COMMIT;
/*
Step 4 Enqueue Row LCR Events to be Dequeued by the Apply Process
Enqueue row LCR events. The apply process applies these events directly. Enqueued LCRs should commit at transaction boundaries. In this step, a COMMIT statement is run after each enqueue, making each enqueue a separate transaction. However, you can perform multiple LCR enqueues before a commit if there is more than one LCR in a transaction.
Create a row LCR that inserts a row into the oe.orders table.
*/
DECLARE
newunit1 SYS.LCR$_ROW_UNIT;
newunit2 SYS.LCR$_ROW_UNIT;
newunit3 SYS.LCR$_ROW_UNIT;
newunit4 SYS.LCR$_ROW_UNIT;
newunit5 SYS.LCR$_ROW_UNIT;
newunit6 SYS.LCR$_ROW_UNIT;
newunit7 SYS.LCR$_ROW_UNIT;
newunit8 SYS.LCR$_ROW_UNIT;
newvals SYS.LCR$_ROW_LIST;
BEGIN
newunit1 := SYS.LCR$_ROW_UNIT(
'ORDER_ID',
SYS.AnyData.ConvertNumber(2502),
DBMS_LCR.NOT_A_LOB,
NULL,
NULL);
newunit2 := SYS.LCR$_ROW_UNIT(
'ORDER_DATE',
SYS.AnyData.ConvertTimestampLTZ('04-NOV-00'),
DBMS_LCR.NOT_A_LOB,
NULL,
NULL);
newunit3 := SYS.LCR$_ROW_UNIT(
'ORDER_MODE',
SYS.AnyData.ConvertVarchar2('online'),
DBMS_LCR.NOT_A_LOB,
NULL,
NULL);
newunit4 := SYS.LCR$_ROW_UNIT(
'CUSTOMER_ID',
SYS.AnyData.ConvertNumber(145),
DBMS_LCR.NOT_A_LOB,
NULL,
NULL);
newunit5 := SYS.LCR$_ROW_UNIT(
'ORDER_STATUS',
SYS.AnyData.ConvertNumber(3),
DBMS_LCR.NOT_A_LOB,
NULL,
NULL);
newunit6 := SYS.LCR$_ROW_UNIT(
'ORDER_TOTAL',
SYS.AnyData.ConvertNumber(35199),
DBMS_LCR.NOT_A_LOB,
NULL,
NULL);
newunit7 := SYS.LCR$_ROW_UNIT(
'SALES_REP_ID',
SYS.AnyData.ConvertNumber(160),
DBMS_LCR.NOT_A_LOB,
NULL,
NULL);
newunit8 := SYS.LCR$_ROW_UNIT(
'PROMOTION_ID',
SYS.AnyData.ConvertNumber(1),
DBMS_LCR.NOT_A_LOB,
NULL,
NULL);
newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2,newunit3,newunit4,
newunit5,newunit6,newunit7,newunit8);
oe.enq_row_lcr(
source_dbname => 'OEDB.NET',
cmd_type => 'INSERT',
obj_owner => 'OE',
obj_name => 'ORDERS',
old_vals => NULL,
new_vals => newvals);
END;
/
COMMIT;
/*
Create a row LCR that updates the row inserted into the oe.orders table previously.
*/
DECLARE
oldunit1 SYS.LCR$_ROW_UNIT;
oldunit2 SYS.LCR$_ROW_UNIT;
oldvals SYS.LCR$_ROW_LIST;
newunit1 SYS.LCR$_ROW_UNIT;
newvals SYS.LCR$_ROW_LIST;
BEGIN
oldunit1 := SYS.LCR$_ROW_UNIT(
'ORDER_ID',
SYS.AnyData.ConvertNumber(2502),
DBMS_LCR.NOT_A_LOB,
NULL,
NULL);
oldunit2 := SYS.LCR$_ROW_UNIT(
'ORDER_TOTAL',
SYS.AnyData.ConvertNumber(35199),
DBMS_LCR.NOT_A_LOB,
NULL,
NULL);
oldvals := SYS.LCR$_ROW_LIST(oldunit1,oldunit2);
newunit1 := SYS.LCR$_ROW_UNIT(
'ORDER_TOTAL',
SYS.AnyData.ConvertNumber(5235),
DBMS_LCR.NOT_A_LOB,
NULL,
NULL);
newvals := SYS.LCR$_ROW_LIST(newunit1);
oe.enq_row_lcr(
source_dbname => 'OEDB.NET',
cmd_type => 'UPDATE',
obj_owner => 'OE',
obj_name => 'ORDERS',
old_vals => oldvals,
new_vals => newvals);
END;
/
COMMIT;
/*
Step 5 Check Spool Results
Check the streams_enq_deq.out spool file to ensure that all actions completed successfully after this script completes.
*/

SET ECHO OFF
SPOOL OFF

/*************************** END OF SCRIPT ******************************/
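The two row LCRs enqueued in Step 4 of the preceding script differ in which value lists they carry: the INSERT passes only new values, while the UPDATE passes old values that identify the row together with the new value of the changed column. The following minimal Java sketch expresses that convention as a check. The class RowLcrShape and its method are hypothetical and use no Oracle API. The DELETE case (old values only) does not appear in this example and is included as an assumption.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper; it models the example's convention and calls no Oracle API.
class RowLcrShape {

    // Returns true when the old/new column lists fit the command type.
    static boolean isConsistent(String commandType,
                                List<String> oldColumns,
                                List<String> newColumns) {
        boolean hasOld = oldColumns != null && !oldColumns.isEmpty();
        boolean hasNew = newColumns != null && !newColumns.isEmpty();
        switch (commandType) {
            case "INSERT":
                return !hasOld && hasNew;   // as in the oe.orders insert (old_vals => NULL)
            case "UPDATE":
                return hasOld && hasNew;    // as in the oe.orders update
            case "DELETE":
                return hasOld && !hasNew;   // assumption: not shown in this example
            default:
                throw new IllegalArgumentException("Not modeled: " + commandType);
        }
    }

    public static void main(String[] args) {
        List<String> oldCols = Arrays.asList("ORDER_ID", "ORDER_TOTAL");
        List<String> newCols = Collections.singletonList("ORDER_TOTAL");
        System.out.println("UPDATE consistent: " + isConsistent("UPDATE", oldCols, newCols));
        System.out.println("INSERT consistent: " + isConsistent("INSERT", null, oldCols));
    }
}
```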
Complete the following steps to dequeue the events explicitly and query the events that were applied by the apply process. These events were enqueued in "Enqueue Events".
Step 1 Run the Procedure to Dequeue Events Explicitly
Run the procedure you created in "Create a Procedure to Dequeue Events Explicitly" and specify the consumer of the events you want to dequeue. In this case, the consumer is the subscriber you added in "Add a Subscriber to the oe_queue Queue". In this example, events that are not dequeued explicitly by this procedure are dequeued by the apply process.
CONNECT oe/oe@oedb.net
SET SERVEROUTPUT ON SIZE 100000
EXEC oe.explicit_dq('explicit_dq');
You should see the non-LCR events that were enqueued in "Enqueue Non-LCR Events to be Dequeued Explicitly".
Step 2 Query for Applied Events
Query the oe.orders and oe.customers tables to see the rows corresponding to the events applied by the apply process:
SELECT * FROM oe.orders WHERE order_id = 2500;

SELECT cust_first_name, cust_last_name, cust_email FROM oe.customers
  WHERE customer_id = 990;

SELECT * FROM oe.orders WHERE order_id = 2502;
You should see the non-LCR events that were enqueued in "Enqueue Non-LCR Events to be Dequeued by the Apply Process" and the row LCR events that were enqueued in "Enqueue Row LCR Events to be Dequeued by the Apply Process".
This example enqueues non-LCR events and row LCR events into the queue using Java Message Service (JMS). Then, this example dequeues these events from the queue using JMS.
Note: Enqueue of JMS types and XML types does not work with Oracle Streams SYS.AnyData queues unless you call DBMS_AQADM.ENABLE_JMS_TYPES(queue_table_name) after DBMS_STREAMS_ADM.SET_UP_QUEUE(). Enabling an Oracle Streams queue for these types may affect import/export of the queue table.
Complete the following steps:
Step 1 Run the catxlcr.sql Script
For this example to complete successfully, the LCR schema must be loaded into the SYS schema using the catxlcr.sql script in the rdbms/admin/ directory of your Oracle home. Run this script now if it has not been run already.
Note: To run catxlcr.sql, you must either have created the database using Database Configuration Assistant or separately installed Java Virtual Machine, XDB, and XML Schema.
For example, if your Oracle home directory is /usr/oracle, then enter the following to run the script:
CONNECT SYS/CHANGE_ON_INSTALL AS SYSDBA
@/usr/oracle/rdbms/admin/catxlcr.sql
Step 2 Create the Types for User Events
CONNECT oe/oe
CREATE TYPE address AS OBJECT (street VARCHAR (30), num NUMBER)
/

CREATE TYPE person AS OBJECT (name VARCHAR (30), home ADDRESS)
/
Step 3 Set the CLASSPATH
The following jar and zip files should be in the CLASSPATH based on the release of JDK you are using.
Also, make sure LD_LIBRARY_PATH (Solaris operating system) or PATH (Windows) has $ORACLE_HOME/lib set.
For JDK 1.4.x, the CLASSPATH must contain:
$ORACLE_HOME/jdbc/lib/classes12.jar
$ORACLE_HOME/jdbc/lib/ojdbc14.jar
$ORACLE_HOME/jlib/jndi.jar
$ORACLE_HOME/rdbms/jlib/aqapi14.jar
$ORACLE_HOME/rdbms/jlib/jmscommon.jar
$ORACLE_HOME/rdbms/jlib/xdb.jar
$ORACLE_HOME/xdk/lib/xmlparserv2.jar
For JDK 1.3.x, the CLASSPATH must contain:
$ORACLE_HOME/jdbc/lib/classes12.jar
$ORACLE_HOME/jlib/jndi.jar
$ORACLE_HOME/rdbms/jlib/aqapi13.jar
$ORACLE_HOME/rdbms/jlib/jmscommon.jar
$ORACLE_HOME/rdbms/jlib/xdb.jar
$ORACLE_HOME/xdk/lib/xmlparserv2.jar
For JDK 1.2.x, the CLASSPATH must contain:
$ORACLE_HOME/jdbc/lib/classes12.jar
$ORACLE_HOME/jlib/jndi.jar
$ORACLE_HOME/rdbms/jlib/aqapi12.jar
$ORACLE_HOME/rdbms/jlib/jmscommon.jar
$ORACLE_HOME/rdbms/jlib/xdb.jar
$ORACLE_HOME/xdk/lib/xmlparserv2.jar
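Before compiling, it can help to confirm that the jar files listed above are actually on the CLASSPATH. The following is a minimal sketch of such a check, not part of the Oracle example. The class name ClasspathCheck and the method missingJars are hypothetical, the jar names are copied from the JDK 1.4.x list, and the code assumes JDK 5 or later because it uses generics.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/* Hypothetical helper: reports which required jar files are absent from a class path */
final class ClasspathCheck
{
  /* Jar file names taken from the JDK 1.4.x list in this step */
  static final List<String> JDK14_JARS = Arrays.asList(
    "classes12.jar", "ojdbc14.jar", "jndi.jar", "aqapi14.jar",
    "jmscommon.jar", "xdb.jar", "xmlparserv2.jar");

  /* Return the required jar names that do not match the file name of any class path entry */
  static List<String> missingJars(String classPath, String separator, List<String> required)
  {
    List<String> missing = new ArrayList<String>(required);
    for (String entry : classPath.split(Pattern.quote(separator)))
    {
      /* Compare on the file name only, so the Oracle home location does not matter */
      missing.remove(new File(entry).getName());
    }
    return missing;
  }

  public static void main(String[] args)
  {
    String cp = System.getProperty("java.class.path");
    System.out.println("Missing jars: " + missingJars(cp, File.pathSeparator, JDK14_JARS));
  }
}
```

The check compares file names only. It does not verify that a jar comes from the expected release or that LD_LIBRARY_PATH or PATH is set correctly.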
Step 4 Create Java Classes that Map to the Oracle Object Types
First, create a file input.typ with the following lines:
SQL PERSON AS JPerson
SQL ADDRESS AS JAddress
Then, run JPublisher.
jpub -input=input.typ -user=OE/OE
Completing these actions generates two Java classes named JPerson and JAddress for the person and address types, respectively.
Step 5 Create Java Code for Enqueuing Messages
This program uses the Oracle JMS API to publish messages into an Oracle Streams topic.
This program does the following:
Publishes a non-LCR based ADT message to the topic
Publishes a JMS text message to the topic
Publishes an LCR based message to the topic
import oracle.AQ.*;
import oracle.jms.*;
import javax.jms.*;
import java.lang.*;
import oracle.xdb.*;
public class StreamsEnq
{
public static void main (String args [])
throws java.sql.SQLException, ClassNotFoundException, JMSException
{
TopicConnectionFactory tc_fact= null;
TopicConnection t_conn = null;
TopicSession t_sess = null;
try
{
if (args.length < 3 )
System.out.println("Usage:java filename [SID] [HOST] [PORT]");
else
{
/* Create the TopicConnectionFactory
* Only the JDBC OCI driver can be used to access Oracle Streams through JMS
*/
tc_fact = AQjmsFactory.getTopicConnectionFactory(
args[1], args[0], Integer.parseInt(args[2]), "oci8");
t_conn = tc_fact.createTopicConnection( "OE","OE");
/* Create a TopicSession */
t_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
/* Start the connection */
t_conn.start() ;
/* Publish non-LCR based messages */
publishUserMessages(t_sess);
/* Publish LCR based messages */
publishLcrMessages(t_sess);
t_sess.close() ;
t_conn.close() ;
System.out.println("End of StreamsEnq Demo") ;
}
}
catch (Exception ex)
{
System.out.println("Exception-1: " + ex);
ex.printStackTrace();
}
}
/*
* publishUserMessages - this method publishes an ADT message and a
* JMS text message to an Oracle Streams topic
*/
public static void publishUserMessages(TopicSession t_sess) throws Exception
{
Topic topic = null;
TopicPublisher t_pub = null;
JPerson pers = null;
JAddress addr = null;
TextMessage t_msg = null;
AdtMessage adt_msg = null;
AQjmsAgent agent = null;
AQjmsAgent[] recipList = null;
try
{
/* Get the topic */
topic = ((AQjmsSession)t_sess).getTopic("strmadmin", "oe_queue");
/* Create a publisher */
t_pub = t_sess.createPublisher(topic);
/* Agent to access oe_queue */
agent = new AQjmsAgent("explicit_enq", null);
/* Create a PERSON adt message */
adt_msg = ((AQjmsSession)t_sess).createAdtMessage();
pers = new JPerson();
addr = new JAddress();
addr.setNum(new java.math.BigDecimal(500));
addr.setStreet("Oracle Pkwy");
pers.setName("Mark");
pers.setHome(addr);
/* Set the payload in the message */
adt_msg.setAdtPayload(pers);
((AQjmsMessage)adt_msg).setSenderID(agent);
System.out.println("Publish message 1 -type PERSON\n");
/* Create the recipient list */
recipList = new AQjmsAgent[1];
recipList[0] = new AQjmsAgent("explicit_dq", null);
/* Publish the message */
((AQjmsTopicPublisher)t_pub).publish(topic, adt_msg, recipList);
t_sess.commit();
t_msg = t_sess.createTextMessage();
t_msg.setText("Test message");
t_msg.setStringProperty("color", "BLUE");
t_msg.setIntProperty("year", 1999);
((AQjmsMessage)t_msg).setSenderID(agent);
System.out.println("Publish message 2 -type JMS TextMessage\n");
/* Publish the message */
((AQjmsTopicPublisher)t_pub).publish(topic, t_msg, recipList);
t_sess.commit();
}
catch (JMSException jms_ex)
{
System.out.println("JMS Exception: " + jms_ex);
if(jms_ex.getLinkedException() != null)
System.out.println("Linked Exception: " + jms_ex.getLinkedException());
}
}
/*
* publishLcrMessages - this method publishes an XML LCR message to an
* Oracle Streams topic
*/
public static void publishLcrMessages(TopicSession t_sess) throws Exception
{
Topic topic = null;
TopicPublisher t_pub = null;
XMLType xml_lcr = null;
AdtMessage adt_msg = null;
AQjmsAgent agent = null;
StringBuffer lcr_data = null;
AQjmsAgent[] recipList = null;
java.sql.Connection db_conn = null;
try
{
/* Get the topic */
topic = ((AQjmsSession)t_sess).getTopic("strmadmin", "oe_queue");
/* Create a publisher */
t_pub = t_sess.createPublisher(topic);
/* Get the JDBC connection */
db_conn = ((AQjmsSession)t_sess).getDBConnection();
/* Agent to access oe_queue */
agent = new AQjmsAgent("explicit_enq", null);
/* Create an ADT message */
adt_msg = ((AQjmsSession)t_sess).createAdtMessage();
/* Create the LCR representation in XML */
lcr_data = new StringBuffer();
lcr_data.append("<ROW_LCR ");
lcr_data.append("xmlns='http://xmlns.oracle.com/streams/schemas/lcr' \n");
lcr_data.append("xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' \n");
lcr_data.append("xsi:schemaLocation='http://xmlns.oracle.com/streams/schemas/lcr " +
"http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd'");
lcr_data.append("> \n");
lcr_data.append("<source_database_name>source_dbname</source_database_name> \n");
lcr_data.append("<command_type>INSERT</command_type> \n");
lcr_data.append("<object_owner>Ram</object_owner> \n");
lcr_data.append("<object_name>Emp</object_name> \n");
lcr_data.append("<tag>0ABC</tag> \n");
lcr_data.append("<transaction_id>0.0.0</transaction_id> \n");
lcr_data.append("<scn>0</scn> \n");
lcr_data.append("<old_values> \n");
lcr_data.append("<old_value> \n");
lcr_data.append("<column_name>C01</column_name> \n");
lcr_data.append("<data><varchar2>Clob old</varchar2></data> \n");
lcr_data.append("</old_value> \n");
lcr_data.append("<old_value> \n");
lcr_data.append("<column_name>C02</column_name> \n");
lcr_data.append("<data><varchar2>A123FF</varchar2></data> \n");
lcr_data.append("</old_value> \n");
lcr_data.append("<old_value> \n");
lcr_data.append("<column_name>C03</column_name> \n");
lcr_data.append("<data> \n");
lcr_data.append("<date><value>1997-11-24</value><format>SYYYY-MM-DD</format></date> \n");
lcr_data.append("</data> \n");
lcr_data.append("</old_value> \n");
lcr_data.append("<old_value> \n");
lcr_data.append("<column_name>C04</column_name> \n");
lcr_data.append("<data> \n");
lcr_data.append("<timestamp><value>1999-05-31T13:20:00.000</value>" +
"<format>SYYYY-MM-DD'T'HH24:MI:SS.FF</format></timestamp> \n");
lcr_data.append("</data> \n");
lcr_data.append("</old_value> \n");
lcr_data.append("<old_value> \n");
lcr_data.append("<column_name>C05</column_name> \n");
lcr_data.append("<data><raw>ABCDE</raw></data> \n");
lcr_data.append("</old_value> \n");
lcr_data.append("</old_values> \n");
lcr_data.append("<new_values> \n");
lcr_data.append("<new_value> \n");
lcr_data.append("<column_name>C01</column_name> \n");
lcr_data.append("<data><varchar2>A123FF</varchar2></data> \n");
lcr_data.append("</new_value> \n");
lcr_data.append("<new_value> \n");
lcr_data.append("<column_name>C02</column_name> \n");
lcr_data.append("<data><number>35.23</number></data> \n");
lcr_data.append("</new_value> \n");
lcr_data.append("<new_value> \n");
lcr_data.append("<column_name>C03</column_name> \n");
lcr_data.append("<data><number>-100000</number></data> \n");
lcr_data.append("</new_value> \n");
lcr_data.append("<new_value> \n");
lcr_data.append("<column_name>C04</column_name> \n");
lcr_data.append("<data><varchar>Hel lo</varchar></data> \n");
lcr_data.append("</new_value> \n");
lcr_data.append("<new_value> \n");
lcr_data.append("<column_name>C05</column_name> \n");
lcr_data.append("<data><char>wor ld</char></data> \n");
lcr_data.append("</new_value> \n");
lcr_data.append("</new_values> \n");
lcr_data.append("</ROW_LCR>");
/* Create the XMLType containing the LCR */
xml_lcr = oracle.xdb.XMLType.createXML(db_conn, lcr_data.toString());
/* Set the payload in the message */
adt_msg.setAdtPayload(xml_lcr);
((AQjmsMessage)adt_msg).setSenderID(agent);
System.out.println("Publish message 3 - XMLType containing LCR ROW\n");
/* Create the recipient list */
recipList = new AQjmsAgent[1];
recipList[0] = new AQjmsAgent("explicit_dq", null);
/* Publish the message */
((AQjmsTopicPublisher)t_pub).publish(topic, adt_msg, recipList);
t_sess.commit();
}
catch (JMSException jms_ex)
{
System.out.println("JMS Exception: " + jms_ex);
if(jms_ex.getLinkedException() != null)
System.out.println("Linked Exception: " + jms_ex.getLinkedException());
}
}
}
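The append calls in publishLcrMessages repeat the same pattern for each column value, and a mistake in the assembled text is only reported when the XMLType is created. The following is a rough sketch, not part of the Oracle example, of a helper that builds one value element and a local well-formedness check using the JAXP parser bundled with the JDK. The names LcrXmlSketch, appendValue, buildLcr, and isWellFormed are hypothetical, and the reduced ROW_LCR document is illustrative only. The check confirms that the text is well-formed XML. It does not validate the document against streamslcr.xsd.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.xml.sax.InputSource;

/* Hypothetical helper for assembling and checking LCR text before it is enqueued */
final class LcrXmlSketch
{
  /* Append one <old_value> or <new_value> element for the given column */
  static void appendValue(StringBuffer buf, String tag, String column, String dataXml)
  {
    buf.append("<").append(tag).append("> \n");
    buf.append("<column_name>").append(column).append("</column_name> \n");
    buf.append("<data>").append(dataXml).append("</data> \n");
    buf.append("</").append(tag).append("> \n");
  }

  /* Build a reduced ROW_LCR document that carries a single new value */
  static String buildLcr()
  {
    StringBuffer buf = new StringBuffer();
    buf.append("<ROW_LCR xmlns='http://xmlns.oracle.com/streams/schemas/lcr'> \n");
    buf.append("<source_database_name>source_dbname</source_database_name> \n");
    buf.append("<command_type>INSERT</command_type> \n");
    buf.append("<object_owner>Ram</object_owner> \n");
    buf.append("<object_name>Emp</object_name> \n");
    buf.append("<new_values> \n");
    appendValue(buf, "new_value", "C02", "<number>35.23</number>");
    buf.append("</new_values> \n");
    buf.append("</ROW_LCR>");
    return buf.toString();
  }

  /* Return true if the text parses as well-formed XML, false otherwise */
  static boolean isWellFormed(String xml)
  {
    try
    {
      DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
      factory.setNamespaceAware(true);
      DocumentBuilder builder = factory.newDocumentBuilder();
      builder.parse(new InputSource(new StringReader(xml)));
      return true;
    }
    catch (Exception ex)
    {
      /* Parser configuration, I/O, and SAX parse errors all mean the text is unusable */
      return false;
    }
  }

  public static void main(String[] args)
  {
    String lcr = buildLcr();
    System.out.println(lcr);
    System.out.println("Well formed: " + isWellFormed(lcr));
  }
}
```

In the enqueue program, a check of this kind could run on lcr_data.toString() before the call to XMLType.createXML, so that a malformed document is caught without a round trip to the database.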
Step 6 Create Java Code for Dequeuing Messages
This program uses the Oracle JMS API to receive messages from an Oracle Streams topic.
This program does the following:
Registers mappings for person, address, and XMLType in the JMS type map
Receives LCR messages from an Oracle Streams topic
Receives user ADT messages from an Oracle Streams topic
import oracle.AQ.*;
import oracle.jms.*;
import javax.jms.*;
import java.lang.*;
import oracle.xdb.*;
import java.sql.SQLException;
public class StreamsDeq
{
public static void main (String args [])
throws java.sql.SQLException, ClassNotFoundException, JMSException
{
TopicConnectionFactory tc_fact= null;
TopicConnection t_conn = null;
TopicSession t_sess = null;
try
{
if (args.length < 3 )
System.out.println("Usage:java filename [SID] [HOST] [PORT]");
else
{
/* Create the TopicConnectionFactory
* Only the JDBC OCI driver can be used to access Oracle Streams through JMS
*/
tc_fact = AQjmsFactory.getTopicConnectionFactory(
args[1], args[0], Integer.parseInt(args[2]), "oci8");
t_conn = tc_fact.createTopicConnection( "OE","OE");
/* Create a TopicSession */
t_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE);
/* Start the connection */
t_conn.start() ;
receiveMessages(t_sess);
t_sess.close() ;
t_conn.close() ;
System.out.println("\nEnd of StreamsDeq Demo") ;
}
}
catch (Exception ex)
{
System.out.println("Exception-1: " + ex);
ex.printStackTrace();
}
}
/*
* receiveMessages - this method receives messages from the SYS.AnyData queue
*/
public static void receiveMessages(TopicSession t_sess) throws Exception
{
Topic topic = null;
JPerson pers = null;
JAddress addr = null;
XMLType xtype = null;
TextMessage t_msg = null;
AdtMessage adt_msg = null;
Message jms_msg = null;
TopicReceiver t_recv = null;
int i = 0;
java.util.Map map= null;
try
{
/* Get the topic */
topic = ((AQjmsSession)t_sess).getTopic("strmadmin", "oe_queue");
/* Create a TopicReceiver to receive messages for consumer "jms_recv" */
t_recv = ((AQjmsSession)t_sess).createTopicReceiver(topic,
"jms_recv", null);
map = ((AQjmsSession)t_sess).getTypeMap();
/* Register mappings for ADDRESS and PERSON in the JMS typemap */
map.put("OE.PERSON", Class.forName("JPerson"));
map.put("OE.ADDRESS", Class.forName("JAddress"));
/* Register mapping for XMLType in the TypeMap - required for LCRs */
map.put("SYS.XMLTYPE", Class.forName("oracle.xdb.XMLTypeFactory"));
System.out.println("Receive messages ...\n");
do
{
try
{
jms_msg = (t_recv.receive(10));
i++;
/* Set navigation mode to NEXT_MESSAGE */
((AQjmsTopicReceiver)t_recv).setNavigationMode(AQjmsConstants.NAVIGATION_NEXT_MESSAGE);
}
catch (JMSException jms_ex2)
{
if((jms_ex2.getLinkedException() != null) &&
(jms_ex2.getLinkedException() instanceof SQLException))
{
SQLException sql_ex2 =(SQLException)(jms_ex2.getLinkedException());
/* End of current transaction group
* Use NEXT_TRANSACTION navigation mode
*/
if(sql_ex2.getErrorCode() == 25235)
{
((AQjmsTopicReceiver)t_recv).setNavigationMode(AQjmsConstants.NAVIGATION_NEXT_TRANSACTION);
continue;
}
else
throw jms_ex2;
}
else
throw jms_ex2;
}
if(jms_msg == null)
{
System.out.println("\nNo more messages");
}
else
{
if(jms_msg instanceof AdtMessage)
{
adt_msg = (AdtMessage)jms_msg;
System.out.println("Retrieved message " + i + ": " +
adt_msg.getAdtPayload());
if(adt_msg.getAdtPayload() instanceof JPerson)
{
pers =(JPerson)( adt_msg.getAdtPayload());
System.out.println("PERSON: Name: " + pers.getName());
}
else if(adt_msg.getAdtPayload() instanceof JAddress)
{
addr =(JAddress)( adt_msg.getAdtPayload());
System.out.println("ADDRESS: Street: " + addr.getStreet());
}
else if(adt_msg.getAdtPayload() instanceof oracle.xdb.XMLType)
{
xtype = (XMLType)adt_msg.getAdtPayload();
System.out.println("XMLType: Data: \n" + xtype.getStringVal());
}
System.out.println("Msg id: " + adt_msg.getJMSMessageID());
System.out.println();
}
else if(jms_msg instanceof TextMessage)
{
t_msg = (TextMessage)jms_msg;
System.out.println("Retrieved message " + i + ": " +
t_msg.getText());
System.out.println("Msg id: " + t_msg.getJMSMessageID());
System.out.println();
}
else
System.out.println("Invalid message type");
}
} while (jms_msg != null);
t_sess.commit();
}
catch (JMSException jms_ex)
{
System.out.println("JMS Exception: " + jms_ex);
if(jms_ex.getLinkedException() != null)
System.out.println("Linked Exception: " + jms_ex.getLinkedException());
t_sess.rollback();
}
catch (java.sql.SQLException sql_ex)
{
System.out.println("SQL Exception: " + sql_ex);
sql_ex.printStackTrace();
t_sess.rollback();
}
}
}
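The catch block in receiveMessages decides whether to switch to NEXT_TRANSACTION navigation by testing the exception linked to the JMSException. As a minimal sketch, that nested test can be expressed as a single predicate that uses only java.sql.SQLException from the JDK. The class name NavigationCheck and the method isEndOfTransactionGroup are hypothetical. The error code 25235 is the value the dequeue program compares against.

```java
import java.sql.SQLException;

/* Hypothetical helper that isolates the end-of-transaction-group test */
final class NavigationCheck
{
  /* Error code tested by the dequeue loop in StreamsDeq */
  static final int END_OF_TRANSACTION_GROUP = 25235;

  /* Return true only if the linked exception is a SQLException with error code 25235 */
  static boolean isEndOfTransactionGroup(Exception linked)
  {
    return (linked instanceof SQLException)
      && ((SQLException) linked).getErrorCode() == END_OF_TRANSACTION_GROUP;
  }

  public static void main(String[] args)
  {
    SQLException endOfGroup = new SQLException("end of transaction group", "99999", 25235);
    System.out.println("Switch navigation mode: " + isEndOfTransactionGroup(endOfGroup));
  }
}
```

With a predicate like this, the catch block could call isEndOfTransactionGroup(jms_ex2.getLinkedException()) and rethrow jms_ex2 when it returns false. A null linked exception is handled because instanceof evaluates to false for null.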
Step 7 Compile the Scripts
javac StreamsEnq.java StreamsDeq.java JPerson.java JAddress.java
Step 8 Run the Enqueue Program
java StreamsEnq ORACLE_SID HOST PORT
For example, if your Oracle SID is orcl82, your host is hq_server, and your port is 1521, then enter the following:
java StreamsEnq orcl82 hq_server 1521
Step 9 Run the Dequeue Program
java StreamsDeq ORACLE_SID HOST PORT
For example, if your Oracle SID is orcl82, your host is hq_server, and your port is 1521, then enter the following:
java StreamsDeq orcl82 hq_server 1521
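Both programs read the SID, host, and port from the command line in that order and check only that three arguments are present, so a non-numeric port surfaces as a NumberFormatException. The following is a small sketch of stricter argument handling, offered as one possible approach rather than as part of the Oracle example. The class name ConnArgs and the method parse are hypothetical.

```java
/* Hypothetical holder for the three command-line arguments used by StreamsEnq and StreamsDeq */
final class ConnArgs
{
  final String sid;
  final String host;
  final int port;

  private ConnArgs(String sid, String host, int port)
  {
    this.sid = sid;
    this.host = host;
    this.port = port;
  }

  /* Return the parsed arguments, or null if fewer than three are given or the port is invalid */
  static ConnArgs parse(String[] args)
  {
    if (args == null || args.length < 3)
      return null;
    try
    {
      int port = Integer.parseInt(args[2]);
      if (port < 1 || port > 65535)
        return null;
      return new ConnArgs(args[0], args[1], port);
    }
    catch (NumberFormatException ex)
    {
      return null;
    }
  }

  public static void main(String[] args)
  {
    ConnArgs c = parse(args);
    if (c == null)
      System.out.println("Usage: java ConnArgs SID HOST PORT");
    else
      System.out.println("SID=" + c.sid + " HOST=" + c.host + " PORT=" + c.port);
  }
}
```

The argument order matches the programs above, where args[0] is the SID, args[1] is the host, and args[2] is the port.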