|
|
|
| CREATE TABLE
"SCOTT"."TOTO" ( "TOTO_N" NUMBER, "TOTO_C" VARCHAR2(50), "TOTO_TS" TIMESTAMP (6) WITH LOCAL TIME ZONE DEFAULT systimestamp ) ALTER TABLE "SCOTT"."TOTO" ADD PRIMARY KEY
("TOTO_N") |
We will create a capture process on the table to enqueue LCR into a queue table of type ANY_DATA. On this queue table, we will define an apply process that will dequeue each LCR and call a DML_HANDLER to analyze the type of LCR, build a message and enqueue this message into an MQ type queue. You may wonder why we need an apply process, since we could perfectly attach a transform function on a capture process and code our enqueue process to MQ immediately into this transformation rule and then discard the captured LCR. The answer is short : memory. if we do this, the capture process, at each restart will not have the memory of what he has already done. It is the apply process that handles rows in 'LOGMNR_RESTART_CKPT$' In effect it is the apply process that drives the memory of the replication and he alone signal to the capture process when an SCN is completely done. Capture process then advance some counters like FIRST_SCN or START_SCN when all scn below these have been processed. If we define only a capture process, we will never advance anything and each restart, all SCN above FIRST_SCN will be resent. Worse than a mere trigger in fact.
So we need an apply process to manage the memory of what has been done, not been done, not yet been done and this require an queue table. Since we deal with LCR, our queue table will be of type ANY_DATA. On this apply process, we will attach a DML_HANDLER to extract the PK value and one column data form the LCR and build the message text. Also we will request from the DML_HANDLER to perform the enqueue operation into the MQseries queue.
The last problem we face is what to do with the LCR once it has been processed. Since the Capture and the apply are on the same queue table, this would mean that we will apply on the table TOTO the row we just capture on table TOTO? In fact we need to do nothing, not event issue a command to discard the LCR. Whenever you define a DML_HANDLER on a apply process, all rows are dequeued automatically by the apply reader (say consumed) and passed to the DML_HANDLER. The rows will go the the coordinator apply process if and only if you issue the LCR.EXECUTE() within the DML_HANDLER, otherwise the LCR is just discarded. Here there is a major discrepancy in the default behavior : In the absence of a DML_HANDLER, all rows pass automatically from 'reader' to 'coordinator' apply process. On the contrary, whenever a DML_HANDLER is defined, rows will be passed only if an LCR.EXECUTE exists into the DML_HANDLER.
|
Summary:
| 1) Capture
read TOTO Table and enqueue row mutations into an LCR queue 2) Apply Process dequeue LCR, update counters, call DML HANDLER 3) DML HANDLER, decode the LCR, generated MGW type row and enqueue 4) Apply Process purge LCR |
We will use 'MQUSER' as streams administrator
| connect sys/sys as sysdba grant AQ_ADMINISTRATOR_ROLE to mquser ; grant execute on dbms_capture_adm to mquser ; grant SELECT_CATALOG_ROLE to mquser ; grant SELECT on dba_capture to mquser ; grant SELECT on dba_rulesets to mquser ; exec DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE( grantee => 'mquser', grant_privileges => true); |
From now on we give extract of the setup script. every relevant part of the script will be commented. We may retrieve the the full script Here .
At the start of our setup, we declare some variables that will hold the various objects names:
| connect mquser/mquser set serveroutput on size unlimited WHENEVER SQLERROR CONTINUE; declare
v_mq_queue_name
varchar2(60):='mq_q_toto' ; /* MQ series
queue name */ |
We creates the LCR queue and queue table. The package 'DBMS_STREAMS_ADM.setup_queue' creates both in one command. However, keep in mind that using DBMS_STREAMS_ADM restrict the queue_table to be of type ANY_DATA. Now it is fine for us to as this queue table it will hold the captured LCR of table TOTO, but it wont do it for MQseries queue table.
| -- create LCR queue name and queue table of type ANY_DATA DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => v_lcr_queue_table_name, queue_name => v_lcr_queue_name, queue_user => v_owner); |
Now the capture. Again, DBMS_STREAMS_ADM can generated a capture rule and assign it to a generated rule_set in one command:
| dbms_streams_adm.ADD_TABLE_RULES( table_name
=> 'SCOTT.TOTO', streams_type => 'capture', streams_name => v_capture_name, queue_name => v_owner||'.'||v_lcr_queue_name, include_dml => true, include_ddl => false, dml_rule_name => v_dml_rule_name, ddl_rule_name => v_ddl_rule_name, include_tagged_lcr => true, inclusion_rule => true); |
we create the apply process that will dequeue the LCR captured. The apply process will have a DML handler which will perform the MQseries enqueue.
| dbms_streams_adm.ADD_TABLE_RULES( table_name
=> 'SCOTT.TOTO', streams_type => 'apply', streams_name => v_apply_name, queue_name => v_owner||'.'||v_lcr_queue_name, include_dml => true, include_ddl => false, include_tagged_lcr => true, inclusion_rule => true, dml_rule_name => v_dml_rule_name, ddl_rule_name => v_ddl_rule_name); |
Now we declare the DML handler to point toward a procedure we wrote. This procedure will process each row dequeued:
| dbms_apply_adm.set_dml_handler
( object_name => 'SCOTT.TOTO', object_type => 'TABLE', operation_name => 'INSERT', error_handler => FALSE, user_procedure => v_owner||'.cv_toto_to_str' , /* follow the link for the code of the procedure */ apply_database_link => NULL, apply_name => v_apply_name, assemble_lobs => FALSE); |
Query of dba_queues and dba_queue_tables gives :
| Select * from dba_ queue_ where name = 'LCR_Q_TOTO':
select
|
The MQ series queue must of of type SYS.MGW_BASIC_MSG_T :
So we can't use 'DBMS_STREAMS_ADM'. We will use DBMS_AQ_ADM and set the type of the queue table manually:
dbms_aqadm.create_queue_table( queue_table => v_mq_queue_table_name, queue_payload_type => 'sys.mgw_basic_msg_t', multiple_consumers => TRUE); dbms_aqadm.create_queue( queue_name => v_mq_queue_name, queue_table => v_mq_queue_table_name, max_retries => 1000); |
We start now the queues
| dbms_capture_adm.start_capture
(v_capture_name) ; dbms_apply_adm.start_apply (v_apply_name) ; |
At this stage, it is already working. Let's have a closer look to the DML handler for it is the where everything is happening. But before, please note that I have added a log table into the procedure. It was just to demonstrate how to create from scratch an LCR into a DML Handler, build a row and have it inserted into a table. For the current exercise, the message we will build for MQseries will be also inserted into this log table. We will also be able to check through the content of this table what row are processed by the DML handler. So for the moment you need to create this small table:
CREATE TABLE "SCOTT"."MQ_Q_LOG"
( "MSG" VARCHAR2(512)
)
The structure ANYDATA is the main structure of the streams. It contains 98 method to set, get and convert values. It important to understand that what contain a table_queue of type ANYDATA for a standard Streams capture. The overall picture of container cascading for a queue ANYDATA is like ( LCR$_ROW_RECORD ( LCR$_ROW_LIST ( LCR$_ROW_UNIT( ANYDATA ) ) ) ) or in picture:
|
now lets parse a bit through the code of the DML_HANDLER: (follow the link for full code
| CREATE OR REPLACE procedure mquser.cv_toto_to_str(in_any IN ANYDATA) IS |
The following section is just the definition and type of all variable needed. Explanation on the uses of these will come later when we will actually use them
|
Procedure mquser.cv_toto_to_str(in_any IN ANYDATA) |
Comments |
| lcr
SYS.LCR$_ROW_RECORD; lcr_out SYS.LCR$_ROW_RECORD; col1 SYS.LCR$_ROW_UNIT; newvals SYS.LCR$_ROW_LIST; oldvals SYS.LCR$_ROW_LIST; payload sys.mgw_basic_msg_t; header sys.mgw_name_value_array_t; text_body sys.mgw_text_value_t; OldCols VARCHAR2(2000):=''; a ANYDATA; rc NUMBER; t PLS_INTEGER; var number:=-1; v_varchar2_512 varchar2(512):='hello world'; f2 varchar2(50); enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; msgid RAW(16); BEGIN
-- header := sys.mgw_name_value_array_t(
message_properties.priority:=10; |
We loop into each column and concatenated the columns names comma separated into one string Method to extract from LCR.NEW_VALUE the column named TOTO_N. The extract is of type ANYDATA. We then extract the value of the column using the ANYDATA get function that convert the payload of ANYDATA into a typed value.
This is purely optional: We build an LCR row for the table 'MQ_Q_LOG', fill the columns of the row (here only one), define our LCR of type insert and most important perform the LCR.EXECUTE() function (the Boolean argument of LCR.EXECUTE is only to avoid conflict resolution method if they exists). .
We
use the implicit conversion of message properties to set the message
priority for MQ to 0. See : http://download.oracle.com/docs/cd/B19306_01/server.102/b14257/mg_trnsfm.htm#BABFCEIH
Now
we enqueue the message into our MQseries queue. The name of this queue is
MQ_Q_TOTO. |
MEMBER FUNCTION GET_SOURCE_DATABASE_NAME RETURNS VARCHAR2
MEMBER FUNCTION GET_COMMAND_TYPE RETURNS VARCHAR2
MEMBER FUNCTION GET_OBJECT_OWNER RETURNS VARCHAR2
MEMBER FUNCTION GET_OBJECT_NAME RETURNS VARCHAR2
MEMBER FUNCTION GET_VALUE RETURNS ANYDATA
MEMBER FUNCTION GET_LONG_INFORMATION RETURNS NUMBER
MEMBER FUNCTION GET_LOB_INFORMATION RETURNS NUMBER
MEMBER FUNCTION GET_LOB_OFFSET RETURNS NUMBER
MEMBER FUNCTION GET_LOB_OPERATION_SIZE RETURNS NUMBER
MEMBER FUNCTION GET_VALUES RETURNS LCR$_ROW_LIST
MEMBER FUNCTION GET_TRANSACTION_ID RETURNS VARCHAR2
MEMBER FUNCTION GET_SCN RETURNS NUMBER
MEMBER FUNCTION GET_TAG RETURNS RAW
MEMBER FUNCTION GET_COMMIT_SCN RETURNS NUMBER
MEMBER FUNCTION GET_EXTRA_ATTRIBUTE RETURNS ANYDATA
MEMBER FUNCTION GET_COMPATIBLE RETURNS NUMBER
MEMBER FUNCTION GET_SOURCE_TIME RETURNS DATE
Copyright 2008 B.Polarski