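/*
One-time prerequisites, kept for reference: run as SYSDBA so that mquser can
administer the queues, capture and apply processes used by this script.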
connect sys/sys as sysdba
grant AQ_ADMINISTRATOR_ROLE to mquser ;
grant execute on dbms_capture_adm to mquser ;
grant SELECT_CATALOG_ROLE to mquser ;
grant SELECT on dba_capture to mquser ;
grant SELECT on dba_rulesets to mquser ;
exec DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE( grantee => 'mquser', grant_privileges => true);
*/
connect mquser/mquser
set serveroutput on size unlimited
WHENEVER SQLERROR CONTINUE;
declare
    -- Rebuilds the Streams-to-MQ plumbing for table WLPINT.TOTO in schema v_owner:
    --   1. removes any existing queues, capture, apply and rule with the names below
    --   2. recreates the MQ queue, the LCR queue and the capture and apply rules
    --   3. registers <v_owner>.cv_toto_to_str as the INSERT DML handler, then starts
    --      the capture and apply processes
    -- Every queue, queue table and rule is looked up and manipulated with an explicit
    -- owner so that a same-named object in another schema cannot trigger (and break)
    -- the cleanup.
    -- NOTE(review): cv_toto_to_str is created further down in this script, after this
    -- block has run. Confirm set_dml_handler does not need it to exist on a first run.
    v_owner                 varchar2(60)  := 'mquser';
    v_mq_queue_name         varchar2(60)  := 'mq_q_toto';
    v_mq_queue_table_name   varchar2(60)  := 'mq_qt_toto';
    v_lcr_queue_name        varchar2(60)  := 'lcr_q_toto';
    v_lcr_queue_table_name  varchar2(60)  := 'lcr_qt_toto';
    v_capture_name          varchar2(60)  := 'cap_lcr_toto';
    v_apply_name            varchar2(60)  := 'app_lcr_toto';
    -- Schema-qualified forms, built once and used for every AQ / Streams call.
    v_mq_queue_fqn          varchar2(130) := v_owner || '.' || v_mq_queue_name;
    v_mq_queue_table_fqn    varchar2(130) := v_owner || '.' || v_mq_queue_table_name;
    v_lcr_queue_fqn         varchar2(130) := v_owner || '.' || v_lcr_queue_name;
    v_lcr_queue_table_fqn   varchar2(130) := v_owner || '.' || v_lcr_queue_table_name;
    cpt                     number := 0;
    -- OUT values of ADD_TABLE_RULES. Sized generously in case the returned name is
    -- schema-qualified and quoted (a 30 character buffer would raise ORA-06502).
    v_dml_rule_name         varchar2(300);
    v_ddl_rule_name         varchar2(300);
begin
    -- =========================================================================
    -- Cleanup of any previous installation
    -- =========================================================================
    -- Stop the MQ queue first (it is stopped again just before being dropped).
    select count(*) into cpt
      from dba_queues
     where owner = upper(v_owner)
       and upper(name) = upper(v_mq_queue_name);
    if cpt > 0 then
        dbms_output.put_line('Stop MQ queue '|| v_mq_queue_name);
        dbms_aqadm.stop_queue(queue_name => v_mq_queue_fqn);
    end if;
    -- Drop the capture process (capture names are not schema-scoped).
    select count(*) into cpt
      from dba_capture
     where upper(capture_name) = upper(v_capture_name);
    if cpt > 0 then
        dbms_output.put_line('Drop capture '||v_capture_name);
        dbms_capture_adm.stop_capture(v_capture_name);
        dbms_capture_adm.drop_capture(capture_name => v_capture_name, drop_unused_rule_sets => TRUE);
    end if;
    -- Drop the apply process together with its unused rule sets and rules.
    select count(*) into cpt
      from dba_apply
     where upper(apply_name) = upper(v_apply_name);
    if cpt > 0 then
        dbms_output.put_line('Drop apply '||v_apply_name);
        dbms_apply_adm.stop_apply(v_apply_name);
        dbms_apply_adm.drop_apply(apply_name => v_apply_name, drop_unused_rule_sets => TRUE);
    end if;
    -- Drop the MQ queue.
    select count(*) into cpt
      from dba_queues
     where owner = upper(v_owner)
       and upper(name) = upper(v_mq_queue_name);
    if cpt > 0 then
        dbms_output.put_line('MQ :dbms_aqadm.drop_queue(' || v_mq_queue_name ||');' );
        dbms_aqadm.stop_queue(queue_name => v_mq_queue_fqn);
        dbms_aqadm.drop_queue(queue_name => v_mq_queue_fqn);
    end if;
    -- Drop the MQ queue table.
    select count(*) into cpt
      from dba_queue_tables
     where owner = upper(v_owner)
       and upper(queue_table) = upper(v_mq_queue_table_name);
    if cpt > 0 then
        dbms_output.put_line('1=dbms_aqadm.drop_queue_table(' || v_mq_queue_table_name ||')'';' );
        dbms_aqadm.drop_queue_table(queue_table => v_mq_queue_table_fqn);
    end if;
    -- Drop the transformation rule if it exists.
    select count(*) into cpt
      from dba_rules
     where rule_owner = upper(v_owner)
       and upper(rule_name) = 'MQ_RN_TOTO';
    if cpt > 0 then
        dbms_output.put_line('drop rule MQ_RN_TOTO');
        dbms_rule_adm.drop_rule(rule_name => v_owner||'.MQ_RN_TOTO', force => TRUE);
    end if;
    -- Drop the LCR queue. The stop call does not wait for open transactions.
    select count(*) into cpt
      from dba_queues
     where owner = upper(v_owner)
       and upper(name) = upper(v_lcr_queue_name);
    if cpt > 0 then
        dbms_output.put_line('LCR :dbms_aqadm.drop_queue(' || v_lcr_queue_name ||');' );
        dbms_aqadm.stop_queue(queue_name => v_lcr_queue_fqn, enqueue => TRUE, dequeue => TRUE, wait => FALSE);
        dbms_aqadm.drop_queue(queue_name => v_lcr_queue_fqn);
    end if;
    -- Drop the LCR queue table (payload type ANYDATA).
    select count(*) into cpt
      from dba_queue_tables
     where owner = upper(v_owner)
       and upper(queue_table) = upper(v_lcr_queue_table_name);
    if cpt > 0 then
        dbms_output.put_line('LCR : dbms_aqadm.drop_queue_table(' || v_lcr_queue_table_name ||')'';' );
        dbms_aqadm.drop_queue_table(queue_table => v_lcr_queue_table_fqn);
    end if;
    -- =========================================================================
    -- Creation
    -- =========================================================================
    -- MQ queue table with payload type sys.mgw_basic_msg_t.
    dbms_output.put_line('Create queue table '||v_mq_queue_table_name);
    dbms_aqadm.create_queue_table(
        queue_table        => v_mq_queue_table_fqn,
        queue_payload_type => 'sys.mgw_basic_msg_t',
        multiple_consumers => TRUE);
    -- MQ queue.
    dbms_output.put_line('Create queue '||v_mq_queue_name);
    dbms_aqadm.create_queue(
        queue_name  => v_mq_queue_fqn,
        queue_table => v_mq_queue_table_fqn,
        max_retries => 1000);
    dbms_output.put_line('Starting Queue ');
    dbms_aqadm.start_queue(queue_name => v_mq_queue_fqn);
    -- LCR queue and its ANYDATA queue table.
    dbms_streams_adm.set_up_queue(
        queue_table => v_lcr_queue_table_fqn,
        queue_name  => v_lcr_queue_fqn,
        queue_user  => v_owner);
    -- Capture rules: DML only on WLPINT.TOTO, staged into the LCR queue.
    dbms_output.put_line('Create capture '|| v_capture_name);
    dbms_streams_adm.add_table_rules(
        table_name         => 'WLPINT.TOTO',
        streams_type       => 'capture',
        streams_name       => v_capture_name,
        queue_name         => v_lcr_queue_fqn,
        include_dml        => true,
        include_ddl        => false,
        include_tagged_lcr => true,
        inclusion_rule     => true,
        dml_rule_name      => v_dml_rule_name,
        ddl_rule_name      => v_ddl_rule_name);
    -- Apply rules: the apply process dequeues the LCR queue and calls the DML handler.
    -- This call overwrites v_dml_rule_name, so the name printed below is the apply rule.
    dbms_streams_adm.add_table_rules(
        table_name         => 'WLPINT.TOTO',
        streams_type       => 'apply',
        streams_name       => v_apply_name,
        queue_name         => v_lcr_queue_fqn,
        include_dml        => true,
        include_ddl        => false,
        include_tagged_lcr => true,
        inclusion_rule     => true,
        dml_rule_name      => v_dml_rule_name,
        ddl_rule_name      => v_ddl_rule_name);
    dbms_output.put_line('rule_name:='||v_dml_rule_name);
    -- dbms_streams_adm.set_rule_transform_function(rule_name => v_dml_rule_name, transform_function =>v_owner|| '.cv_toto_to_str');
    -- Route INSERT row LCRs for WLPINT.TOTO to the user procedure.
    dbms_apply_adm.set_dml_handler(
        object_name         => 'WLPINT.TOTO',
        object_type         => 'TABLE',
        operation_name      => 'INSERT',
        error_handler       => FALSE,
        user_procedure      => v_owner|| '.cv_toto_to_str',
        apply_database_link => NULL,
        apply_name          => v_apply_name,
        assemble_lobs       => FALSE);
    -- Start the capture and apply processes.
    dbms_capture_adm.start_capture(v_capture_name);
    dbms_apply_adm.start_apply(v_apply_name);
end;
/
CREATE OR REPLACE procedure mquser.cv_toto_to_str(in_any IN ANYDATA)
-- DML handler invoked for row LCRs (registered by the setup block for INSERTs on
-- WLPINT.TOTO).
--
-- For an ANYDATA wrapping a SYS.LCR$_ROW_RECORD it:
--   1. builds a text message from the NEW values TOTO_N and TOTO_C, the object name
--      and the list of NEW column names
--   2. executes a synthetic INSERT LCR writing that text to column MSG of
--      WLPINT.MQ_Q_LOG
--   3. enqueues the same text as a sys.mgw_basic_msg_t on mquser.mq_q_toto
-- Any other payload type is ignored.
--
-- Parameters:
--   in_any   the ANYDATA payload handed over by the caller
--
-- The procedure deliberately issues no COMMIT or ROLLBACK. Oracle documents that a
-- DML handler must not do so because it would break the consistency of the
-- transaction that contains the LCR. The caller's transaction makes the log row and
-- the enqueued message visible when it commits.
IS
    c_msg_max   CONSTANT PLS_INTEGER := 512;   -- maximum length of the text sent out
    lcr         SYS.LCR$_ROW_RECORD;
    lcr_out     SYS.LCR$_ROW_RECORD;
    log_col     SYS.LCR$_ROW_UNIT;
    log_vals    SYS.LCR$_ROW_LIST;
    new_vals    SYS.LCR$_ROW_LIST;
    col_list    VARCHAR2(512);
    a           ANYDATA;
    rc          NUMBER;
    t           PLS_INTEGER;
    var         number := -1;
    f2          varchar2(4000);
    full_msg    VARCHAR2(32767);
    msg_text    varchar2(512) := 'hello world';
    enqueue_options     dbms_aq.enqueue_options_t;
    message_properties  dbms_aq.message_properties_t;
    msgid       RAW(16);
    payload     sys.mgw_basic_msg_t;
    header      sys.mgw_name_value_array_t;
    text_body   sys.mgw_text_value_t;
    -- recipients dbms_aq.aq$_recipient_list_t;
BEGIN
    IF in_any.GETTYPENAME = 'SYS.LCR$_ROW_RECORD' THEN
        rc := in_any.GETOBJECT(lcr);
        -- Comma-separated list of the NEW column names. It is capped at c_msg_max
        -- because the final message is truncated to that length anyway.
        new_vals := lcr.get_values('NEW');
        IF new_vals IS NOT NULL THEN
            FOR i IN 1 .. new_vals.COUNT LOOP
                col_list := SUBSTR(col_list || new_vals(i).column_name || ',', 1, c_msg_max);
            END LOOP;
        END IF;
        -- Best-effort extraction of the two payload columns. A missing column is
        -- skipped. Any conversion error falls back to var = -1 so the message is
        -- still produced.
        BEGIN
            a := lcr.get_value('NEW', 'TOTO_N');
            IF a IS NOT NULL THEN
                t := a.GETNUMBER(var);
            END IF;
            a := lcr.get_value('NEW', 'TOTO_C');
            IF a IS NOT NULL THEN
                t := a.GETVARCHAR2(f2);
            END IF;
        EXCEPTION
            WHEN OTHERS THEN
                var := -1;
        END;
        -- Assemble in a large buffer, then truncate. Concatenating straight into
        -- the 512 character variable raised ORA-06502 for wide tables or long values.
        full_msg := to_char(sysdate)|| ' dwlpdb02 '||' Oracle - MSG ' || to_char(var) || 'Hello world : ' || lcr.get_object_name()|| ' f2='||f2||' cols='||col_list;
        msg_text := SUBSTR(full_msg, 1, c_msg_max);
        -- Log the message through a synthetic INSERT LCR on WLPINT.MQ_Q_LOG (column MSG).
        log_col  := SYS.LCR$_ROW_UNIT('MSG', SYS.AnyData.ConvertVarChar2(msg_text), DBMS_LCR.NOT_A_LOB, NULL, NULL);
        log_vals := SYS.LCR$_ROW_LIST(log_col);
        lcr_out  := SYS.LCR$_ROW_RECORD.CONSTRUCT(
                        source_database_name => lcr.GET_SOURCE_DATABASE_NAME(),
                        command_type         => 'INSERT',
                        object_owner         => 'WLPINT',
                        object_name          => 'MQ_Q_LOG',
                        old_values           => null,
                        new_values           => log_vals);
        lcr_out.execute(false);
        ----- MSG MQ now
        -- header := sys.mgw_name_value_array_t(
        -- sys.mgw_name_value_t.construct_integer('MGW_MQ_characterSet', '1208'),
        -- sys.mgw_name_value_t.construct_integer('MGW_MQ_priority', '7')
        -- );
        -- NOTE(review): the field name used to carry a trailing space
        -- ('MGW_MQ_priority '), which does not match the documented header name.
        -- Confirm that an MQ priority of 0 is really intended now that it applies.
        header := sys.mgw_name_value_array_t(
                      sys.mgw_name_value_t.construct_integer('MGW_MQ_priority', 0)
                  );
        -- recipients(1) := sys.aq$_agent ('mquser',null,null);
        -- message_properties.recipient_list := recipients;
        message_properties.priority := 10;
        -- Text body, then the full message built from header and body.
        text_body := sys.mgw_text_value_t(msg_text, NULL);
        payload   := sys.mgw_basic_msg_t(header, text_body, NULL);
        -- Enqueue. The message becomes visible when the caller's transaction commits.
        dbms_aq.enqueue(queue_name         => 'mquser.mq_q_toto',
                        enqueue_options    => enqueue_options,
                        message_properties => message_properties,
                        payload            => payload,
                        msgid              => msgid);
    END IF;
END;
/
-- Run the two companion scripts, cleanup first and then setup.
-- Their contents are not part of this file.
-- NOTE(review): the names suggest Messaging Gateway administration. Confirm that the
-- cleanup script does not remove the objects created above.
@mq_adm_cleanup.sql
@mq_adm_setup.sql