Streams to MQ: Part 2

Home Up

 


PART II: Configuration AQ to MQ messages

  1. Presentation of the table and goal of the exercise
  2. Architecture of capture-to-MQ process
  3. Master setup script
    1. Some variables
    2. create LCR queue
    3. create MQseries queue
    4. Start the queues
  4. Cascading structure within ANYDATA queue table 
  5. Walking through the DML_HANDLER
  6. Structures reference

Presentation of the table and goal of the exercise

    In this second part, we will now describe the steps to capture changes into an Oracle table in order to send a message to MQ for each change we detected. This is not the same thing as a table replication since the target is not a remote table but a message queue. Typically you encounter this in an alarming system. In the case where you have to transfer contents from an Oracle table to an heterogeneous target Database, most of the techniques describe here are the same. You will probably then use 'jms XML' instead of 'basic java jms' as xml is best suited to describe a row layout

The following table will have to be scanned and for each message we will  send a message comprising the PK of the table, and one column data. We will send message only form DML of type insert and discard update and delete 

CREATE TABLE "SCOTT"."TOTO"
   ( "TOTO_N" NUMBER,
     "TOTO_C" VARCHAR2(50),
     "TOTO_TS" TIMESTAMP (6) WITH LOCAL TIME ZONE DEFAULT systimestamp
   )

ALTER TABLE "SCOTT"."TOTO" ADD PRIMARY KEY ("TOTO_N")
ALTER TABLE "SCOTT"."TOTO" ADD SUPPLEMENTAL LOG GROUP "TOTO_LG_PK" ("TOTO_N") ALWAYS
ALTER TABLE "SCOTT"."TOTO" ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS
ALTER TABLE "SCOTT"."TOTO" ADD SUPPLEMENTAL LOG DATA (UNIQUE INDEX) COLUMNS
ALTER TABLE "SCOTT"."TOTO" ADD SUPPLEMENTAL LOG DATA (FOREIGN KEY) COLUMNS

CREATE UNIQUE INDEX "SCOTT"."SYS_C0030260" ON "SCOTT"."TOTO" ("TOTO_N")


Architecture of capture-to-MQ process

    We will create a capture process on the table to  enqueue LCR into a queue table of type ANY_DATA. On this queue table,  we will define an apply process that will dequeue each LCR and call a DML_HANDLER to analyze the type of LCR, build a message and enqueue this message into an MQ type queue.  You may wonder  why we need an apply process, since we could perfectly attach a transform function on a capture process and code our enqueue process to MQ immediately into this transformation rule and then discard the captured LCR.  The answer is short : memory. if we do this, the capture process, at each restart will not have the memory of what he has already done. It is the apply process that handles rows in 'LOGMNR_RESTART_CKPT$' In effect it is the apply process that drives the memory of the replication and he alone signal to the capture process when an SCN is completely done. Capture process  then advance some counters like FIRST_SCN or START_SCN when all scn below these have been processed. If we define only a capture process, we will never advance anything and each restart, all  SCN above FIRST_SCN will be resent. Worse than a mere trigger in fact.

    So we need an apply process to manage the memory of what has been done, not been done, not yet been done and this require an queue table. Since we deal with LCR, our queue table will be of type ANY_DATA. On this apply process, we will attach a DML_HANDLER to extract the PK value and one column data form the LCR and build the message text. Also we will request from the DML_HANDLER to perform the enqueue operation into the MQseries queue.

  The last problem we face is what to do with the LCR once it has been processed. Since the Capture and the apply are on the same queue table, this would mean that we will apply on the table TOTO the row we just capture on table TOTO? In fact we need to do nothing, not event issue a command to discard the LCR. Whenever you define a  DML_HANDLER on a apply process, all rows are dequeued automatically by the apply reader  (say consumed) and passed to the DML_HANDLER. The rows will go the the coordinator apply process if and only if you issue the LCR.EXECUTE() within the DML_HANDLER, otherwise the LCR is just discarded.  Here there is a major discrepancy in the default behavior : In the absence of a DML_HANDLER, all rows pass automatically from 'reader' to 'coordinator' apply process. On the contrary, whenever a  DML_HANDLER is defined, rows will be passed only if an LCR.EXECUTE exists into the DML_HANDLER.

   Summary:

    1)     Capture read TOTO Table and enqueue row mutations into an LCR queue
    2)     Apply Process dequeue LCR, update counters, call DML HANDLER
    3)     DML HANDLER, decode the LCR, generated MGW type row and enqueue 
    4)     Apply Process purge LCR

 

Master setup script 

We will use 'MQUSER' as streams administrator 

connect sys/sys as sysdba
grant AQ_ADMINISTRATOR_ROLE to mquser ;
grant execute on dbms_capture_adm to mquser ;
grant SELECT_CATALOG_ROLE to mquser ;
grant SELECT on dba_capture to mquser ;
grant SELECT on dba_rulesets to mquser ;
exec DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE( grantee => 'mquser', grant_privileges => true);

From now on we give extract of the setup script. every relevant part of the script will be commented. We may retrieve the the full script Here .

Some variables

At the start of our setup, we declare some variables that will hold the various objects names:

connect mquser/mquser
set serveroutput on size unlimited
WHENEVER SQLERROR CONTINUE;

declare

v_mq_queue_name            varchar2(60):='mq_q_toto' ;      /* MQ series queue name */
v_mq_queue_table_name      varchar2(60):='mq_qt_toto' ;     /* MQ series queue table name of format SYS.MGW_BASIC_MSG_T */
v_lcr_queue_name           varchar2(60):='lcr_q_toto' ;     /* LCR queue name       */
v_lcr_queue_table_name     varchar2(60):='lcr_qt_toto' ;    /* LCR queue table name */
v_capture_name             varchar2(60):='cap_lcr_toto' ;   /* Streams capture process name  */
v_apply_name               varchar2(60):='app_lcr_toto' ;   /* Streams apply process name    */
v_owner                    varchar2(60):='mquser' ;         /* Streams admin and queue owner */ 
v_dml_rule_name            varchar2(30);                    /* this variable will receive the dml_rule_name during capture setup */
v_ddl_rule_name            varchar2(30);                    /* this variable will receive the ddl rule name during capture setup*/
subscriber                 sys.aq$_agent;                   /* we declare this AQ_ADM structure to set the MSG priority (optional feature)

begin           /* start of the anonymous PL/SQL script */

Create LCR queue

    We creates the LCR queue and queue table. The package 'DBMS_STREAMS_ADM.setup_queue' creates both in one command. However, keep in mind that using DBMS_STREAMS_ADM restrict the queue_table to be of  type ANY_DATA. Now it is fine for us to as this queue table it will hold the captured LCR of table TOTO, but it wont do it for MQseries queue table.

-- create LCR queue name and queue table of type ANY_DATA
DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => v_lcr_queue_table_name, queue_name => v_lcr_queue_name, queue_user => v_owner);

Now the capture. Again,  DBMS_STREAMS_ADM can generated a capture rule and assign it to a generated  rule_set in one command: 

dbms_streams_adm.ADD_TABLE_RULES( table_name          => 'SCOTT.TOTO', 
                                  streams_type        => 'capture',
                                  streams_name        => v_capture_name, 
                                  queue_name          => v_owner||'.'||v_lcr_queue_name,
                                  include_dml         => true, include_ddl => false, 
                                  dml_rule_name       => v_dml_rule_name, 
                                  ddl_rule_name       => v_ddl_rule_name,
                                  include_tagged_lcr  => true, 
                                  inclusion_rule      => true);

we create the apply process that will dequeue the LCR captured. The apply process will have a DML handler which will perform the MQseries enqueue. 

dbms_streams_adm.ADD_TABLE_RULES( table_name          => 'SCOTT.TOTO', 
                                  streams_type        => 'apply',
                                  streams_name        => v_apply_name, 
                                  queue_name          => v_owner||'.'||v_lcr_queue_name,
                                  include_dml         => true, 
                                  include_ddl         => false, include_tagged_lcr => true,
                                  inclusion_rule      => true, dml_rule_name => v_dml_rule_name, ddl_rule_name => v_ddl_rule_name);

Now we declare the DML handler to point toward a procedure we wrote. This procedure will process each row dequeued:

dbms_apply_adm.set_dml_handler ( object_name          => 'SCOTT.TOTO', 
                                 object_type          => 'TABLE', 
                                 operation_name       => 'INSERT',
                                 error_handler        => FALSE, 
                                 user_procedure       => v_owner||'.cv_toto_to_str' ,   /* follow the link for the code of the procedure */
                                 apply_database_link  => NULL,
                                 apply_name           => v_apply_name, 
                                 assemble_lobs        => FALSE);

Query of dba_queues and dba_queue_tables gives :

Select * 
  from  dba_ queue_
  where name = 'LCR_Q_TOTO':


OWNER    NAME       QUEUE_TABLE  QID  QUEUE_TYPE    MAX_RETRIES RETRY_DELAY ENQUEUE  DEQUEUE RETENTION NETWORK_NAME
-------- ---------- ------------ ----- ------------ ----------- ----------- -------  ------- --------  ---------------------------------------
MQUSER   LCR_Q_TOTO LCR_QT_TOTO  87394 NORMAL_QUEUE 5           0           YES      YES     0         SYS$MQUSER.LCR_Q_TOTO.WLINT2.DOMAIN.COM

select 
    OWNER, QUEUE_TABLE, TYPE, OBJECT_TYPE, SORT_ORDER, RECIPIENTS, MESSAGE_GROUPING 
from 
  dba_queue_tables 
where 
   QUEUE_TABLE = 'LCR_QT_TOTO'



OWNER    QUEUE_TABLE  TYPE    OBJECT_TYPE  SORT_ORDER   RECIPIENTS  MESSAGE_GROUPING
-------- ------------ ------  -----------  -----------  ----------  ----------------
MQUSER   LCR_QT_TOTO  OBJECT  SYS.ANYDATA  COMMIT_TIME  MULTIPLE    TRANSACTIONAL

Create MQseries Queue

The MQ series queue must of of type SYS.MGW_BASIC_MSG_T :

So we can't use 'DBMS_STREAMS_ADM'. We will use DBMS_AQ_ADM and set the type of the queue table manually:


dbms_aqadm.create_queue_table( queue_table           => v_mq_queue_table_name, 
                               queue_payload_type    => 'sys.mgw_basic_msg_t', 
                               multiple_consumers    => TRUE);

dbms_aqadm.create_queue(       queue_name            => v_mq_queue_name, 
                               queue_table           => v_mq_queue_table_name, 
                               max_retries           => 1000);

   

Start the queues

We start now the queues

dbms_capture_adm.start_capture  (v_capture_name) ;
dbms_apply_adm.start_apply      (v_apply_name) ;

At this stage, it is already working. Let's have a closer look to the DML handler for it is the where everything is happening. But before, please note that I have added a log table into the procedure. It was just to demonstrate how to create from scratch an LCR into a DML Handler, build a row and have it inserted into a table. For the current exercise, the message we will build for MQseries will be also inserted into this log table. We will also be able to check through the content of this table what row are processed by the DML handler. So for the moment you need to create this small table:

    CREATE TABLE "SCOTT"."MQ_Q_LOG"
                  ( "MSG" VARCHAR2(512)
    )    

Cascading structure within ANYDATA queue table 

The structure ANYDATA is the main structure of the streams. It contains 98 method to set, get and convert values. It important to understand that what contain a table_queue of type ANYDATA for a standard Streams capture. The overall picture of container cascading for a queue  ANYDATA is like ( LCR$_ROW_RECORD ( LCR$_ROW_LIST ( LCR$_ROW_UNIT( ANYDATA ) ) ) )  or in picture:

Walking through the DML_HANDLER 

 now lets parse a bit through the code of the DML_HANDLER: (follow the link for full code

CREATE OR REPLACE procedure mquser.cv_toto_to_str(in_any IN ANYDATA)
IS

The following section is just the definition and type of all variable needed. Explanation on the uses of these will come later when we will actually use them

 Procedure mquser.cv_toto_to_str(in_any IN ANYDATA)

Comments

lcr                        SYS.LCR$_ROW_RECORD;
lcr_out                    SYS.LCR$_ROW_RECORD;
col1                       SYS.LCR$_ROW_UNIT;
newvals                    SYS.LCR$_ROW_LIST;
oldvals                    SYS.LCR$_ROW_LIST;
payload                    sys.mgw_basic_msg_t;
header                     sys.mgw_name_value_array_t;
text_body                  sys.mgw_text_value_t;
OldCols                    VARCHAR2(2000):='';
a                          ANYDATA;
rc                         NUMBER;
t                          PLS_INTEGER;
var                        number:=-1;
v_varchar2_512             varchar2(512):='hello world';
f2                         varchar2(50);
enqueue_options            dbms_aq.enqueue_options_t;
message_properties         dbms_aq.message_properties_t;
msgid                      RAW(16);

BEGIN
  IF in_any.GETTYPENAME='SYS.LCR$_ROW_RECORD' THEN
  rc := in_any.GETOBJECT(lcr);





  oldvals:=lcr.get_values('NEW');





  FOR i IN 1..oldvals.Count LOOP
      OldCols:= OldCols||oldvals(i).column_name||',';
  END LOOP;
  begin
    a:=lcr.get_value('NEW','TOTO_N');
    t := a.GETNUMBER(var) ;
    a:=lcr.get_value('NEW','TOTO_C');
    t := a.GETVARCHAR2(f2) ;
    exception
       when others then
            var:=-1;
    end ;


v_varchar2_512:=to_char(sysdate)|| ' dwlpdb02 '||' Oracle - MSG ' || to_char(var) || 'Hello world : ' || lcr.get_object_name()|| ' f2='||f2||' cols='||OldCols;
col1:= SYS.LCR$_ROW_UNIT( 'MSG', SYS.AnyData.ConvertVarChar2(v_varchar2_512), DBMS_LCR.NOT_A_LOB, NULL, NULL);

                                      /* if you have  many columns  */ 
newvals := SYS.LCR$_ROW_LIST(col1);   /* write  (col1,col2,col3)    */
lcr_out := SYS.LCR$_ROW_RECORD.CONSTRUCT(
                 source_database_name => lcr.GET_SOURCE_DATABASE_NAME(),
                 command_type         => 'INSERT',
                 object_owner         => 'SCOTT',
                 object_name          => 'MQ_Q_LOG',
                 old_values           => null,
                 new_values           => newvals);
lcr_out.execute(false);
commit ;

----- MSG MQ now

-- header := sys.mgw_name_value_array_t(
-- sys.mgw_name_value_t.construct_integer('MGW_MQ_characterSet', '1208'),
-- sys.mgw_name_value_t.construct_integer('MGW_MQ_priority', '7')
-- );
header := sys.mgw_name_value_array_t(
sys.mgw_name_value_t.construct_integer('MGW_MQ_priority ', '0')
);    /* this does not seems to work */

-- recipients(1) := sys.aq$_agent ('mquser',null,null);
-- message_properties.recipient_list := recipients;

message_properties.priority:=10;

-- Create the text body.
text_body := sys.mgw_text_value_t(v_varchar2_512, NULL);

-- Construct the message from its constituent parts.
payload := sys.mgw_basic_msg_t(header, text_body, NULL);

-- Enqueue the message.
dbms_aq.enqueue(  queue_name         =>'mquser.mq_q_toto',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => payload,
                  msgid              => msgid);
commit;


END IF;
END;
/

















We start by using one of the 98 method of ANYDATA. The payload may be anything but at the encode time the type name was set to the string 'SYS.LCR$_ROW_RECORD' which is a DML for a single row. other commons value in Streams returned by 'anydata.gettypname' are 'SYS.LCR$_DDL_RECORD' for DDL command and 'SYS.LCR$_PROCEDURE_RECORD' for a piece of code.


This single commands effectively loads all columns values. The values are stored in SYS.LCR$_ROW_RECORD.NEW_VALUES which is an object table of SYS.LCR$_ROW_UNIT

 

We loop into each column and concatenated the columns names comma separated into one string 

Method to extract from LCR.NEW_VALUE  the column named TOTO_N. The extract is of type ANYDATA. We then extract the value of the column using the ANYDATA get function that convert the payload of ANYDATA into a typed value. 


We concatenates various stuff to build the message for MQseries. Note that You  could put here a call to a procedure/function/package that will takes in input the PK of the rows (extracted using get_value) and retrieve a result you convert to varchar2 and enqueue.

 

This is purely optional: We build an LCR row for the table 'MQ_Q_LOG', fill the columns of the row (here only one), define our LCR of type insert and most important perform the LCR.EXECUTE() function (the Boolean argument of LCR.EXECUTE is only to avoid conflict resolution method if they exists).

.

 

 

 

 

 

 

We use the implicit conversion of message properties to set the message priority for MQ to 0.
To achieve this we set the AQ properties to something above 9 (will be invalid value) and it will be converted to 0. 

See : http://download.oracle.com/docs/cd/B19306_01/server.102/b14257/mg_trnsfm.htm#BABFCEIH

 

Now we enqueue the message into our MQseries queue. The name of this queue is MQ_Q_TOTO. 
You will find the real MQ definition values into mq_adm_setup.sql

 

 

Structures Reference

List of SYS.LCR$_ROW_RECORD get method 

MEMBER FUNCTION GET_SOURCE_DATABASE_NAME RETURNS VARCHAR2
MEMBER FUNCTION GET_COMMAND_TYPE RETURNS VARCHAR2
MEMBER FUNCTION GET_OBJECT_OWNER RETURNS VARCHAR2
MEMBER FUNCTION GET_OBJECT_NAME RETURNS VARCHAR2
MEMBER FUNCTION GET_VALUE RETURNS ANYDATA
MEMBER FUNCTION GET_LONG_INFORMATION RETURNS NUMBER
MEMBER FUNCTION GET_LOB_INFORMATION RETURNS NUMBER
MEMBER FUNCTION GET_LOB_OFFSET RETURNS NUMBER
MEMBER FUNCTION GET_LOB_OPERATION_SIZE RETURNS NUMBER
MEMBER FUNCTION GET_VALUES RETURNS LCR$_ROW_LIST
MEMBER FUNCTION GET_TRANSACTION_ID RETURNS VARCHAR2
MEMBER FUNCTION GET_SCN RETURNS NUMBER
MEMBER FUNCTION GET_TAG RETURNS RAW
MEMBER FUNCTION GET_COMMIT_SCN RETURNS NUMBER
MEMBER FUNCTION GET_EXTRA_ATTRIBUTE RETURNS ANYDATA
MEMBER FUNCTION GET_COMPATIBLE RETURNS NUMBER
MEMBER FUNCTION GET_SOURCE_TIME RETURNS DATE

List of ANYDATA get method

MEMBER FUNCTION GETTYPENAME RETURNS VARCHAR2
MEMBER FUNCTION GETTYPE RETURNS PLS_INTEGER
MEMBER FUNCTION GETNUMBER RETURNS PLS_INTEGER
MEMBER FUNCTION GETDATE RETURNS PLS_INTEGER
MEMBER FUNCTION GETCHAR RETURNS PLS_INTEGER
MEMBER FUNCTION GETVARCHAR RETURNS PLS_INTEGER
MEMBER FUNCTION GETVARCHAR2 RETURNS PLS_INTEGER
MEMBER FUNCTION GETRAW RETURNS PLS_INTEGER
MEMBER FUNCTION GETBLOB RETURNS PLS_INTEGER
MEMBER FUNCTION GETCLOB RETURNS PLS_INTEGER
MEMBER FUNCTION GETBFILE RETURNS PLS_INTEGER
MEMBER FUNCTION GETOBJECT RETURNS PLS_INTEGER
MEMBER FUNCTION GETOBJECT RETURNS PLS_INTEGER
MEMBER FUNCTION GETREF RETURNS PLS_INTEGER
MEMBER FUNCTION GETCOLLECTION RETURNS PLS_INTEGER
MEMBER FUNCTION GETTIMESTAMP RETURNS PLS_INTEGER
MEMBER FUNCTION GETTIMESTAMPTZ RETURNS PLS_INTEGER
MEMBER FUNCTION GETTIMESTAMPLTZ RETURNS PLS_INTEGER
MEMBER FUNCTION GETINTERVALYM RETURNS PLS_INTEGER
MEMBER FUNCTION GETINTERVALDS RETURNS PLS_INTEGER
MEMBER FUNCTION GETNCHAR RETURNS PLS_INTEGER
MEMBER FUNCTION GETNVARCHAR2 RETURNS PLS_INTEGER
MEMBER FUNCTION GETNCLOB RETURNS PLS_INTEGER
MEMBER FUNCTION GETBFLOAT RETURNS PLS_INTEGER
MEMBER FUNCTION GETBDOUBLE RETURNS PLS_INTEGER

 

 Type : SYS.MGW_BASIC_MSG_T

Name                        Null?    Type
--------------------------- -------- ----------------------------
HEADER                               MGW_NAME_VALUE_ARRAY_T
TEXT_BODY                            MGW_TEXT_VALUE_T
RAW_BODY                             MGW_RAW_VALUE_T

METHOD
------
STATIC FUNCTION CONSTRUCT RETURNS MGW_BASIC_MSG_T

Copyright  2008 B.Polarski