-- Change-data-capture (CDC) bookkeeping schema for PostgreSQL.
--
-- Objects created by this script (all in schema "cdc"):
--   cdc_config         retention interval for queue entries
--   cdc_tables         registry of source tables and the change view built for each
--   cdc_queue          one row per (source table, run) holding max(xmin) seen at that run
--   run_cdc()          prunes the queue and records a new xmin snapshot per registered table
--   cdc_upperbound()   newest recorded snapshot xid for a table
--   cdc_lowerbound()   oldest retained snapshot xid for a table
--   cdc_manageviews()  (re)creates one change view per registered table
--
-- NOTE: WITH (OIDS=TRUE) is kept exactly as in the original script. PostgreSQL 12
-- removed support for it, so the clause must be dropped before running on 12+.
-- NOTE(review): xmin is compared as a plain bigint, so transaction-ID wraparound
-- is not accounted for anywhere in this script -- confirm that is acceptable.

CREATE SCHEMA cdc AUTHORIZATION postgres;
GRANT ALL ON SCHEMA cdc TO postgres;
GRANT USAGE ON SCHEMA cdc TO public;

-- Everything below is created unqualified and therefore lands in schema cdc.
SET search_path TO cdc;

-- Retention setting read by run_cdc(); seeded with a single row below.
CREATE TABLE cdc_config
(
    min_retain_interval interval hour to minute NOT NULL
)
WITH (
    OIDS=TRUE
);
ALTER TABLE cdc_config OWNER TO postgres;
GRANT ALL ON TABLE cdc_config TO postgres;
GRANT SELECT ON TABLE cdc_config TO public;

INSERT INTO cdc_config (min_retain_interval) VALUES ('24:00');

-- Registry of tracked tables. Names must be stored exactly as they appear in the
-- system catalog (case-sensitive, no surrounding quotes): the functions below
-- pass them through quote_ident() when building dynamic SQL.
CREATE TABLE cdc_tables
(
    cdc_source_schema text NOT NULL,
    cdc_source_table  text NOT NULL,
    cdc_change_schema text NOT NULL,
    cdc_change_view   text NOT NULL,
    CONSTRAINT cdc_tables_pk PRIMARY KEY (cdc_source_schema, cdc_source_table)
)
WITH (
    OIDS=TRUE
);
ALTER TABLE cdc_tables OWNER TO postgres;
GRANT ALL ON TABLE cdc_tables TO postgres;
GRANT SELECT ON TABLE cdc_tables TO public;

-- Snapshot history: cdc_xid is max(xmin) of the source table at cdc_timestamp.
CREATE TABLE cdc_queue
(
    cdc_source_schema text NOT NULL,
    cdc_source_table  text NOT NULL,
    cdc_timestamp     timestamp with time zone NOT NULL,
    cdc_xid           bigint NOT NULL,
    CONSTRAINT cdc_queue_pk PRIMARY KEY (cdc_source_schema, cdc_source_table, cdc_timestamp)
)
WITH (
    OIDS=TRUE
);
ALTER TABLE cdc_queue OWNER TO postgres;
GRANT ALL ON TABLE cdc_queue TO postgres;
GRANT SELECT ON TABLE cdc_queue TO public;

-- run_cdc(): take one CDC snapshot.
--
-- Steps, all under an EXCLUSIVE lock on cdc_queue:
--   1. drop queue rows whose table is no longer registered in cdc_tables;
--   2. drop queue rows older than cdc_config.min_retain_interval;
--   3. for every registered table, record max(xmin) with the current
--      transaction timestamp.
-- Returns silently without doing anything if the lock cannot be obtained
-- immediately (another run is in progress).
--
-- Table references are schema-qualified so the function does not depend on the
-- caller's search_path.
CREATE OR REPLACE FUNCTION run_cdc()
RETURNS void AS
$$
DECLARE
    v_schema TEXT;
    v_table  TEXT;
    v_xid    BIGINT;
    v_retain INTERVAL;
BEGIN
    LOCK TABLE cdc.cdc_queue IN EXCLUSIVE MODE NOWAIT;

    -- If cdc_config holds no row, v_retain stays NULL and the retention
    -- DELETE below matches nothing.
    SELECT min_retain_interval INTO v_retain FROM cdc.cdc_config;

    -- Orphans: queue entries for tables that were unregistered.
    DELETE FROM cdc.cdc_queue
    WHERE NOT EXISTS (
        SELECT 1
        FROM cdc.cdc_tables t
        WHERE t.cdc_source_schema = cdc_queue.cdc_source_schema
          AND t.cdc_source_table  = cdc_queue.cdc_source_table
    );

    -- Retention: after the orphan purge every remaining row belongs to a
    -- registered table, so one set-based DELETE replaces the per-table one.
    DELETE FROM cdc.cdc_queue
    WHERE cdc_timestamp < current_timestamp - v_retain;

    FOR v_schema, v_table IN
        SELECT cdc_source_schema, cdc_source_table
        FROM cdc.cdc_tables
        ORDER BY cdc_source_schema, cdc_source_table
    LOOP
        -- quote_ident() keeps mixed-case / special-character names valid and
        -- prevents the registry contents from being interpreted as SQL.
        EXECUTE 'SELECT max(xmin::text::bigint) FROM '
                || quote_ident(v_schema) || '.' || quote_ident(v_table)
        INTO v_xid;

        -- An empty source table yields NULL; cdc_xid is NOT NULL, so inserting
        -- it would abort the whole run. There is nothing to capture, so skip.
        IF v_xid IS NOT NULL THEN
            INSERT INTO cdc.cdc_queue
                (cdc_source_schema, cdc_source_table, cdc_timestamp, cdc_xid)
            VALUES
                (v_schema, v_table, current_timestamp, v_xid);
        END IF;
    END LOOP;
EXCEPTION
    WHEN lock_not_available THEN
        -- Deliberate best effort: a concurrent run holds the lock.
        RETURN;
END;
$$ LANGUAGE plpgsql;
ALTER FUNCTION run_cdc() OWNER TO postgres;
GRANT EXECUTE ON FUNCTION run_cdc() TO public;

-- cdc_upperbound(v_schema, v_table): cdc_xid of the most recent queue entry for
-- the given source table, or NULL when the table has no queue entries.
--
-- Returns bigint to match cdc_queue.cdc_xid (transaction IDs above 2^31-1 do
-- not fit in int). Declared STABLE, not IMMUTABLE, because the result depends
-- on table contents.
CREATE OR REPLACE FUNCTION cdc_upperbound(v_schema TEXT, v_table TEXT)
RETURNS bigint AS
$$
DECLARE
    v_upperbound BIGINT;
BEGIN
    SELECT cdc_xid INTO v_upperbound
    FROM cdc.cdc_queue
    WHERE cdc_source_schema = v_schema
      AND cdc_source_table  = v_table
    ORDER BY cdc_timestamp DESC
    LIMIT 1;

    RETURN v_upperbound;
END;
$$ LANGUAGE plpgsql STABLE;
ALTER FUNCTION cdc_upperbound(text, text) OWNER TO postgres;
GRANT EXECUTE ON FUNCTION cdc_upperbound(text, text) TO public;

-- cdc_lowerbound(v_schema, v_table): cdc_xid of the oldest retained queue entry
-- for the given source table, or NULL when the table has no queue entries.
--
-- Same typing and volatility rationale as cdc_upperbound().
CREATE OR REPLACE FUNCTION cdc_lowerbound(v_schema TEXT, v_table TEXT)
RETURNS bigint AS
$$
DECLARE
    v_lowerbound BIGINT;
BEGIN
    SELECT cdc_xid INTO v_lowerbound
    FROM cdc.cdc_queue
    WHERE cdc_source_schema = v_schema
      AND cdc_source_table  = v_table
    ORDER BY cdc_timestamp ASC
    LIMIT 1;

    RETURN v_lowerbound;
END;
$$ LANGUAGE plpgsql STABLE;
ALTER FUNCTION cdc_lowerbound(text, text) OWNER TO postgres;
GRANT EXECUTE ON FUNCTION cdc_lowerbound(text, text) TO public;

-- cdc_manageviews(): (re)create the change view for every row in cdc_tables.
--
-- Each view selects all columns of the source table, restricted to rows whose
-- xmin lies between cdc_lowerbound() and cdc_upperbound() (inclusive), and
-- returns nothing while the two bounds are equal. The view is then assigned to
-- postgres and made readable by public.
CREATE OR REPLACE FUNCTION cdc_manageviews()
RETURNS void AS
$$
DECLARE
    v_source_schema TEXT;
    v_source_table  TEXT;
    v_change_schema TEXT;
    v_change_view   TEXT;
    v_source        TEXT;  -- quoted, schema-qualified source table
    v_view          TEXT;  -- quoted, schema-qualified change view
    v_args          TEXT;  -- literal argument list for the bound functions
    v_sql           TEXT;
BEGIN
    FOR v_source_schema, v_source_table, v_change_schema, v_change_view IN
        SELECT cdc_source_schema, cdc_source_table, cdc_change_schema, cdc_change_view
        FROM cdc.cdc_tables
    LOOP
        v_source := quote_ident(v_source_schema) || '.' || quote_ident(v_source_table);
        v_view   := quote_ident(v_change_schema) || '.' || quote_ident(v_change_view);
        v_args   := '(' || quote_literal(v_source_schema) || ','
                        || quote_literal(v_source_table) || ')';

        -- The bound functions are STABLE; wrapping each call in an uncorrelated
        -- scalar sub-select lets the planner evaluate it once per query rather
        -- than once per source row. The cdc. qualification makes view creation
        -- independent of the caller's search_path.
        v_sql := 'CREATE OR REPLACE VIEW ' || v_view
              || ' AS SELECT * FROM ' || v_source
              || ' WHERE xmin::text::bigint <= (SELECT cdc.cdc_upperbound' || v_args || ')'
              || ' AND xmin::text::bigint >= (SELECT cdc.cdc_lowerbound' || v_args || ')'
              || ' AND (SELECT cdc.cdc_lowerbound' || v_args || ')'
              || ' <> (SELECT cdc.cdc_upperbound' || v_args || ')';
        EXECUTE v_sql;

        EXECUTE 'ALTER TABLE ' || v_view || ' OWNER TO postgres';
        EXECUTE 'GRANT SELECT ON TABLE ' || v_view || ' TO public';
    END LOOP;
END;
$$ LANGUAGE plpgsql;
ALTER FUNCTION cdc_manageviews() OWNER TO postgres;
GRANT EXECUTE ON FUNCTION cdc_manageviews() TO public;