/*
connect sys/sys as sysdba
grant AQ_ADMINISTRATOR_ROLE to mquser ;
grant execute on dbms_capture_adm to mquser ;
grant SELECT_CATALOG_ROLE to mquser ;
grant SELECT on dba_capture to mquser ;
grant SELECT on dba_rulesets to mquser ;
exec DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE( grantee => 'mquser', grant_privileges => true);
*/
-- Open the session as mquser, the account that receives the grants listed in the
-- commented-out header above.
-- NOTE(review): the password is embedded in clear text in this script — confirm this is acceptable.
connect mquser/mquser
-- Show DBMS_OUTPUT messages (the progress lines printed by the block below) without a buffer limit.
set serveroutput on size unlimited
-- Do not abort the script when a statement fails; continue with the next one.
WHENEVER SQLERROR CONTINUE;

declare
   -- Purpose: rebuild, from scratch, the objects that forward row changes of
   -- WLPINT.TOTO to an MQ-bound queue:
   --   * a queue of payload type sys.mgw_basic_msg_t (MQ queue),
   --   * an LCR queue created through DBMS_STREAMS_ADM.SET_UP_QUEUE,
   --   * a capture process and an apply process on WLPINT.TOTO,
   --   * an INSERT DML handler on the apply process.
   -- The first half removes any previous incarnation of these objects, the
   -- second half recreates and starts them.

   v_mq_queue_name        varchar2(60) := 'mq_q_toto';
   v_mq_queue_table_name  varchar2(60) := 'mq_qt_toto';
   v_lcr_queue_name       varchar2(60) := 'lcr_q_toto';
   v_lcr_queue_table_name varchar2(60) := 'lcr_qt_toto';
   v_capture_name         varchar2(60) := 'cap_lcr_toto';
   v_apply_name           varchar2(60) := 'app_lcr_toto';
   v_owner                varchar2(60) := 'mquser';
   v_source_table         varchar2(61) := 'WLPINT.TOTO';
   v_handler_proc         varchar2(30) := 'cv_toto_to_str';
   cpt                    number := 0;
   -- Rule names come back through OUT parameters of ADD_TABLE_RULES; the
   -- buffers are sized generously so a longer generated name cannot raise
   -- ORA-06502.
   v_dml_rule_name        varchar2(100);
   v_ddl_rule_name        varchar2(100);

   -- Returns p_name prefixed with the owning schema (owner.name), so every
   -- AQ/Streams call targets the same schema that the existence checks test.
   function owned(p_name in varchar2) return varchar2 is
   begin
      return v_owner || '.' || p_name;
   end owned;

begin
  -- =============================================================================================================================================
  --              Start some cleaning
  -- =============================================================================================================================================
  -- Every dictionary lookup that has an owner column is restricted to v_owner:
  -- a same-named object in another schema must not trigger a stop/drop on an
  -- object that does not exist in our schema (which would abort this block).

  -- stop mq queue first, before the capture/apply processes are removed
  select count(*) into cpt
    from dba_queues
   where upper(owner) = upper(v_owner)
     and upper(name)  = upper(v_mq_queue_name);
  if ( cpt > 0 ) then
     dbms_output.put_line('Stop MQ queue '|| v_mq_queue_name);
     dbms_aqadm.stop_queue( owned(v_mq_queue_name) );
  end if;

  -- drop capture (capture names are looked up by name only, as in dba_capture)
  select count(*) into cpt
    from dba_capture
   where upper(capture_name) = upper(v_capture_name);
  if ( cpt > 0 ) then
     dbms_output.put_line('Drop capture '||v_capture_name);
     dbms_capture_adm.stop_capture( v_capture_name );
     dbms_capture_adm.drop_capture( v_capture_name, TRUE );   -- TRUE: also drop unused rule sets
  end if;

  -- drop apply
  select count(*) into cpt
    from dba_apply
   where upper(apply_name) = upper(v_apply_name);
  if ( cpt > 0 ) then
     dbms_output.put_line('Drop apply '||v_apply_name);
     dbms_apply_adm.stop_apply( v_apply_name );
     dbms_apply_adm.drop_apply( v_apply_name, TRUE );          -- TRUE: also drop unused rule sets
  end if;

  -- drop mq queue (already stopped above, so no second stop is needed)
  select count(*) into cpt
    from dba_queues
   where upper(owner) = upper(v_owner)
     and upper(name)  = upper(v_mq_queue_name);
  if ( cpt > 0 ) then
     dbms_output.put_line('MQ :dbms_aqadm.drop_queue(' || v_mq_queue_name ||');' );
     dbms_aqadm.drop_queue( owned(v_mq_queue_name) );
  end if;

  -- drop mq queue table
  select count(*) into cpt
    from dba_queue_tables
   where upper(owner)       = upper(v_owner)
     and upper(queue_table) = upper(v_mq_queue_table_name);
  if ( cpt > 0 ) then
     dbms_output.put_line('1=dbms_aqadm.drop_queue_table(' || v_mq_queue_table_name ||')'';' );
     dbms_aqadm.drop_queue_table( owned(v_mq_queue_table_name) );
  end if;

  -- drop transformation rule if it exists
  select count(*) into cpt
    from dba_rules
   where upper(rule_owner) = upper(v_owner)
     and upper(rule_name)  = 'MQ_RN_TOTO';
  if ( cpt > 0 ) then
     dbms_output.put_line('drop rule MQ_RN_TOTO');
     dbms_rule_adm.drop_rule( owned('MQ_RN_TOTO'), TRUE );     -- TRUE: force removal from rule sets
  end if;

  -- drop lcr queue
  select count(*) into cpt
    from dba_queues
   where upper(owner) = upper(v_owner)
     and upper(name)  = upper(v_lcr_queue_name);
  if ( cpt > 0 ) then
     dbms_output.put_line('LCR :dbms_aqadm.drop_queue(' || v_lcr_queue_name ||');' );
     -- stop both enqueue and dequeue without waiting for open transactions
     dbms_aqadm.stop_queue( queue_name => owned(v_lcr_queue_name),
                            enqueue    => TRUE,
                            dequeue    => TRUE,
                            wait       => FALSE );
     dbms_aqadm.drop_queue( owned(v_lcr_queue_name) );
  end if;

  -- drop LCR queue table of type any_data
  select count(*) into cpt
    from dba_queue_tables
   where upper(owner)       = upper(v_owner)
     and upper(queue_table) = upper(v_lcr_queue_table_name);
  if ( cpt > 0 ) then
     dbms_output.put_line('LCR : dbms_aqadm.drop_queue_table(' || v_lcr_queue_table_name ||')'';' );
     dbms_aqadm.drop_queue_table( owned(v_lcr_queue_table_name) );
  end if;


  -- =============================================================================================================================================
  --                   Start of creation
  -- =============================================================================================================================================

  --  Create mq queue table of type sys.mgw_basic_msg_t (multi-consumer)
  dbms_output.put_line('Create queue table  '||v_mq_queue_table_name);
  dbms_aqadm.create_queue_table( queue_table        => owned(v_mq_queue_table_name),
                                 queue_payload_type => 'sys.mgw_basic_msg_t',
                                 multiple_consumers => TRUE );

  --  Create mq queue
  dbms_output.put_line('Create queue  '||v_mq_queue_name);
  dbms_aqadm.create_queue( queue_name  => owned(v_mq_queue_name),
                           queue_table => owned(v_mq_queue_table_name),
                           max_retries => 1000 );

  --  Starting mq queue
  dbms_output.put_line('Starting Queue ');
  dbms_aqadm.start_queue( owned(v_mq_queue_name) );

  -- create LCR queue name and queue table of type ANY_DATA
  dbms_streams_adm.set_up_queue( queue_table => owned(v_lcr_queue_table_name),
                                 queue_name  => owned(v_lcr_queue_name),
                                 queue_user  => v_owner );

  -- create capture: DML only, tagged LCRs included
  dbms_output.put_line('Create capture '|| v_capture_name);
  dbms_streams_adm.add_table_rules( table_name         => v_source_table,
                                    streams_type       => 'capture',
                                    streams_name       => v_capture_name,
                                    queue_name         => owned(v_lcr_queue_name),
                                    include_dml        => true,
                                    include_ddl        => false,
                                    include_tagged_lcr => true,
                                    inclusion_rule     => true,
                                    dml_rule_name      => v_dml_rule_name,
                                    ddl_rule_name      => v_ddl_rule_name );

  -- create apply process that reads the LCR queue and calls the DML handler;
  -- the OUT rule names of the capture call are overwritten here on purpose,
  -- only the apply rule name is reported below
  dbms_streams_adm.add_table_rules( table_name         => v_source_table,
                                    streams_type       => 'apply',
                                    streams_name       => v_apply_name,
                                    queue_name         => owned(v_lcr_queue_name),
                                    include_dml        => true,
                                    include_ddl        => false,
                                    include_tagged_lcr => true,
                                    inclusion_rule     => true,
                                    dml_rule_name      => v_dml_rule_name,
                                    ddl_rule_name      => v_ddl_rule_name );

  dbms_output.put_line('rule_name:='||v_dml_rule_name);

  -- Route INSERTs on the source table to the handler procedure.
  -- NOTE(review): the handler procedure is created by the CREATE OR REPLACE
  -- statement that follows this block in the script, so on a first run it does
  -- not exist yet when it is registered here — confirm whether it must be
  -- created before this block runs.
  dbms_apply_adm.set_dml_handler( object_name         => v_source_table,
                                  object_type         => 'TABLE',
                                  operation_name      => 'INSERT',
                                  error_handler       => FALSE,
                                  user_procedure      => owned(v_handler_proc),
                                  apply_database_link => NULL,
                                  apply_name          => v_apply_name,
                                  assemble_lobs       => FALSE );

  -- start the capture and apply processes
  dbms_capture_adm.start_capture(v_capture_name);
  dbms_apply_adm.start_apply(v_apply_name);

end;
/
CREATE OR REPLACE procedure mquser.cv_toto_to_str(in_any IN ANYDATA)
IS
  -- Apply DML handler for row LCRs.
  --
  -- Parameter:
  --   in_any  ANYDATA wrapping the LCR handed over by the apply process.
  --           Anything that is not a SYS.LCR$_ROW_RECORD is ignored.
  --
  -- For a row LCR the procedure
  --   1. builds a text message from the NEW column values TOTO_N / TOTO_C,
  --      the object name and the list of NEW column names,
  --   2. executes an INSERT row LCR into WLPINT.MQ_Q_LOG carrying that text in
  --      column MSG,
  --   3. enqueues the same text as a sys.mgw_basic_msg_t on mquser.mq_q_toto.
  --
  -- Transaction control: the procedure does NOT commit. Oracle's DML-handler
  -- restrictions forbid COMMIT/ROLLBACK inside the user procedure because it
  -- endangers the consistency of the transaction that contains the LCR; the
  -- apply process commits the log insert and the enqueue together with the
  -- applied transaction.

  c_msg_max  CONSTANT PLS_INTEGER := 512;    -- maximum length of the message text

  lcr        SYS.LCR$_ROW_RECORD;            -- incoming row LCR
  lcr_out    SYS.LCR$_ROW_RECORD;            -- LCR built for the log table insert
  col1       SYS.LCR$_ROW_UNIT;
  newvals    SYS.LCR$_ROW_LIST;              -- values of the outgoing log LCR
  src_cols   SYS.LCR$_ROW_LIST;              -- NEW values of the incoming LCR
  col_list   VARCHAR2(32767);                -- comma-terminated NEW column names
  a          ANYDATA;
  rc         NUMBER;
  t          PLS_INTEGER;
  var        NUMBER := -1;                   -- TOTO_N, or -1 when it cannot be read
  v_varchar2_512 VARCHAR2(512) := 'hello world';
  f2         VARCHAR2(50);                   -- TOTO_C

  enqueue_options      dbms_aq.enqueue_options_t;
  message_properties   dbms_aq.message_properties_t;
  msgid                RAW(16);
  payload              sys.mgw_basic_msg_t;
  header               sys.mgw_name_value_array_t;
  text_body            sys.mgw_text_value_t;

BEGIN
  IF in_any IS NOT NULL AND in_any.GETTYPENAME = 'SYS.LCR$_ROW_RECORD' THEN
     rc := in_any.GETOBJECT(lcr);

     -- Collect the NEW column names. The list is only informational, so stop
     -- once it is already longer than the message can hold; this keeps wide
     -- tables from overflowing the buffer.
     src_cols := lcr.get_values('NEW');
     IF src_cols IS NOT NULL THEN
        FOR i IN 1..src_cols.COUNT LOOP
           EXIT WHEN length(col_list) >= c_msg_max;
           col_list := col_list || src_cols(i).column_name || ',';
        END LOOP;
     END IF;

     -- Best-effort read of the two payload columns: any failure (missing
     -- column, unexpected datatype, value too long for f2) resets var to -1
     -- and the message is still produced.
     BEGIN
        a := lcr.get_value('NEW','TOTO_N');
        t := a.GETNUMBER(var);
        a := lcr.get_value('NEW','TOTO_C');
        t := a.GETVARCHAR2(f2);
     EXCEPTION
        WHEN OTHERS THEN
           var := -1;
     END;

     -- Truncate to the buffer size instead of failing the apply with ORA-06502
     -- when the assembled text is longer than 512 characters.
     v_varchar2_512 := substr( to_char(sysdate)|| ' dwlpdb02 '||' Oracle -  MSG ' || to_char(var) || 'Hello world : ' || lcr.get_object_name()|| ' f2='||f2||' cols='||col_list,
                               1, c_msg_max );

     -- Log the message through an INSERT row LCR on WLPINT.MQ_Q_LOG.
     col1    := SYS.LCR$_ROW_UNIT( 'MSG', SYS.AnyData.ConvertVarChar2(v_varchar2_512), DBMS_LCR.NOT_A_LOB, NULL, NULL);
     newvals := SYS.LCR$_ROW_LIST(col1);
     lcr_out := SYS.LCR$_ROW_RECORD.CONSTRUCT(
                source_database_name  =>  lcr.GET_SOURCE_DATABASE_NAME(),
                command_type          =>  'INSERT',
                object_owner          =>  'WLPINT',
                object_name           =>  'MQ_Q_LOG',
                old_values            =>  null,
                new_values            =>  newvals);
     lcr_out.execute(false);   -- false: no conflict resolution

     ----- MSG MQ now
     -- Header field name must be exactly 'MGW_MQ_priority' (no trailing blank)
     -- and construct_integer takes a numeric value.
     header := sys.mgw_name_value_array_t(
                  sys.mgw_name_value_t.construct_integer('MGW_MQ_priority', 0)
               );

     message_properties.priority := 10;

     -- Create the text body.
     text_body := sys.mgw_text_value_t(v_varchar2_512, NULL);

     -- Construct the message from its constituent parts.
     payload := sys.mgw_basic_msg_t(header, text_body, NULL);

     -- Enqueue the message. No recipient list is supplied.
     -- NOTE(review): the queue table is created with multiple_consumers => TRUE,
     -- so this presumably relies on a subscriber defined elsewhere — confirm.
     dbms_aq.enqueue(queue_name         => 'mquser.mq_q_toto',
                     enqueue_options    => enqueue_options,
                     message_properties => message_properties,
                     payload            => payload,
                     msgid              => msgid);
  END IF;
END;
/

-- Run two further SQL*Plus scripts from the current directory.
-- NOTE(review): their contents are not visible here; judging by the file names they
-- clean up and then set up the MQ administration side — confirm.
@mq_adm_cleanup.sql
@mq_adm_setup.sql