------create dev schema and api user----------------------------------------------------------------------------------------------------------------- DROP SCHEMA IF EXISTS tps CASCADE; DROP SCHEMA IF EXISTS tpsv CASCADE; CREATE SCHEMA tps; COMMENT ON SCHEMA tps IS 'third party source data'; CREATE SCHEMA tpsv; COMMENT ON SCHEMA tps IS 'third party source views'; DROP USER IF EXISTS api; CREATE ROLE api WITH LOGIN NOSUPERUSER NOCREATEDB NOCREATEROLE INHERIT NOREPLICATION CONNECTION LIMIT -1 ENCRYPTED PASSWORD 'md56da13b696f737097e0146e47cc0d0985'; -----need to setup all database objects and then grant priveledges to api---------------------------------------------------------------------------- --grant schema USAGE GRANT USAGE ON SCHEMA tps TO api; GRANT USAGE ON SCHEMA tpsv TO api; --grant current table privledges GRANT SELECT, UPDATE, INSERT, DELETE ON ALL TABLES IN SCHEMA tps TO api; GRANT SELECT, UPDATE, INSERT, DELETE ON ALL TABLES IN SCHEMA tpsv TO api; --grant current sequence privledges GRANT USAGE ON ALL SEQUENCES IN SCHEMA tps TO api; GRANT USAGE ON ALL SEQUENCES IN SCHEMA tpsv TO api; --grant future table privledges ALTER DEFAULT PRIVILEGES IN SCHEMA tps GRANT SELECT, UPDATE, INSERT, DELETE ON TABLES TO api; ALTER DEFAULT PRIVILEGES IN SCHEMA tpsv GRANT SELECT, UPDATE, INSERT, DELETE ON TABLES TO api; --grant future sequence privledges ALTER DEFAULT PRIVILEGES IN SCHEMA tps GRANT USAGE ON SEQUENCES TO api; ALTER DEFAULT PRIVILEGES IN SCHEMA tpsv GRANT USAGE ON SEQUENCES TO api; -----create tables----------------------------------------------------------------------------------------------------------------------------------- -----regex map instructions table CREATE TABLE tps.map_rm ( srce text NOT NULL, target text NOT NULL, regex jsonb, seq integer NOT NULL, hist jsonb ); COMMENT ON TABLE tps.map_rm IS 'regex map instructions'; -----return value table CREATE TABLE tps.map_rv ( srce text NOT NULL, target text NOT NULL, retval jsonb NOT NULL, map jsonb NOT NULL, hist jsonb NOT NULL ); COMMENT ON TABLE tps.map_rv IS 'return value lookup table'; -----source definition table CREATE TABLE tps.srce ( srce text NOT NULL, defn jsonb, hist jsonb ); COMMENT ON TABLE tps.srce IS 'source master listing and definition'; -----source data table CREATE TABLE tps.trans ( id integer NOT NULL, srce text, rec jsonb, parse jsonb, map jsonb, allj jsonb, ic jsonb, logid INTEGER ); COMMENT ON TABLE tps.trans IS 'source records'; COMMENT ON COLUMN tps.trans.ic IS 'input constraint value'; ALTER TABLE tps.trans ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY ( SEQUENCE NAME tps.trans_id_seq START WITH 1 INCREMENT BY 1 NO MINVALUE NO MAXVALUE CACHE 1 ); -----import log table CREATE TABLE tps.trans_log ( id integer NOT NULL, info jsonb ); COMMENT ON TABLE tps.trans_log IS 'import event information'; ALTER TABLE tps.trans_log ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY ( SEQUENCE NAME tps.trans_log_id_seq START WITH 1 INCREMENT BY 1 NO MINVALUE NO MAXVALUE CACHE 1 ); -------------primary keys---------------------------------------------------------------------------------------------------------------------------- ALTER TABLE ONLY tps.map_rm ADD CONSTRAINT map_rm_pk PRIMARY KEY (srce, target); ALTER TABLE ONLY tps.map_rv ADD CONSTRAINT map_rv_pk PRIMARY KEY (srce, target, retval); ALTER TABLE ONLY tps.srce ADD CONSTRAINT srce_pkey PRIMARY KEY (srce); ALTER TABLE ONLY tps.trans_log ADD CONSTRAINT trans_log_pkey PRIMARY KEY (id); ALTER TABLE ONLY tps.trans ADD CONSTRAINT trans_pkey PRIMARY KEY (id); -------------indexes--------------------------------------------------------------------------------------------------------------------------------- CREATE INDEX trans_allj ON tps.trans USING gin (allj); CREATE INDEX trans_rec ON tps.trans USING gin (rec); CREATE INDEX trans_srce ON tps.trans USING btree (srce); -------------foreign keys---------------------------------------------------------------------------------------------------------------------------- ALTER TABLE ONLY tps.map_rm ADD CONSTRAINT map_rm_fk_srce FOREIGN KEY (srce) REFERENCES tps.srce(srce); ALTER TABLE ONLY tps.map_rv ADD CONSTRAINT map_rv_fk_rm FOREIGN KEY (srce, target) REFERENCES tps.map_rm(srce, target); ALTER TABLE ONLY tps.trans ADD CONSTRAINT trans_srce_fkey FOREIGN KEY (srce) REFERENCES tps.srce(srce); ALTER TABLE ONLY tps.trans ADD CONSTRAINT trans_logid_fkey FOREIGN KEY (logid) REFERENCES tps.trans_log(id); -------------create functions------------------------------------------------------------------------------------------------------------------------ -----set source DROP FUNCTION IF EXISTS tps.srce_set(jsonb); CREATE FUNCTION tps.srce_set(_defn jsonb) RETURNS jsonb AS $f$ DECLARE _message jsonb; _MESSAGE_TEXT text; _PG_EXCEPTION_DETAIL text; _PG_EXCEPTION_HINT text; _rebuild BOOLEAN; BEGIN ---------test if anythign is changing-------------------------------------------------------------------------------------------- IF _defn = (SELECT defn FROM tps.srce WHERE srce = _defn->>'name') THEN _message:= ( $$ { "status":"complete", "message":"source was not different no action taken" } $$::jsonb ); RETURN _message; END IF; ---------if the constraint definition is changing, rebuild for existing records--------------------------------------------------- SELECT NOT (_defn->'constraint' = (SELECT defn->'constraint' FROM tps.srce WHERE srce = _defn->>'name')) INTO _rebuild; RAISE NOTICE '%',_rebuild::text; ---------do merge----------------------------------------------------------------------------------------------------------------- INSERT INTO tps.srce (srce, defn, hist) SELECT --extract name from defintion _defn->>'name' --add current timestamp to defintions ,_defn --add definition ,jsonb_build_object( 'hist_defn',_defn ,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz) ) || '[]'::jsonb ON CONFLICT ON CONSTRAINT srce_pkey DO UPDATE SET defn = _defn ,hist = --the new definition going to position -0- jsonb_build_object( 'hist_defn',_defn ,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz) ) --the previous definition, set upper bound of effective range which was previously null || jsonb_set( srce.hist ,'{0,effective,1}'::text[] ,to_jsonb(CURRENT_TIMESTAMP) ); --rebuild constraint key if necessary--------------------------------------------------------------------------------------- IF _rebuild THEN WITH rebuild AS ( SELECT j.srce ,j.rec ,j.id --aggregate back to the record since multiple paths may be listed in the constraint ,tps.jsonb_concat_obj( jsonb_build_object( --the new json key is the path itself cons.path->>0 ,j.rec#>((cons.path->>0)::text[]) ) ) json_key FROM tps.trans j INNER JOIN tps.srce s ON s.srce = j.srce JOIN LATERAL jsonb_array_elements(s.defn->'constraint') WITH ORDINALITY cons(path, seq) ON TRUE WHERE s.srce = _defn->>'name' GROUP BY j.rec ,j.id ) UPDATE tps.trans t SET ic = r.json_key FROM rebuild r WHERE t.id = r.id; _message:= ( $$ { "status":"complete", "message":"source set and constraint rebuilt on existing records" } $$::jsonb ); ELSE _message:= ( $$ { "status":"complete", "message":"source set" } $$::jsonb ); END IF; RETURN _message; EXCEPTION WHEN OTHERS THEN GET STACKED DIAGNOSTICS _MESSAGE_TEXT = MESSAGE_TEXT, _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL, _PG_EXCEPTION_HINT = PG_EXCEPTION_HINT; _message:= ($$ { "status":"fail", "message":"error importing data" } $$::jsonb) ||jsonb_build_object('message_text',_MESSAGE_TEXT) ||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL); RETURN _message; END; $f$ LANGUAGE plpgsql; -----generate sql to create select based on schema DROP FUNCTION IF EXISTS tps.build_srce_view_sql(text, text); CREATE OR REPLACE FUNCTION tps.build_srce_view_sql(_srce text, _schema text) RETURNS TEXT AS $f$ DECLARE --_schema text; --_srce text; _sql text; BEGIN --_schema:= 'default'; --_srce:= 'dcard'; SELECT 'DROP VIEW IF EXISTS tpsv.'||s.srce||'_'||(list.e->>'name')||'; CREATE VIEW tpsv.'||s.srce||'_'||(list.e->>'name')||' AS SELECT id, logid, allj, '||string_agg('(allj#>>'''||rec.PATH::text||''')::'||rec.type||' AS "'||rec.column_name||'"',', ')||' FROM tps.trans WHERE srce = '''||s.srce||''';' INTO _sql FROM tps.srce s JOIN LATERAL jsonb_array_elements(s.defn->'schemas') list (e) ON TRUE JOIN LATERAL jsonb_array_elements(list.e->'columns') as cols(e) ON TRUE JOIN LATERAL jsonb_to_record (cols.e) AS rec( PATH text[], "type" text, column_name text) ON TRUE WHERE srce = _srce AND list.e->>'name' = _schema GROUP BY s.srce ,list.e; RETURN _sql; RAISE NOTICE '%',_sql; END $f$ LANGUAGE plpgsql; -----set map defintion from json argument CREATE OR REPLACE FUNCTION tps.srce_map_def_set(_defn jsonb) RETURNS jsonb AS $f$ DECLARE _message jsonb; _MESSAGE_TEXT text; _PG_EXCEPTION_DETAIL text; _PG_EXCEPTION_HINT text; BEGIN BEGIN INSERT INTO tps.map_rm (srce, target, regex, seq, hist) SELECT --data source ae.r->>'srce' --map name ,ae.r->>'name' --map definition ,ae.r --map aggregation sequence ,(ae.r->>'sequence')::INTEGER --history definition ,jsonb_build_object( 'hist_defn',ae.r ,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz) ) || '[]'::jsonb FROM jsonb_array_elements(_defn) ae(r) ON CONFLICT ON CONSTRAINT map_rm_pk DO UPDATE SET srce = excluded.srce ,target = excluded.target ,regex = excluded.regex ,seq = excluded.seq ,hist = --the new definition going to position -0- jsonb_build_object( 'hist_defn',excluded.regex ,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz) ) --the previous definition, set upper bound of effective range which was previously null || jsonb_set( map_rm.hist ,'{0,effective,1}'::text[] ,to_jsonb(CURRENT_TIMESTAMP) ); EXCEPTION WHEN OTHERS THEN GET STACKED DIAGNOSTICS _MESSAGE_TEXT = MESSAGE_TEXT, _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL, _PG_EXCEPTION_HINT = PG_EXCEPTION_HINT; _message:= ($$ { "status":"fail", "message":"error setting definition" } $$::jsonb) ||jsonb_build_object('message_text',_MESSAGE_TEXT) ||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL); return _message; END; _message:= jsonb_build_object('status','complete','message','definition has been set'); return _message; END; $f$ language plpgsql; ------------build report for unmapped items--------------------------------------------------------------------------------------------------------------------------------------------- CREATE OR REPLACE FUNCTION tps.jsonb_concat( state jsonb, concat jsonb) RETURNS jsonb AS $BODY$ BEGIN --RAISE notice 'state is %', state; --RAISE notice 'concat is %', concat; RETURN state || concat; END; $BODY$ LANGUAGE plpgsql VOLATILE COST 100; DROP AGGREGATE IF EXISTS tps.jsonb_concat_obj(jsonb); CREATE AGGREGATE tps.jsonb_concat_obj(jsonb) ( SFUNC=tps.jsonb_concat, STYPE=jsonb, INITCOND='{}' ); DROP FUNCTION IF EXISTS tps.report_unmapped; CREATE FUNCTION tps.report_unmapped(_srce text) RETURNS TABLE ( source text, map text, ret_val jsonb, "count" bigint ) LANGUAGE plpgsql AS $f$ BEGIN /* first get distinct target json values then apply regex */ RETURN QUERY WITH --------------------apply regex operations to transactions--------------------------------------------------------------------------------- rx AS ( SELECT t.srce, t.id, t.rec, m.target, m.seq, regex->'regex'->>'function' regex_function, e.v ->> 'field' result_key_name, e.v ->> 'key' target_json_path, e.v ->> 'flag' regex_options_flag, e.v->>'map' map_intention, e.v->>'retain' retain_result, e.v->>'regex' regex_expression, e.rn target_item_number, COALESCE(mt.rn,rp.rn,1) result_number, mt.mt rx_match, rp.rp rx_replace, CASE e.v->>'map' WHEN 'y' THEN e.v->>'field' ELSE null END map_key, CASE e.v->>'map' WHEN 'y' THEN CASE regex->'regex'->>'function' WHEN 'extract' THEN CASE WHEN array_upper(mt.mt,1)=1 THEN to_json(mt.mt[1]) ELSE array_to_json(mt.mt) END::jsonb WHEN 'replace' THEN to_jsonb(rp.rp) ELSE '{}'::jsonb END ELSE NULL END map_val, CASE e.v->>'retain' WHEN 'y' THEN e.v->>'field' ELSE NULL END retain_key, CASE e.v->>'retain' WHEN 'y' THEN CASE regex->'regex'->>'function' WHEN 'extract' THEN CASE WHEN array_upper(mt.mt,1)=1 THEN to_json(trim(mt.mt[1])) ELSE array_to_json(mt.mt) END::jsonb WHEN 'replace' THEN to_jsonb(rtrim(rp.rp)) ELSE '{}'::jsonb END ELSE NULL END retain_val FROM --------------------------start with all regex maps------------------------------------------------------------------------------------ tps.map_rm m --------------------------isolate matching basis to limit map to only look at certain json--------------------------------------------- LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'where') w(v) ON TRUE --------------------------join to main transaction table but only certain key/values are included-------------------------------------- INNER JOIN tps.trans t ON t.srce = m.srce AND t.rec @> w.v --------------------------break out array of regluar expressions in the map------------------------------------------------------------ LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'defn') WITH ORDINALITY e(v, rn) ON true --------------------------each regex references a path to the target value, extract the target from the reference and do regex--------- LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON m.regex->'regex'->>'function' = 'extract' --------------------------same as above but for a replacement type function------------------------------------------------------------ LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON m.regex->'regex'->>'function' = 'replace' WHERE --t.allj IS NULL t.srce = _srce AND e.v @> '{"map":"y"}'::jsonb --rec @> '{"Transaction":"ACH Credits","Transaction":"ACH Debits"}' --rec @> '{"Description":"CHECK 93013270 086129935"}'::jsonb ORDER BY t.id DESC, m.target, e.rn, COALESCE(mt.rn,rp.rn,1) ) --SELECT * FROM rx LIMIT 100 , agg_to_target_items AS ( SELECT srce ,id ,target ,seq ,map_intention ,regex_function ,target_item_number ,result_key_name ,target_json_path ,CASE WHEN map_key IS NULL THEN NULL ELSE jsonb_build_object( map_key, CASE WHEN max(result_number) = 1 THEN jsonb_agg(map_val ORDER BY result_number) -> 0 ELSE jsonb_agg(map_val ORDER BY result_number) END ) END map_val ,CASE WHEN retain_key IS NULL THEN NULL ELSE jsonb_build_object( retain_key, CASE WHEN max(result_number) = 1 THEN jsonb_agg(retain_val ORDER BY result_number) -> 0 ELSE jsonb_agg(retain_val ORDER BY result_number) END ) END retain_val FROM rx GROUP BY srce ,id ,target ,seq ,map_intention ,regex_function ,target_item_number ,result_key_name ,target_json_path ,map_key ,retain_key ) --SELECT * FROM agg_to_target_items LIMIT 100 , agg_to_target AS ( SELECT srce ,id ,target ,seq ,map_intention ,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val ,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val FROM agg_to_target_items GROUP BY srce ,id ,target ,seq ,map_intention ) , agg_to_ret AS ( SELECT srce ,target ,seq ,map_intention ,map_val ,retain_val ,count(*) "count" FROM agg_to_target GROUP BY srce ,target ,seq ,map_intention ,map_val ,retain_val ) , link_map AS ( SELECT a.srce ,a.target ,a.seq ,a.map_intention ,a.map_val ,a."count" ,a.retain_val ,v.map mapped_val FROM agg_to_ret a LEFT OUTER JOIN tps.map_rv v ON v.srce = a.srce AND v.target = a.target AND v.retval = a.map_val ) SELECT l.srce ,l.target ,l.map_val ,l."count" FROM link_map l WHERE l.mapped_val IS NULL ORDER BY l.srce ,l.target ,l."count" desc; END; $f$; -------------------create trigger to map imported items------------------------------------------------------------------------------------------------------ CREATE OR REPLACE FUNCTION tps.trans_insert_map() RETURNS TRIGGER AS $f$ DECLARE _cnt INTEGER; BEGIN IF (TG_OP = 'INSERT') THEN WITH --------------------apply regex operations to transactions----------------------------------------------------------------------------------- rx AS ( SELECT t.srce, t.id, t.rec, m.target, m.seq, regex->'regex'->>'function' regex_function, e.v ->> 'field' result_key_name, e.v ->> 'key' target_json_path, e.v ->> 'flag' regex_options_flag, e.v->>'map' map_intention, e.v->>'retain' retain_result, e.v->>'regex' regex_expression, e.rn target_item_number, COALESCE(mt.rn,rp.rn,1) result_number, mt.mt rx_match, rp.rp rx_replace, CASE e.v->>'map' WHEN 'y' THEN e.v->>'field' ELSE null END map_key, CASE e.v->>'map' WHEN 'y' THEN CASE regex->'regex'->>'function' WHEN 'extract' THEN CASE WHEN array_upper(mt.mt,1)=1 THEN to_json(mt.mt[1]) ELSE array_to_json(mt.mt) END::jsonb WHEN 'replace' THEN to_jsonb(rp.rp) ELSE '{}'::jsonb END ELSE NULL END map_val, CASE e.v->>'retain' WHEN 'y' THEN e.v->>'field' ELSE NULL END retain_key, CASE e.v->>'retain' WHEN 'y' THEN CASE regex->'regex'->>'function' WHEN 'extract' THEN CASE WHEN array_upper(mt.mt,1)=1 THEN to_json(trim(mt.mt[1])) ELSE array_to_json(mt.mt) END::jsonb WHEN 'replace' THEN to_jsonb(rtrim(rp.rp)) ELSE '{}'::jsonb END ELSE NULL END retain_val FROM --------------------------start with all regex maps------------------------------------------------------------------------------------ tps.map_rm m --------------------------isolate matching basis to limit map to only look at certain json--------------------------------------------- LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'where') w(v) ON TRUE --------------------------join to main transaction table but only certain key/values are included-------------------------------------- INNER JOIN new_table t ON t.srce = m.srce AND t.rec @> w.v --------------------------break out array of regluar expressions in the map------------------------------------------------------------ LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'defn') WITH ORDINALITY e(v, rn) ON true --------------------------each regex references a path to the target value, extract the target from the reference and do regex--------- LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON m.regex->'regex'->>'function' = 'extract' --------------------------same as above but for a replacement type function------------------------------------------------------------ LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON m.regex->'regex'->>'function' = 'replace' ORDER BY t.id DESC, m.target, e.rn, COALESCE(mt.rn,rp.rn,1) ) --SELECT count(*) FROM rx LIMIT 100 , agg_to_target_items AS ( SELECT srce ,id ,target ,seq ,map_intention ,regex_function ,target_item_number ,result_key_name ,target_json_path ,CASE WHEN map_key IS NULL THEN NULL ELSE jsonb_build_object( map_key, CASE WHEN max(result_number) = 1 THEN jsonb_agg(map_val ORDER BY result_number) -> 0 ELSE jsonb_agg(map_val ORDER BY result_number) END ) END map_val ,CASE WHEN retain_key IS NULL THEN NULL ELSE jsonb_build_object( retain_key, CASE WHEN max(result_number) = 1 THEN jsonb_agg(retain_val ORDER BY result_number) -> 0 ELSE jsonb_agg(retain_val ORDER BY result_number) END ) END retain_val FROM rx GROUP BY srce ,id ,target ,seq ,map_intention ,regex_function ,target_item_number ,result_key_name ,target_json_path ,map_key ,retain_key ) --SELECT * FROM agg_to_target_items LIMIT 100 , agg_to_target AS ( SELECT srce ,id ,target ,seq ,map_intention ,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val ,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val FROM agg_to_target_items GROUP BY srce ,id ,target ,seq ,map_intention ORDER BY id ) --SELECT * FROM agg_to_target , link_map AS ( SELECT a.srce ,a.id ,a.target ,a.seq ,a.map_intention ,a.map_val ,a.retain_val retain_value ,v.map FROM agg_to_target a LEFT OUTER JOIN tps.map_rv v ON v.srce = a.srce AND v.target = a.target AND v.retval = a.map_val ) --SELECT * FROM link_map , agg_to_id AS ( SELECT srce ,id ,tps.jsonb_concat_obj(COALESCE(retain_value,'{}'::jsonb) ORDER BY seq DESC) retain_val ,tps.jsonb_concat_obj(COALESCE(map,'{}'::jsonb)) map FROM link_map GROUP BY srce ,id ) --SELECT agg_to_id.srce, agg_to_id.id, jsonb_pretty(agg_to_id.retain_val) , jsonb_pretty(agg_to_id.map) FROM agg_to_id ORDER BY id desc LIMIT 100 --create a complete list of all new inserts assuming some do not have maps (left join) ,join_all AS ( SELECT n.srce ,n.id ,n.rec ,a.retain_val parse ,a.map ,n.rec||COALESCE(a.map||a.retain_val,'{}'::jsonb) allj FROM new_table n LEFT OUTER JOIN agg_to_id a ON a.id = n.id ) --update trans with join_all recs UPDATE tps.trans t SET parse = a.parse ,map = a.map ,allj = a.allj FROM join_all a WHERE t.id = a.id; END IF; RETURN NULL; END; $f$ LANGUAGE plpgsql; CREATE TRIGGER trans_insert AFTER INSERT ON tps.trans REFERENCING NEW TABLE AS new_table FOR EACH STATEMENT EXECUTE PROCEDURE tps.trans_insert_map(); ------------------import data------------------------------------------------------------------------------------------------------------------------------------- DROP FUNCTION IF EXISTS tps.srce_import(text, jsonb); CREATE OR REPLACE FUNCTION tps.srce_import(_srce text, _recs jsonb) RETURNS jsonb /*-------------------------------------------------------- 0. test if source exists 1. create pending list 2. get unqiue pending keys 3. see which keys not already in tps.trans 4. insert pending records associated with keys that are not already in trans 5. insert summary to log table */--------------------------------------------------------- --to-do --return infomation to a client via json or composite type AS $f$ DECLARE _t text; _c text; _log_info jsonb; _log_id text; _cnt numeric; _message jsonb; --_recs jsonb; --_srce text; _defn jsonb; _MESSAGE_TEXT text; _PG_EXCEPTION_DETAIL text; _PG_EXCEPTION_HINT text; BEGIN --_path := 'C:\users\fleet\downloads\discover-recentactivity-20171031.csv'; --_srce := 'dcard'; --_recs:= $$[{"Trans. Date":"1/2/2018","Post Date":"1/2/2018","Description":"GOOGLE *YOUTUBE VIDEOS G.CO/HELPPAY#CAP0H07TXV","Amount":4.26,"Category":"Services"},{"Trans. Date":"1/2/2018","Post Date":"1/2/2018","Description":"MICROSOFT *ONEDRIVE 800-642-7676 WA","Amount":4.26,"Category":"Services"},{"Trans. Date":"1/3/2018","Post Date":"1/3/2018","Description":"CLE CLINIC PT PMTS 216-445-6249 OHAK2C57F2F0B3","Amount":200,"Category":"Medical Services"},{"Trans. Date":"1/4/2018","Post Date":"1/4/2018","Description":"AT&T *PAYMENT 800-288-2020 TX","Amount":57.14,"Category":"Services"},{"Trans. Date":"1/4/2018","Post Date":"1/7/2018","Description":"WWW.KOHLS.COM #0873 MIDDLETOWN OH","Amount":-7.9,"Category":"Payments and Credits"},{"Trans. Date":"1/5/2018","Post Date":"1/7/2018","Description":"PIZZA HUT 007946 STOW OH","Amount":9.24,"Category":"Restaurants"},{"Trans. Date":"1/5/2018","Post Date":"1/7/2018","Description":"SUBWAY 00044289255 STOW OH","Amount":10.25,"Category":"Restaurants"},{"Trans. Date":"1/6/2018","Post Date":"1/7/2018","Description":"ACME NO. 17 STOW OH","Amount":103.98,"Category":"Supermarkets"},{"Trans. Date":"1/6/2018","Post Date":"1/7/2018","Description":"DISCOUNT DRUG MART 32 STOW OH","Amount":1.69,"Category":"Merchandise"},{"Trans. Date":"1/6/2018","Post Date":"1/7/2018","Description":"DISCOUNT DRUG MART 32 STOW OH","Amount":2.19,"Category":"Merchandise"},{"Trans. Date":"1/9/2018","Post Date":"1/9/2018","Description":"CIRCLE K 05416 STOW OH00947R","Amount":3.94,"Category":"Gasoline"},{"Trans. Date":"1/9/2018","Post Date":"1/9/2018","Description":"CIRCLE K 05416 STOW OH00915R","Amount":52.99,"Category":"Gasoline"},{"Trans. Date":"1/13/2018","Post Date":"1/13/2018","Description":"AUTOZONE #0722 STOW OH","Amount":85.36,"Category":"Automotive"},{"Trans. Date":"1/13/2018","Post Date":"1/13/2018","Description":"DISCOUNT DRUG MART 32 STOW OH","Amount":26.68,"Category":"Merchandise"},{"Trans. Date":"1/13/2018","Post Date":"1/13/2018","Description":"EL CAMPESINO STOW OH","Amount":6.5,"Category":"Restaurants"},{"Trans. Date":"1/13/2018","Post Date":"1/13/2018","Description":"TARGET STOW OH","Amount":197.9,"Category":"Merchandise"},{"Trans. Date":"1/14/2018","Post Date":"1/14/2018","Description":"DISCOUNT DRUG MART 32 STOW OH","Amount":13.48,"Category":"Merchandise"},{"Trans. Date":"1/15/2018","Post Date":"1/15/2018","Description":"TARGET.COM * 800-591-3869 MN","Amount":22.41,"Category":"Merchandise"},{"Trans. Date":"1/16/2018","Post Date":"1/16/2018","Description":"BUFFALO WILD WINGS KENT KENT OH","Amount":63.22,"Category":"Restaurants"},{"Trans. Date":"1/16/2018","Post Date":"1/16/2018","Description":"PARTA - KCG KENT OH","Amount":4,"Category":"Government Services"},{"Trans. Date":"1/16/2018","Post Date":"1/16/2018","Description":"REMEMBERNHU 402-935-7733 IA","Amount":60,"Category":"Services"},{"Trans. Date":"1/16/2018","Post Date":"1/16/2018","Description":"TARGET.COM * 800-591-3869 MN","Amount":44.81,"Category":"Merchandise"},{"Trans. Date":"1/16/2018","Post Date":"1/16/2018","Description":"TREE CITY COFFEE & PASTR KENT OH","Amount":17.75,"Category":"Restaurants"},{"Trans. Date":"1/17/2018","Post Date":"1/17/2018","Description":"BESTBUYCOM805526794885 888-BESTBUY MN","Amount":343.72,"Category":"Merchandise"},{"Trans. Date":"1/19/2018","Post Date":"1/19/2018","Description":"DISCOUNT DRUG MART 32 STOW OH","Amount":5.98,"Category":"Merchandise"},{"Trans. Date":"1/19/2018","Post Date":"1/19/2018","Description":"U-HAUL OF KENT-STOW KENT OH","Amount":15.88,"Category":"Travel/ Entertainment"},{"Trans. Date":"1/19/2018","Post Date":"1/19/2018","Description":"WALMART GROCERY 800-966-6546 AR","Amount":5.99,"Category":"Supermarkets"},{"Trans. Date":"1/19/2018","Post Date":"1/19/2018","Description":"WALMART GROCERY 800-966-6546 AR","Amount":17.16,"Category":"Supermarkets"},{"Trans. Date":"1/19/2018","Post Date":"1/19/2018","Description":"WALMART GROCERY 800-966-6546 AR","Amount":500.97,"Category":"Supermarkets"},{"Trans. Date":"1/20/2018","Post Date":"1/20/2018","Description":"GOOGLE *GOOGLE PLAY G.CO/HELPPAY#CAP0HFFS7W","Amount":2.12,"Category":"Services"},{"Trans. Date":"1/20/2018","Post Date":"1/20/2018","Description":"LOWE'S OF STOW, OH. STOW OH","Amount":256.48,"Category":"Home Improvement"},{"Trans. Date":"1/23/2018","Post Date":"1/23/2018","Description":"CASHBACK BONUS REDEMPTION PYMT/STMT CRDT","Amount":-32.2,"Category":"Awards and Rebate Credits"},{"Trans. Date":"1/23/2018","Post Date":"1/23/2018","Description":"INTERNET PAYMENT - THANK YOU","Amount":-2394.51,"Category":"Payments and Credits"},{"Trans. Date":"1/27/2018","Post Date":"1/27/2018","Description":"GIANT-EAGLE #4096 STOW OH","Amount":67.81,"Category":"Supermarkets"},{"Trans. Date":"1/27/2018","Post Date":"1/27/2018","Description":"OFFICEMAX/OFFICE DEPOT63 STOW OH","Amount":21.06,"Category":"Merchandise"},{"Trans. Date":"1/27/2018","Post Date":"1/27/2018","Description":"TARGET STOW OH","Amount":71,"Category":"Merchandise"},{"Trans. Date":"1/29/2018","Post Date":"1/29/2018","Description":"NETFLIX.COM NETFLIX.COM CA19899514437","Amount":14.93,"Category":"Services"},{"Trans. Date":"1/30/2018","Post Date":"1/30/2018","Description":"SQ *TWISTED MELTZ KENT OH0002305843011416898511","Amount":16.87,"Category":"Restaurants"},{"Trans. Date":"1/30/2018","Post Date":"1/30/2018","Description":"TARGET STOW OH","Amount":49.37,"Category":"Merchandise"}]$$::jsonb; ----------------------------------------------------test if source exists---------------------------------------------------------------------------------- SELECT defn INTO _defn FROM tps.srce WHERE srce = _srce; IF _defn IS NULL THEN _message:= format( $$ { "status":"fail", "message":"source %L does not exists" } $$, _srce )::jsonb; RETURN _message; END IF; -------------unwrap the json record and apply the path(s) of the constraint to build a constraint key per record----------------------------------------------------------------------------------- WITH pending_list AS ( SELECT _srce srce ,j.rec ,j.id --aggregate back to the record since multiple paths may be listed in the constraint --it is unclear why the "->>0" is required to correctly extract the text array from the jsonb ,tps.jsonb_concat_obj( jsonb_build_object( --the new json key is the path itself cons.path->>0 ,j.rec#>((cons.path->>0)::text[]) ) ) json_key FROM jsonb_array_elements(_recs) WITH ORDINALITY j(rec,id) JOIN LATERAL jsonb_array_elements(_defn->'constraint') WITH ORDINALITY cons(path, seq) ON TRUE GROUP BY j.rec ,j.id ) -----------create a unique list of keys from staged rows------------------------------------------------------------------------------------------ , pending_keys AS ( SELECT DISTINCT json_key FROM pending_list ) -----------list of keys already loaded to tps----------------------------------------------------------------------------------------------------- , matched_keys AS ( SELECT DISTINCT k.json_key FROM pending_keys k INNER JOIN tps.trans t ON t.ic = k.json_key ) -----------return unique keys that are not already in tps.trans----------------------------------------------------------------------------------- , unmatched_keys AS ( SELECT json_key FROM pending_keys EXCEPT SELECT json_key FROM matched_keys ) --------build log record-------------------+------------------------------------------------------------------------------------------------ , logged AS ( INSERT INTO tps.trans_log (info) SELECT JSONB_BUILD_OBJECT('time_stamp',CURRENT_TIMESTAMP) ||JSONB_BUILD_OBJECT('srce',_srce) --||JSONB_BUILD_OBJECT('path',_path) ||JSONB_BUILD_OBJECT('not_inserted', ( SELECT jsonb_agg(json_key) FROM matched_keys ) ) ||JSONB_BUILD_OBJECT('inserted', ( SELECT jsonb_agg(json_key) FROM unmatched_keys ) ) RETURNING * ) -----------insert pending rows that have key with no trans match----------------------------------------------------------------------------------- --need to look into mapping the transactions prior to loading , inserted AS ( INSERT INTO tps.trans (srce, rec, ic, logid) SELECT pl.srce ,pl.rec ,pl.json_key ,logged.id FROM pending_list pl INNER JOIN unmatched_keys u ON u.json_key = pl.json_key CROSS JOIN logged ORDER BY pl.id ASC ----this conflict is only if an exact duplicate rec json happens, which will be rejected ----therefore, records may not be inserted due to ay matches with certain json fields, or if the entire json is a duplicate, reason is not specified RETURNING * ) SELECT id ,info INTO _log_id ,_log_info FROM logged; --RAISE NOTICE 'import logged under id# %, info: %', _log_id, _log_info; _message:= ( $$ { "status":"complete" } $$::jsonb )||jsonb_build_object('details',_log_info); RETURN _message; EXCEPTION WHEN OTHERS THEN GET STACKED DIAGNOSTICS _MESSAGE_TEXT = MESSAGE_TEXT, _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL, _PG_EXCEPTION_HINT = PG_EXCEPTION_HINT; _message:= ($$ { "status":"fail", "message":"error importing data" } $$::jsonb) ||jsonb_build_object('message_text',_MESSAGE_TEXT) ||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL); return _message; END; $f$ LANGUAGE plpgsql; ---------------overwrite maps-------------------------------------------------------------------------------------------------------------- CREATE OR REPLACE FUNCTION tps.srce_map_overwrite(_srce text) RETURNS jsonb AS $f$ DECLARE _message jsonb; _MESSAGE_TEXT text; _PG_EXCEPTION_DETAIL text; _PG_EXCEPTION_HINT text; BEGIN WITH --------------------apply regex operations to transactions----------------------------------------------------------------------------------- rx AS ( SELECT t.srce, t.id, t.rec, m.target, m.seq, regex->'regex'->>'function' regex_function, e.v ->> 'field' result_key_name, e.v ->> 'key' target_json_path, e.v ->> 'flag' regex_options_flag, e.v->>'map' map_intention, e.v->>'retain' retain_result, e.v->>'regex' regex_expression, e.rn target_item_number, COALESCE(mt.rn,rp.rn,1) result_number, mt.mt rx_match, rp.rp rx_replace, CASE e.v->>'map' WHEN 'y' THEN e.v->>'field' ELSE null END map_key, CASE e.v->>'map' WHEN 'y' THEN CASE regex->'regex'->>'function' WHEN 'extract' THEN CASE WHEN array_upper(mt.mt,1)=1 THEN to_json(mt.mt[1]) ELSE array_to_json(mt.mt) END::jsonb WHEN 'replace' THEN to_jsonb(rp.rp) ELSE '{}'::jsonb END ELSE NULL END map_val, CASE e.v->>'retain' WHEN 'y' THEN e.v->>'field' ELSE NULL END retain_key, CASE e.v->>'retain' WHEN 'y' THEN CASE regex->'regex'->>'function' WHEN 'extract' THEN CASE WHEN array_upper(mt.mt,1)=1 THEN to_json(trim(mt.mt[1])) ELSE array_to_json(mt.mt) END::jsonb WHEN 'replace' THEN to_jsonb(rtrim(rp.rp)) ELSE '{}'::jsonb END ELSE NULL END retain_val FROM --------------------------start with all regex maps------------------------------------------------------------------------------------ tps.map_rm m --------------------------isolate matching basis to limit map to only look at certain json--------------------------------------------- LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'where') w(v) ON TRUE --------------------------join to main transaction table but only certain key/values are included-------------------------------------- INNER JOIN tps.trans t ON t.srce = m.srce AND t.rec @> w.v --------------------------break out array of regluar expressions in the map------------------------------------------------------------ LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'defn') WITH ORDINALITY e(v, rn) ON true --------------------------each regex references a path to the target value, extract the target from the reference and do regex--------- LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON m.regex->'regex'->>'function' = 'extract' --------------------------same as above but for a replacement type function------------------------------------------------------------ LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON m.regex->'regex'->>'function' = 'replace' WHERE --t.allj IS NULL t.srce = _srce --rec @> '{"Transaction":"ACH Credits","Transaction":"ACH Debits"}' --rec @> '{"Description":"CHECK 93013270 086129935"}'::jsonb ORDER BY t.id DESC, m.target, e.rn, COALESCE(mt.rn,rp.rn,1) ) --SELECT count(*) FROM rx LIMIT 100 , agg_to_target_items AS ( SELECT srce ,id ,target ,seq ,map_intention ,regex_function ,target_item_number ,result_key_name ,target_json_path ,CASE WHEN map_key IS NULL THEN NULL ELSE jsonb_build_object( map_key, CASE WHEN max(result_number) = 1 THEN jsonb_agg(map_val ORDER BY result_number) -> 0 ELSE jsonb_agg(map_val ORDER BY result_number) END ) END map_val ,CASE WHEN retain_key IS NULL THEN NULL ELSE jsonb_build_object( retain_key, CASE WHEN max(result_number) = 1 THEN jsonb_agg(retain_val ORDER BY result_number) -> 0 ELSE jsonb_agg(retain_val ORDER BY result_number) END ) END retain_val FROM rx GROUP BY srce ,id ,target ,seq ,map_intention ,regex_function ,target_item_number ,result_key_name ,target_json_path ,map_key ,retain_key ) --SELECT * FROM agg_to_target_items LIMIT 100 , agg_to_target AS ( SELECT srce ,id ,target ,seq ,map_intention ,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val ,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val FROM agg_to_target_items GROUP BY srce ,id ,target ,seq ,map_intention ORDER BY id ) --SELECT * FROM agg_to_target , link_map AS ( SELECT a.srce ,a.id ,a.target ,a.seq ,a.map_intention ,a.map_val ,a.retain_val retain_value ,v.map FROM agg_to_target a LEFT OUTER JOIN tps.map_rv v ON v.srce = a.srce AND v.target = a.target AND v.retval = a.map_val ) --SELECT * FROM link_map , agg_to_id AS ( SELECT srce ,id ,tps.jsonb_concat_obj(COALESCE(retain_value,'{}'::jsonb) ORDER BY seq DESC) retain_val ,tps.jsonb_concat_obj(COALESCE(map,'{}'::jsonb)) map FROM link_map GROUP BY srce ,id ) --SELECT agg_to_id.srce, agg_to_id.id, jsonb_pretty(agg_to_id.retain_val) , jsonb_pretty(agg_to_id.map) FROM agg_to_id ORDER BY id desc LIMIT 100 UPDATE tps.trans t SET map = o.map, parse = o.retain_val, allj = t.rec||o.map||o.retain_val FROM agg_to_id o WHERE o.id = t.id; _message:= jsonb_build_object('status','complete'); RETURN _message; EXCEPTION WHEN OTHERS THEN GET STACKED DIAGNOSTICS _MESSAGE_TEXT = MESSAGE_TEXT, _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL, _PG_EXCEPTION_HINT = PG_EXCEPTION_HINT; _message:= ($$ { "status":"fail", "message":"error setting map value" } $$::jsonb) ||jsonb_build_object('message_text',_MESSAGE_TEXT) ||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL); RETURN _message; END; $f$ language plpgsql; ---------------------set map values from json array of json objects----------------------------------------------------- DROP FUNCTION IF EXISTS tps.map_rv_set; CREATE OR REPLACE FUNCTION tps.map_rv_set(_defn jsonb) RETURNS jsonb AS $f$ DECLARE _message jsonb; _MESSAGE_TEXT text; _PG_EXCEPTION_DETAIL text; _PG_EXCEPTION_HINT text; BEGIN INSERT INTO tps.map_rv (srce, target, retval, map, hist) SELECT r.source ,r.map ,r.ret_val ,r.mapped ,jsonb_build_object( 'hist_defn',mapped ,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz) ) || '[]'::jsonb FROM JSONB_ARRAY_ELEMENTS(_defn) WITH ORDINALITY ae(r,s) JOIN LATERAL jsonb_to_record(ae.r) r(source TEXT,map TEXT, ret_val jsonb, mapped jsonb) ON TRUE ON CONFLICT ON CONSTRAINT map_rv_pk DO UPDATE SET map = excluded.map ,hist = --the new definition going to position -0- jsonb_build_object( 'hist_defn',excluded.map ,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz) ) --the previous definition, set upper bound of effective range which was previously null || jsonb_set( map_rv.hist ,'{0,effective,1}'::text[] ,to_jsonb(CURRENT_TIMESTAMP) ); -------return message-------------------------------------------------------------------------------------------------- _message:= jsonb_build_object('status','complete'); RETURN _message; EXCEPTION WHEN OTHERS THEN GET STACKED DIAGNOSTICS _MESSAGE_TEXT = MESSAGE_TEXT, _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL, _PG_EXCEPTION_HINT = PG_EXCEPTION_HINT; _message:= ($$ { "status":"fail", "message":"error setting map value" } $$::jsonb) ||jsonb_build_object('message_text',_MESSAGE_TEXT) ||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL); RETURN _message; END; $f$ LANGUAGE plpgsql; ------------------------------test regex with only unique results-------------------------------------------------------------- DROP FUNCTION IF EXISTS tps.test_regex(jsonb); CREATE FUNCTION tps.test_regex(_defn jsonb) RETURNS jsonb LANGUAGE plpgsql AS $f$ DECLARE _rslt jsonb; BEGIN WITH --------------------apply regex operations to transactions--------------------------------------------------------------------------------- rx AS ( SELECT t.srce, t.id, t.rec, m.target, m.seq, regex->'regex'->>'function' regex_function, e.v ->> 'field' result_key_name, e.v ->> 'key' target_json_path, e.v ->> 'flag' regex_options_flag, e.v->>'map' map_intention, e.v->>'retain' retain_result, e.v->>'regex' regex_expression, e.rn target_item_number, COALESCE(mt.rn,rp.rn,1) result_number, mt.mt rx_match, rp.rp rx_replace, CASE e.v->>'map' WHEN 'y' THEN e.v->>'field' ELSE null END map_key, CASE e.v->>'map' WHEN 'y' THEN CASE regex->'regex'->>'function' WHEN 'extract' THEN CASE WHEN array_upper(mt.mt,1)=1 THEN to_json(mt.mt[1]) ELSE array_to_json(mt.mt) END::jsonb WHEN 'replace' THEN to_jsonb(rp.rp) ELSE '{}'::jsonb END ELSE NULL END map_val, CASE e.v->>'retain' WHEN 'y' THEN e.v->>'field' ELSE NULL END retain_key, CASE e.v->>'retain' WHEN 'y' THEN CASE regex->'regex'->>'function' WHEN 'extract' THEN CASE WHEN array_upper(mt.mt,1)=1 THEN to_json(trim(mt.mt[1])) ELSE array_to_json(mt.mt) END::jsonb WHEN 'replace' THEN to_jsonb(rtrim(rp.rp)) ELSE '{}'::jsonb END ELSE NULL END retain_val FROM --------------------------start with all regex maps------------------------------------------------------------------------------------ (SELECT _defn->>'srce' srce, _defn->>'name' target, _defn->'regex' regex, (_defn->>'sequence')::numeric seq) m --------------------------isolate matching basis to limit map to only look at certain json--------------------------------------------- LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'where') w(v) ON TRUE --------------------------break out array of regluar expressions in the map------------------------------------------------------------ LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'defn') WITH ORDINALITY e(v, rn) ON true --------------------------join to main transaction table but only certain key/values are included-------------------------------------- INNER JOIN tps.trans t ON t.srce = m.srce AND t.rec @> w.v --------------------------each regex references a path to the target value, extract the target from the reference and do regex--------- LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON m.regex->'regex'->>'function' = 'extract' --------------------------same as above but for a replacement type function------------------------------------------------------------ LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON m.regex->'regex'->>'function' = 'replace' ORDER BY t.id DESC, m.target, e.rn, COALESCE(mt.rn,rp.rn,1) ) --SELECT * FROM rx LIMIT 100 , agg_to_target_items AS ( SELECT srce ,id ,target ,seq ,map_intention ,regex_function ,target_item_number ,result_key_name ,target_json_path ,CASE WHEN map_key IS NULL THEN NULL ELSE jsonb_build_object( map_key, CASE WHEN max(result_number) = 1 THEN jsonb_agg(map_val ORDER BY result_number) -> 0 ELSE jsonb_agg(map_val ORDER BY result_number) END ) END map_val ,CASE WHEN retain_key IS NULL THEN NULL ELSE jsonb_build_object( retain_key, CASE WHEN max(result_number) = 1 THEN jsonb_agg(retain_val ORDER BY result_number) -> 0 ELSE jsonb_agg(retain_val ORDER BY result_number) END ) END retain_val FROM rx GROUP BY srce ,id ,target ,seq ,map_intention ,regex_function ,target_item_number ,result_key_name ,target_json_path ,map_key ,retain_key ) --SELECT * FROM agg_to_target_items LIMIT 100 , agg_to_target AS ( SELECT srce ,id ,target ,seq ,map_intention ,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val ,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val FROM agg_to_target_items GROUP BY srce ,id ,target ,seq ,map_intention ) , agg_to_ret AS ( SELECT srce ,target ,seq ,map_intention ,map_val ,retain_val ,count(*) "count" FROM agg_to_target GROUP BY srce ,target ,seq ,map_intention ,map_val ,retain_val ) ,agg_to_id AS ( SELECT l.srce ,l.target ,l.map_val ,l."count" FROM agg_to_ret l ORDER BY l.srce ,l.target ,l."count" desc ) SELECT jsonb_agg(row_to_json(agg_to_id)::jsonb) INTO _rslt FROM agg_to_id; RETURN _rslt; END; $f$; ------------------------------test regex with all original records-------------------------------------------------------------- DROP FUNCTION IF EXISTS tps.report_unmapped_recs; CREATE FUNCTION tps.report_unmapped_recs(_srce text) RETURNS TABLE ( source text, map text, ret_val jsonb, "count" bigint, recs jsonb ) LANGUAGE plpgsql AS $f$ BEGIN /* first get distinct target json values then apply regex */ RETURN QUERY WITH --------------------apply regex operations to transactions--------------------------------------------------------------------------------- rx AS ( SELECT t.srce, t.id, t.rec, m.target, m.seq, regex->'regex'->>'function' regex_function, e.v ->> 'field' result_key_name, e.v ->> 'key' target_json_path, e.v ->> 'flag' regex_options_flag, e.v->>'map' map_intention, e.v->>'retain' retain_result, e.v->>'regex' regex_expression, e.rn target_item_number, COALESCE(mt.rn,rp.rn,1) result_number, mt.mt rx_match, rp.rp rx_replace, CASE e.v->>'map' WHEN 'y' THEN e.v->>'field' ELSE null END map_key, CASE e.v->>'map' WHEN 'y' THEN CASE regex->'regex'->>'function' WHEN 'extract' THEN CASE WHEN array_upper(mt.mt,1)=1 THEN to_json(mt.mt[1]) ELSE array_to_json(mt.mt) END::jsonb WHEN 'replace' THEN to_jsonb(rp.rp) ELSE '{}'::jsonb END ELSE NULL END map_val, CASE e.v->>'retain' WHEN 'y' THEN e.v->>'field' ELSE NULL END retain_key, CASE e.v->>'retain' WHEN 'y' THEN CASE regex->'regex'->>'function' WHEN 'extract' THEN CASE WHEN array_upper(mt.mt,1)=1 THEN to_json(trim(mt.mt[1])) ELSE array_to_json(mt.mt) END::jsonb WHEN 'replace' THEN to_jsonb(rtrim(rp.rp)) ELSE '{}'::jsonb END ELSE NULL END retain_val FROM --------------------------start with all regex maps------------------------------------------------------------------------------------ tps.map_rm m --------------------------isolate matching basis to limit map to only look at certain json--------------------------------------------- LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'where') w(v) ON TRUE --------------------------join to main transaction table but only certain key/values are included-------------------------------------- INNER JOIN tps.trans t ON t.srce = m.srce AND t.rec @> w.v --------------------------break out array of regluar expressions in the map------------------------------------------------------------ LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'defn') WITH ORDINALITY e(v, rn) ON true --------------------------each regex references a path to the target value, extract the target from the reference and do regex--------- LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON m.regex->'regex'->>'function' = 'extract' --------------------------same as above but for a replacement type function------------------------------------------------------------ LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON m.regex->'regex'->>'function' = 'replace' WHERE --t.allj IS NULL t.srce = _srce AND e.v @> '{"map":"y"}'::jsonb --rec @> '{"Transaction":"ACH Credits","Transaction":"ACH Debits"}' --rec @> '{"Description":"CHECK 93013270 086129935"}'::jsonb ORDER BY t.id DESC, m.target, e.rn, COALESCE(mt.rn,rp.rn,1) ) --SELECT * FROM rx LIMIT 100 , agg_to_target_items AS ( SELECT srce ,id ,rec ,target ,seq ,map_intention ,regex_function ,target_item_number ,result_key_name ,target_json_path ,CASE WHEN map_key IS NULL THEN NULL ELSE jsonb_build_object( map_key, CASE WHEN max(result_number) = 1 THEN jsonb_agg(map_val ORDER BY result_number) -> 0 ELSE jsonb_agg(map_val ORDER BY result_number) END ) END map_val ,CASE WHEN retain_key IS NULL THEN NULL ELSE jsonb_build_object( retain_key, CASE WHEN max(result_number) = 1 THEN jsonb_agg(retain_val ORDER BY result_number) -> 0 ELSE jsonb_agg(retain_val ORDER BY result_number) END ) END retain_val FROM rx GROUP BY srce ,id ,rec ,target ,seq ,map_intention ,regex_function ,target_item_number ,result_key_name ,target_json_path ,map_key ,retain_key ) --SELECT * FROM agg_to_target_items LIMIT 100 , agg_to_target AS ( SELECT srce ,id ,rec ,target ,seq ,map_intention ,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val ,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val FROM agg_to_target_items GROUP BY srce ,id ,rec ,target ,seq ,map_intention ) , agg_to_ret AS ( SELECT srce ,target ,seq ,map_intention ,map_val ,retain_val ,count(*) "count" ,jsonb_agg(rec) rec FROM agg_to_target GROUP BY srce ,target ,seq ,map_intention ,map_val ,retain_val ) , link_map AS ( SELECT a.srce ,a.target ,a.seq ,a.map_intention ,a.map_val ,a."count" ,a.rec ,a.retain_val ,v.map mapped_val FROM agg_to_ret a LEFT OUTER JOIN tps.map_rv v ON v.srce = a.srce AND v.target = a.target AND v.retval = a.map_val ) SELECT l.srce ,l.target ,l.map_val ,l."count" ,l.rec FROM link_map l WHERE l.mapped_val IS NULL ORDER BY l.srce ,l.target ,l."count" desc; END; $f$; --setup function to delete a single source DROP FUNCTION IF EXISTS tps.srce_delete(jsonb); CREATE FUNCTION tps.srce_delete(_defn jsonb) RETURNS jsonb AS $f$ DECLARE _message jsonb; _MESSAGE_TEXT text; _PG_EXCEPTION_DETAIL text; _PG_EXCEPTION_HINT text; _rebuild BOOLEAN; BEGIN -------------------------------do delete--------------------------------- DELETE FROM tps.srce WHERE srce = _defn->>'name'; --could move this record to a "recycle bin" table for a certain period of time --need to handle cascading record deletes ---------------------------set message----------------------------------- _message:= ( $$ { "status":"complete", "message":"source was permanently deleted" } $$::jsonb ); RETURN _message; EXCEPTION WHEN OTHERS THEN GET STACKED DIAGNOSTICS _MESSAGE_TEXT = MESSAGE_TEXT, _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL, _PG_EXCEPTION_HINT = PG_EXCEPTION_HINT; _message:= ($$ { "status":"fail", "message":"error dropping the source" } $$::jsonb) ||jsonb_build_object('message_text',_MESSAGE_TEXT) ||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL); RETURN _message; END; $f$ LANGUAGE plpgsql; /* This function takes and array of definition object where "name" object is the primary key It will force the entire body of sources to match what is received */ DROP FUNCTION IF EXISTS tps.srce_overwrite_all(jsonb); CREATE FUNCTION tps.srce_overwrite_all(_defn jsonb) RETURNS jsonb AS $f$ DECLARE _message jsonb; _MESSAGE_TEXT text; _PG_EXCEPTION_DETAIL text; _PG_EXCEPTION_HINT text; _rebuild BOOLEAN; BEGIN WITH --retain the results of the update by srce _set AS ( SELECT j.rn rn ,j.e->>'name' srce ,j.e defn FROM jsonb_array_elements(_defn) WITH ORDINALITY j(e, rn) ) --full join ,_full AS ( SELECT COALESCE(_srce.srce,_set.srce) srce ,CASE COALESCE(_set.srce,'DELETE') WHEN 'DELETE' THEN 'DELETE' ELSE 'SET' END actn ,COALESCE(_set.defn,_srce.defn) defn FROM tps.srce _srce FULL OUTER JOIN _set ON _set.srce = _srce.srce ) --call functions from list ,_do AS ( SELECT f.srce ,f.actn ,COALESCE(setd.message, deld.message) message FROM _full f LEFT JOIN LATERAL tps.srce_set(defn) setd(message) ON f.actn = 'SET' LEFT JOIN LATERAL tps.srce_delete(defn) deld(message) ON f.actn = 'DELETE' ) --aggregate all the messages into one message ---- ---- should look at rolling back the whole thing if one of the function returns a fail. stored proc could do this. ---- SELECT jsonb_agg(jsonb_build_object('source',srce,'status',message->>'status','message',message->>'message')) INTO _message FROM _do; RETURN _message; EXCEPTION WHEN OTHERS THEN GET STACKED DIAGNOSTICS _MESSAGE_TEXT = MESSAGE_TEXT, _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL, _PG_EXCEPTION_HINT = PG_EXCEPTION_HINT; _message:= ($$ { "status":"fail", "message":"error updating sources" } $$::jsonb) ||jsonb_build_object('message_text',_MESSAGE_TEXT) ||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL); RETURN _message; END; $f$ LANGUAGE plpgsql;