Thursday, March 12, 2009

Uni directional streams set up..

Oracle Streams allows for back-and-forth replication between two active database servers.

Here I'll discuss the basic steps involved in setting up a streams replication in oracle 10g.

In the example provided I am setting up a unidirectional replication for the table customer, residing in the binfo schema at source_db, to binfo.customer at dest_db.

Init parameters
--------------------

Certain initialization parameters must be set at both the source and destination instances before setting up Streams.

Here are the parameters.

compatible - This parameter should be the same on both sides.
global_names = TRUE --> Database link names must match the global name of the database they connect to.
job_queue_processes --> This parameter should be set to at least 2 (two).
parallel_max_servers --> This parameter should be set to at least 6 (six).
sga_target --> Should be set; otherwise you need to set "streams_pool_size" explicitly.
db_domain --> Make sure this value is set.

Tns Names
-----------


Add Tnsnames entries for the source and destination databases on both sides.

# Net service alias for the source database
Source_db =
  (DESCRIPTION =
    (ADDRESS_LIST =
      (ADDRESS = (PROTOCOL = TCP)(HOST = xx.xx.xx.xx)(PORT = 1521))
    )
    (CONNECT_DATA =
      (service_name = source_db)
    )
  )

# Net service alias for the destination database
dest_db =
  (DESCRIPTION =
    (ADDRESS_LIST =
      (ADDRESS = (PROTOCOL = TCP)(HOST = xx.xx.xx.xx)(PORT = 1521))
    )
    (CONNECT_DATA =
      (service_name = dest_db)
    )
  )

Create Tablespace
-----------------

Create separate tablespace for streams admin user objects on both sides.

-- Run on source_db: dedicated tablespace for the Streams administrator's objects.
-- The data file directory is spelled source_db, matching the database name used elsewhere.
CREATE TABLESPACE streams_ts
  DATAFILE '/u01/oradata/source_db/streams_ts_001.dbf'
  SIZE 250M;

and

-- Run on dest_db: same tablespace name, with the data file under the destination's directory.
CREATE TABLESPACE streams_ts
  DATAFILE '/u01/oradata/dest_db/streams_ts_001.dbf'
  SIZE 250M;

Create streams administrator user on both sides
------------------------------------------------

$> sqlplus / as sysdba

-- Streams administrator account. Its default tablespace is streams_ts, with unlimited quota there.
CREATE USER "STRMADMIN"
  IDENTIFIED BY "xxxxxxx"
  DEFAULT TABLESPACE streams_ts
  TEMPORARY TABLESPACE temp
  QUOTA UNLIMITED ON streams_ts;


Grant the following privileges to the strmadmin user at dest_db by logging in as sysdba

-----------------------------------------------------------------------------------
$> sqlplus / as sysdba
-- Roles and AQ package access for the Streams administrator on the destination database.
-- SQL*Plus accepts one statement per terminator, so each GRANT gets its own line.
GRANT CONNECT, RESOURCE, AQ_ADMINISTRATOR_ROLE TO "STRMADMIN";
GRANT DBA TO STRMADMIN;
GRANT EXECUTE ON sys.dbms_aq TO STRMADMIN;
GRANT EXECUTE ON sys.dbms_aqadm TO STRMADMIN;


-- Grant the ENQUEUE_ANY AQ system privilege to STRMADMIN, without admin option.
BEGIN
  DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => 'ENQUEUE_ANY',
    grantee      => 'STRMADMIN',
    admin_option => FALSE);
END;
/
-- Grant the DEQUEUE_ANY AQ system privilege to STRMADMIN, without admin option.
BEGIN
  DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => 'DEQUEUE_ANY',
    grantee      => 'STRMADMIN',
    admin_option => FALSE);
END;
/

-- Grant the MANAGE_ANY AQ system privilege to STRMADMIN, this time with admin option.
BEGIN
  DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => 'MANAGE_ANY',
    grantee      => 'STRMADMIN',
    admin_option => TRUE);
END;
/

-- Give STRMADMIN access to the AQ object types.
BEGIN
  DBMS_AQADM.GRANT_TYPE_ACCESS(user_name => 'STRMADMIN');
END;
/

-- Rule-engine privileges on objects in STRMADMIN's own schema, each granted with grant option.
DECLARE
  TYPE privilege_list IS TABLE OF BINARY_INTEGER;
  l_privileges privilege_list := privilege_list(
    DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ,
    DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
    DBMS_RULE_ADM.CREATE_RULE_OBJ);
BEGIN
  -- Same calls, in the same order, as listing each privilege individually.
  FOR i IN 1 .. l_privileges.COUNT LOOP
    DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
      privilege    => l_privileges(i),
      grantee      => 'STRMADMIN',
      grant_option => TRUE);
  END LOOP;
END;
/

-- The ANY-level rule set and rule privileges, each granted with grant option.
DECLARE
  TYPE privilege_list IS TABLE OF BINARY_INTEGER;
  l_privileges privilege_list := privilege_list(
    DBMS_RULE_ADM.CREATE_ANY_RULE_SET,
    DBMS_RULE_ADM.ALTER_ANY_RULE_SET,
    DBMS_RULE_ADM.EXECUTE_ANY_RULE_SET,
    DBMS_RULE_ADM.CREATE_ANY_RULE,
    DBMS_RULE_ADM.ALTER_ANY_RULE,
    DBMS_RULE_ADM.EXECUTE_ANY_RULE);
BEGIN
  FOR i IN 1 .. l_privileges.COUNT LOOP
    DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
      privilege    => l_privileges(i),
      grantee      => 'STRMADMIN',
      grant_option => TRUE);
  END LOOP;
END;
/

-- Allow STRMADMIN to execute on SYS.STREAMS$_EVALUATION_CONTEXT, without grant option.
BEGIN
  DBMS_RULE_ADM.GRANT_OBJECT_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.EXECUTE_ON_EVALUATION_CONTEXT,
    object_name  => 'SYS.STREAMS$_EVALUATION_CONTEXT',
    grantee      => 'STRMADMIN',
    grant_option => FALSE);
END;
/


-- Package and catalog access for the Streams administrator on the destination database.
-- SQL*Plus accepts one statement per terminator, so each GRANT gets its own line.
GRANT EXECUTE ON sys.dbms_capture_adm TO STRMADMIN;
GRANT EXECUTE ON sys.dbms_apply_adm TO STRMADMIN;
GRANT EXECUTE ON sys.dbms_rule_adm TO STRMADMIN;
GRANT SELECT_CATALOG_ROLE TO STRMADMIN;

-- Grant EXECUTE_ANY_EVALUATION_CONTEXT to STRMADMIN, with grant option.
BEGIN
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.EXECUTE_ANY_EVALUATION_CONTEXT,
    grantee      => 'STRMADMIN',
    grant_option => TRUE);
END;
/

GRANT EXECUTE ON SYS.dbms_streams_adm TO STRMADMIN;
GRANT ALL PRIVILEGES TO STRMADMIN;


Create application user(schema) on both sides

------------------------------------------------------


-- Application schema that owns the replicated table.
CREATE USER binfo IDENTIFIED BY binfo
  DEFAULT TABLESPACE binfo_data
  TEMPORARY TABLESPACE temp
  QUOTA UNLIMITED ON binfo_data;

-- The grantee is the user binfo. The name binfo_data is the tablespace, not an account.
GRANT CONNECT, DBA TO binfo;


Connect to Binfo at source_db

---------------------------------------
-- Table to replicate, created in the schema of the connected user.
CREATE TABLE customer (
  cust_id  NUMBER(10)   NOT NULL,
  cust_fn  VARCHAR2(20) NOT NULL,
  cust_ln  VARCHAR2(20) NOT NULL,
  addr     VARCHAR2(50),
  state    VARCHAR2(2)
);

-- Primary key on cust_id, added afterwards as the named constraint customer_pk.
ALTER TABLE customer
  ADD CONSTRAINT customer_pk PRIMARY KEY (cust_id);


Connect to strmadmin at dest_db (the destination database)

--------------------------------------------------
$> sqlplus strmadmin@dest_db
Create streams queue
-- Set up a Streams queue with STRMADMIN as the queue user. No other arguments are passed.
BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_user => 'STRMADMIN');
END;
/

Add apply rules for tables at the destination database
-------------------------------------------------------------

-- Apply rules for BINFO.CUSTOMER on the destination: DML and DDL changes are both included.
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name      => '"BINFO"."CUSTOMER"',
    streams_type    => 'APPLY',
    streams_name    => 'STRMADMIN_SOURCE_DB_',
    queue_name      => '"STRMADMIN"."STREAMS_QUEUE"',
    include_dml     => TRUE,
    include_ddl     => TRUE,
    -- Must be the same source database name that the capture and propagation rules use.
    source_database => 'SOURCE_DB.XX.COM');
END;
/


Connect to sysdba at the source database

-------------------------------------------------------
Enable supplemental logging at table level (optional). Supplemental logging captures additional information in the redo logs for row identification.

-- Supplemental log group cust_loggrp covering cust_id, cust_fn and cust_ln.
ALTER TABLE binfo.customer
  ADD SUPPLEMENTAL LOG GROUP cust_loggrp (cust_id, cust_fn, cust_ln);
Switch log file

sqlplus> ALTER SYSTEM SWITCH LOGFILE;


Grant the required privileges to strmadmin user

--------------------------------------------------------------


-- Roles and AQ package access for the Streams administrator on the source database.
-- SQL*Plus accepts one statement per terminator, so each GRANT gets its own line.
GRANT CONNECT, RESOURCE, AQ_ADMINISTRATOR_ROLE TO "STRMADMIN";
GRANT DBA TO STRMADMIN;
GRANT EXECUTE ON sys.dbms_aq TO STRMADMIN;
GRANT EXECUTE ON sys.dbms_aqadm TO STRMADMIN;


-- Grant the ENQUEUE_ANY AQ system privilege to STRMADMIN, without admin option.
BEGIN
  DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => 'ENQUEUE_ANY',
    grantee      => 'STRMADMIN',
    admin_option => FALSE);
END;
/
-- Grant the DEQUEUE_ANY AQ system privilege to STRMADMIN, without admin option.
BEGIN
  DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => 'DEQUEUE_ANY',
    grantee      => 'STRMADMIN',
    admin_option => FALSE);
END;
/
-- Grant the MANAGE_ANY AQ system privilege to STRMADMIN, this time with admin option.
BEGIN
  DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => 'MANAGE_ANY',
    grantee      => 'STRMADMIN',
    admin_option => TRUE);
END;
/
-- Give STRMADMIN access to the AQ object types.
BEGIN
  DBMS_AQADM.GRANT_TYPE_ACCESS(user_name => 'STRMADMIN');
END;
/
-- Rule-engine privileges on objects in STRMADMIN's own schema, each granted with grant option.
DECLARE
  TYPE privilege_list IS TABLE OF BINARY_INTEGER;
  l_privileges privilege_list := privilege_list(
    DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ,
    DBMS_RULE_ADM.CREATE_RULE_SET_OBJ,
    DBMS_RULE_ADM.CREATE_RULE_OBJ);
BEGIN
  -- Same calls, in the same order, as listing each privilege individually.
  FOR i IN 1 .. l_privileges.COUNT LOOP
    DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
      privilege    => l_privileges(i),
      grantee      => 'STRMADMIN',
      grant_option => TRUE);
  END LOOP;
END;
/

-- The ANY-level rule set and rule privileges, each granted with grant option.
DECLARE
  TYPE privilege_list IS TABLE OF BINARY_INTEGER;
  l_privileges privilege_list := privilege_list(
    DBMS_RULE_ADM.CREATE_ANY_RULE_SET,
    DBMS_RULE_ADM.ALTER_ANY_RULE_SET,
    DBMS_RULE_ADM.EXECUTE_ANY_RULE_SET,
    DBMS_RULE_ADM.CREATE_ANY_RULE,
    DBMS_RULE_ADM.ALTER_ANY_RULE,
    DBMS_RULE_ADM.EXECUTE_ANY_RULE);
BEGIN
  FOR i IN 1 .. l_privileges.COUNT LOOP
    DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
      privilege    => l_privileges(i),
      grantee      => 'STRMADMIN',
      grant_option => TRUE);
  END LOOP;
END;
/

-- Allow STRMADMIN to execute on SYS.STREAMS$_EVALUATION_CONTEXT, without grant option.
BEGIN
  DBMS_RULE_ADM.GRANT_OBJECT_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.EXECUTE_ON_EVALUATION_CONTEXT,
    object_name  => 'SYS.STREAMS$_EVALUATION_CONTEXT',
    grantee      => 'STRMADMIN',
    grant_option => FALSE);
END;
/


-- Package and catalog access for the Streams administrator on the source database.
-- SQL*Plus accepts one statement per terminator, so each GRANT gets its own line.
GRANT EXECUTE ON sys.dbms_capture_adm TO STRMADMIN;
GRANT EXECUTE ON sys.dbms_apply_adm TO STRMADMIN;
GRANT EXECUTE ON sys.dbms_rule_adm TO STRMADMIN;
GRANT SELECT_CATALOG_ROLE TO STRMADMIN;

-- Grant EXECUTE_ANY_EVALUATION_CONTEXT to STRMADMIN, with grant option.
BEGIN
  DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
    privilege    => DBMS_RULE_ADM.EXECUTE_ANY_EVALUATION_CONTEXT,
    grantee      => 'STRMADMIN',
    grant_option => TRUE);
END;
/

GRANT EXECUTE ON SYS.dbms_streams_adm TO STRMADMIN;
GRANT ALL PRIVILEGES TO STRMADMIN;

Connect to strmadmin at source_db

----------------------------------------------


Create a database link

-- Link from the source database to the destination, connecting there as STRMADMIN.
CREATE DATABASE LINK "dest_db.xxx.COM"
  CONNECT TO "STRMADMIN" IDENTIFIED BY "STRMADMIN"
  USING '(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=xxx.xxx.xxx.xx)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=dest_db.xx.com)))';

Create streams queue

-- Set up a Streams queue with STRMADMIN as the queue user. No other arguments are passed.
BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_user => 'STRMADMIN');
END;
/


Add capture rules for tables at the source site

-------------------------------------------------------


-- Capture rules for BINFO.CUSTOMER on the source: DML and DDL changes are both included.
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_RULES(
    table_name      => '"BINFO"."CUSTOMER"',
    streams_type    => 'CAPTURE',
    streams_name    => 'STRMADMIN_CAPTURE',
    queue_name      => '"STRMADMIN"."STREAMS_QUEUE"',
    include_dml     => TRUE,
    include_ddl     => TRUE,
    source_database => 'SOURCE_DB.XX.COM');
END;
/


Add propagation rules for tables at the source database


-- Propagation rules for BINFO.CUSTOMER: forward captured DML and DDL changes from the
-- local Streams queue to the Streams queue at the destination database.
BEGIN
  DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES(
    table_name             => '"BINFO"."CUSTOMER"',
    streams_name           => 'STRMADMIN_PROPAGATE',
    source_queue_name      => '"STRMADMIN"."STREAMS_QUEUE"',
    -- The destination queue is reached through the database link to dest_db,
    -- so the suffix after @ is that link's name, not the source database.
    destination_queue_name => '"STRMADMIN"."STREAMS_QUEUE"@DEST_DB.XXX.COM',
    include_dml            => TRUE,
    include_ddl            => TRUE,
    source_database        => 'source_db.xx.COM');
END;
/


Export the table from source_db and import them in to dest_db.

--------------------------------------------------------------------------------


Here you will instantiate the scn at dest_db along with import. You can use data pump as well.


# Export binfo.customer from source_db into tables.dmp, without row data (ROWS=N).
exp USERID="STRMADMIN"@source_db TABLES="binfo"."customer" FILE=tables.dmp GRANTS=Y ROWS=N LOG=exportTables.log OBJECT_CONSISTENT=Y INDEXES=Y


# Import tables.dmp into dest_db. STREAMS_INSTANTIATION=Y is the setting that goes with instantiating the SCN during the import.
imp USERID="STRMADMIN"@dest_db FULL=Y CONSTRAINTS=Y FILE=tables.dmp IGNORE=Y GRANTS=Y ROWS=N COMMIT=Y LOG=importTables.log STREAMS_CONFIGURATION=N STREAMS_INSTANTIATION=Y

Startup operations

----------------------------
You are pretty much done with the setup at this point. You now need to start the apply and capture processes.
Connect to strmadmin@dest_db and start the apply process first.

-- Start the apply process unless DBA_APPLY already reports it as ENABLED.
DECLARE
  v_started NUMBER;
BEGIN
  -- The name is spelled exactly like the streams_name passed to ADD_TABLE_RULES,
  -- because this lookup is a case-sensitive string comparison.
  SELECT DECODE(status, 'ENABLED', 1, 0)
    INTO v_started
    FROM dba_apply
   WHERE apply_name = 'STRMADMIN_SOURCE_DB_';

  IF v_started = 0 THEN
    DBMS_APPLY_ADM.START_APPLY(apply_name => 'STRMADMIN_SOURCE_DB_');
  END IF;
END;
/


Connect to strmadmin at the source database and start the capture process

-------------------------------------------------------------------------------------------


-- Start the capture process unless DBA_CAPTURE already reports it as ENABLED.
DECLARE
  l_is_enabled NUMBER;
BEGIN
  -- 1 when the status is ENABLED, 0 for any other status.
  SELECT CASE status WHEN 'ENABLED' THEN 1 ELSE 0 END
    INTO l_is_enabled
    FROM dba_capture
   WHERE capture_name = 'STRMADMIN_CAPTURE';

  IF l_is_enabled = 0 THEN
    DBMS_CAPTURE_ADM.START_CAPTURE(capture_name => 'STRMADMIN_CAPTURE');
  END IF;
END;
/

Now test the replication.

-------------------------------


Insert some values into the source table and see if they are getting replicated.

I'll discuss more about streams troubleshooting in Future posts.

No comments: