tps/database/deploy/setup.sql

2150 lines
69 KiB
MySQL
Raw Permalink Normal View History

2018-05-23 00:31:45 -04:00
------create dev schema and api user-----------------------------------------------------------------------------------------------------------------
DROP SCHEMA IF EXISTS tps CASCADE;
DROP SCHEMA IF EXISTS tpsv CASCADE;
CREATE SCHEMA tps;
COMMENT ON SCHEMA tps IS 'third party source data';
CREATE SCHEMA tpsv;
COMMENT ON SCHEMA tps IS 'third party source views';
DROP USER IF EXISTS api;
2018-05-23 17:18:17 -04:00
CREATE ROLE api WITH
2018-05-23 00:31:45 -04:00
LOGIN
NOSUPERUSER
NOCREATEDB
NOCREATEROLE
INHERIT
NOREPLICATION
CONNECTION LIMIT -1
ENCRYPTED PASSWORD 'md56da13b696f737097e0146e47cc0d0985';
-----need to setup all database objects and then grant priveledges to api----------------------------------------------------------------------------
2018-05-23 17:18:17 -04:00
--grant schema USAGE
GRANT USAGE ON SCHEMA tps TO api;
GRANT USAGE ON SCHEMA tpsv TO api;
2018-05-23 17:18:17 -04:00
2018-05-23 00:31:45 -04:00
--grant current table privledges
GRANT SELECT, UPDATE, INSERT, DELETE ON ALL TABLES IN SCHEMA tps TO api;
GRANT SELECT, UPDATE, INSERT, DELETE ON ALL TABLES IN SCHEMA tpsv TO api;
--grant current sequence privledges
GRANT USAGE ON ALL SEQUENCES IN SCHEMA tps TO api;
GRANT USAGE ON ALL SEQUENCES IN SCHEMA tpsv TO api;
--grant future table privledges
ALTER DEFAULT PRIVILEGES IN SCHEMA tps GRANT SELECT, UPDATE, INSERT, DELETE ON TABLES TO api;
ALTER DEFAULT PRIVILEGES IN SCHEMA tpsv GRANT SELECT, UPDATE, INSERT, DELETE ON TABLES TO api;
--grant future sequence privledges
ALTER DEFAULT PRIVILEGES IN SCHEMA tps GRANT USAGE ON SEQUENCES TO api;
ALTER DEFAULT PRIVILEGES IN SCHEMA tpsv GRANT USAGE ON SEQUENCES TO api;
-----create tables-----------------------------------------------------------------------------------------------------------------------------------
-----regex map instructions table
CREATE TABLE tps.map_rm (
srce text NOT NULL,
target text NOT NULL,
regex jsonb,
seq integer NOT NULL,
hist jsonb
2018-05-23 00:31:45 -04:00
);
COMMENT ON TABLE tps.map_rm IS 'regex map instructions';
-----return value table
CREATE TABLE tps.map_rv (
srce text NOT NULL,
target text NOT NULL,
retval jsonb NOT NULL,
2018-05-25 09:54:00 -04:00
map jsonb NOT NULL,
hist jsonb NOT NULL
2018-05-23 00:31:45 -04:00
);
COMMENT ON TABLE tps.map_rv IS 'return value lookup table';
-----source definition table
CREATE TABLE tps.srce (
srce text NOT NULL,
defn jsonb,
hist jsonb
2018-05-23 00:31:45 -04:00
);
COMMENT ON TABLE tps.srce IS 'source master listing and definition';
-----source data table
CREATE TABLE tps.trans (
id integer NOT NULL,
srce text,
rec jsonb,
parse jsonb,
map jsonb,
allj jsonb,
ic jsonb,
logid INTEGER
2018-05-23 00:31:45 -04:00
);
COMMENT ON TABLE tps.trans IS 'source records';
COMMENT ON COLUMN tps.trans.ic IS 'input constraint value';
ALTER TABLE tps.trans ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
SEQUENCE NAME tps.trans_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1
);
-----import log table
CREATE TABLE tps.trans_log (
id integer NOT NULL,
info jsonb
);
COMMENT ON TABLE tps.trans_log IS 'import event information';
ALTER TABLE tps.trans_log ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
SEQUENCE NAME tps.trans_log_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1
);
-------------primary keys----------------------------------------------------------------------------------------------------------------------------
ALTER TABLE ONLY tps.map_rm
ADD CONSTRAINT map_rm_pk PRIMARY KEY (srce, target);
ALTER TABLE ONLY tps.map_rv
ADD CONSTRAINT map_rv_pk PRIMARY KEY (srce, target, retval);
ALTER TABLE ONLY tps.srce
ADD CONSTRAINT srce_pkey PRIMARY KEY (srce);
ALTER TABLE ONLY tps.trans_log
ADD CONSTRAINT trans_log_pkey PRIMARY KEY (id);
ALTER TABLE ONLY tps.trans
ADD CONSTRAINT trans_pkey PRIMARY KEY (id);
-------------indexes---------------------------------------------------------------------------------------------------------------------------------
CREATE INDEX trans_allj ON tps.trans USING gin (allj);
CREATE INDEX trans_rec ON tps.trans USING gin (rec);
CREATE INDEX trans_srce ON tps.trans USING btree (srce);
-------------foreign keys----------------------------------------------------------------------------------------------------------------------------
ALTER TABLE ONLY tps.map_rm
ADD CONSTRAINT map_rm_fk_srce FOREIGN KEY (srce) REFERENCES tps.srce(srce);
ALTER TABLE ONLY tps.map_rv
ADD CONSTRAINT map_rv_fk_rm FOREIGN KEY (srce, target) REFERENCES tps.map_rm(srce, target);
ALTER TABLE ONLY tps.trans
ADD CONSTRAINT trans_srce_fkey FOREIGN KEY (srce) REFERENCES tps.srce(srce);
ALTER TABLE ONLY tps.trans
ADD CONSTRAINT trans_logid_fkey FOREIGN KEY (logid) REFERENCES tps.trans_log(id);
-------------create functions------------------------------------------------------------------------------------------------------------------------
-----set source
2018-05-25 10:09:12 -04:00
DROP FUNCTION IF EXISTS tps.srce_set(jsonb);
CREATE FUNCTION tps.srce_set(_defn jsonb) RETURNS jsonb
AS
$f$
DECLARE
_message jsonb;
_MESSAGE_TEXT text;
_PG_EXCEPTION_DETAIL text;
_PG_EXCEPTION_HINT text;
_rebuild BOOLEAN;
BEGIN
---------test if anythign is changing--------------------------------------------------------------------------------------------
IF _defn = (SELECT defn FROM tps.srce WHERE srce = _defn->>'name') THEN
_message:=
(
$$
{
"status":"complete",
"message":"source was not different no action taken"
}
$$::jsonb
);
RETURN _message;
END IF;
---------if the constraint definition is changing, rebuild for existing records---------------------------------------------------
SELECT
NOT (_defn->'constraint' = (SELECT defn->'constraint' FROM tps.srce WHERE srce = _defn->>'name'))
INTO
_rebuild;
RAISE NOTICE '%',_rebuild::text;
---------do merge-----------------------------------------------------------------------------------------------------------------
INSERT INTO
tps.srce (srce, defn, hist)
SELECT
--extract name from defintion
_defn->>'name'
--add current timestamp to defintions
,_defn
--add definition
,jsonb_build_object(
'hist_defn',_defn
,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
) || '[]'::jsonb
ON CONFLICT ON CONSTRAINT srce_pkey DO UPDATE
SET
defn = _defn
,hist =
--the new definition going to position -0-
jsonb_build_object(
'hist_defn',_defn
,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
)
--the previous definition, set upper bound of effective range which was previously null
|| jsonb_set(
srce.hist
,'{0,effective,1}'::text[]
,to_jsonb(CURRENT_TIMESTAMP)
);
--rebuild constraint key if necessary---------------------------------------------------------------------------------------
IF _rebuild THEN
WITH
rebuild AS (
SELECT
j.srce
,j.rec
,j.id
--aggregate back to the record since multiple paths may be listed in the constraint
,tps.jsonb_concat_obj(
jsonb_build_object(
--the new json key is the path itself
cons.path->>0
,j.rec#>((cons.path->>0)::text[])
)
) json_key
FROM
tps.trans j
INNER JOIN tps.srce s ON
s.srce = j.srce
JOIN LATERAL jsonb_array_elements(s.defn->'constraint') WITH ORDINALITY cons(path, seq) ON TRUE
WHERE
s.srce = _defn->>'name'
GROUP BY
j.rec
,j.id
)
UPDATE
tps.trans t
SET
ic = r.json_key
FROM
rebuild r
WHERE
t.id = r.id;
_message:=
(
$$
{
"status":"complete",
"message":"source set and constraint rebuilt on existing records"
}
$$::jsonb
);
ELSE
_message:=
(
$$
{
"status":"complete",
"message":"source set"
}
$$::jsonb
);
END IF;
RETURN _message;
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS
_MESSAGE_TEXT = MESSAGE_TEXT,
_PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
_PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
_message:=
($$
{
"status":"fail",
"message":"error importing data"
}
$$::jsonb)
||jsonb_build_object('message_text',_MESSAGE_TEXT)
||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);
RETURN _message;
END;
$f$
LANGUAGE plpgsql;
-----generate sql to create select based on schema
2018-05-25 10:09:12 -04:00
DROP FUNCTION IF EXISTS tps.build_srce_view_sql(text, text);
2018-05-25 10:25:09 -04:00
CREATE OR REPLACE FUNCTION tps.build_srce_view_sql(_srce text, _schema text) RETURNS TEXT
AS
$f$
DECLARE
--_schema text;
--_srce text;
_sql text;
BEGIN
--_schema:= 'default';
--_srce:= 'dcard';
SELECT
2018-12-02 01:52:34 -05:00
'DROP VIEW IF EXISTS tpsv.'||s.srce||'_'||(list.e->>'name')||'; CREATE VIEW tpsv.'||s.srce||'_'||(list.e->>'name')||' AS SELECT id, logid, allj, '||string_agg('(allj#>>'''||rec.PATH::text||''')::'||rec.type||' AS "'||rec.column_name||'"',', ')||' FROM tps.trans WHERE srce = '''||s.srce||''';'
INTO
_sql
FROM
2018-12-02 01:52:34 -05:00
tps.srce s
JOIN LATERAL jsonb_array_elements(s.defn->'schemas') list (e) ON TRUE
JOIN LATERAL jsonb_array_elements(list.e->'columns') as cols(e) ON TRUE
JOIN LATERAL jsonb_to_record (cols.e) AS rec( PATH text[], "type" text, column_name text) ON TRUE
WHERE
2018-12-02 01:52:34 -05:00
srce = _srce
AND list.e->>'name' = _schema
GROUP BY
2018-12-02 01:52:34 -05:00
s.srce
,list.e;
RETURN _sql;
RAISE NOTICE '%',_sql;
END
$f$
LANGUAGE plpgsql;
-----set map defintion from json argument
CREATE OR REPLACE FUNCTION tps.srce_map_def_set(_defn jsonb) RETURNS jsonb
AS
$f$
DECLARE
_message jsonb;
_MESSAGE_TEXT text;
_PG_EXCEPTION_DETAIL text;
_PG_EXCEPTION_HINT text;
BEGIN
BEGIN
INSERT INTO
tps.map_rm (srce, target, regex, seq, hist)
SELECT
--data source
ae.r->>'srce'
--map name
,ae.r->>'name'
--map definition
,ae.r
--map aggregation sequence
,(ae.r->>'sequence')::INTEGER
--history definition
,jsonb_build_object(
'hist_defn',ae.r
,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
) || '[]'::jsonb
FROM
jsonb_array_elements(_defn) ae(r)
ON CONFLICT ON CONSTRAINT map_rm_pk DO UPDATE SET
srce = excluded.srce
,target = excluded.target
,regex = excluded.regex
,seq = excluded.seq
,hist =
--the new definition going to position -0-
jsonb_build_object(
'hist_defn',excluded.regex
,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
)
--the previous definition, set upper bound of effective range which was previously null
|| jsonb_set(
map_rm.hist
,'{0,effective,1}'::text[]
,to_jsonb(CURRENT_TIMESTAMP)
);
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS
_MESSAGE_TEXT = MESSAGE_TEXT,
_PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
_PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
_message:=
($$
{
"status":"fail",
"message":"error setting definition"
}
$$::jsonb)
||jsonb_build_object('message_text',_MESSAGE_TEXT)
||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);
return _message;
END;
_message:= jsonb_build_object('status','complete','message','definition has been set');
return _message;
END;
$f$
2018-05-25 02:25:36 -04:00
language plpgsql;
------------build report for unmapped items---------------------------------------------------------------------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION tps.jsonb_concat(
state jsonb,
concat jsonb)
RETURNS jsonb AS
$BODY$
BEGIN
--RAISE notice 'state is %', state;
--RAISE notice 'concat is %', concat;
RETURN state || concat;
END;
$BODY$
LANGUAGE plpgsql VOLATILE
COST 100;
DROP AGGREGATE IF EXISTS tps.jsonb_concat_obj(jsonb);
CREATE AGGREGATE tps.jsonb_concat_obj(jsonb) (
SFUNC=tps.jsonb_concat,
STYPE=jsonb,
INITCOND='{}'
);
DROP FUNCTION IF EXISTS tps.report_unmapped;
2018-05-25 02:25:36 -04:00
CREATE FUNCTION tps.report_unmapped(_srce text) RETURNS TABLE
(
source text,
map text,
ret_val jsonb,
"count" bigint
)
LANGUAGE plpgsql
AS
$f$
BEGIN
/*
first get distinct target json values
then apply regex
*/
RETURN QUERY
WITH
--------------------apply regex operations to transactions---------------------------------------------------------------------------------
rx AS (
SELECT
t.srce,
t.id,
t.rec,
m.target,
m.seq,
regex->'regex'->>'function' regex_function,
e.v ->> 'field' result_key_name,
e.v ->> 'key' target_json_path,
e.v ->> 'flag' regex_options_flag,
e.v->>'map' map_intention,
e.v->>'retain' retain_result,
e.v->>'regex' regex_expression,
e.rn target_item_number,
COALESCE(mt.rn,rp.rn,1) result_number,
mt.mt rx_match,
rp.rp rx_replace,
CASE e.v->>'map'
WHEN 'y' THEN
e.v->>'field'
ELSE
null
END map_key,
CASE e.v->>'map'
WHEN 'y' THEN
CASE regex->'regex'->>'function'
WHEN 'extract' THEN
CASE WHEN array_upper(mt.mt,1)=1
THEN to_json(mt.mt[1])
ELSE array_to_json(mt.mt)
END::jsonb
WHEN 'replace' THEN
to_jsonb(rp.rp)
ELSE
'{}'::jsonb
END
ELSE
NULL
END map_val,
CASE e.v->>'retain'
WHEN 'y' THEN
e.v->>'field'
ELSE
NULL
END retain_key,
CASE e.v->>'retain'
WHEN 'y' THEN
CASE regex->'regex'->>'function'
WHEN 'extract' THEN
CASE WHEN array_upper(mt.mt,1)=1
THEN to_json(trim(mt.mt[1]))
ELSE array_to_json(mt.mt)
END::jsonb
WHEN 'replace' THEN
to_jsonb(rtrim(rp.rp))
ELSE
'{}'::jsonb
END
ELSE
NULL
END retain_val
FROM
--------------------------start with all regex maps------------------------------------------------------------------------------------
tps.map_rm m
--------------------------isolate matching basis to limit map to only look at certain json---------------------------------------------
LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'where') w(v) ON TRUE
--------------------------join to main transaction table but only certain key/values are included--------------------------------------
INNER JOIN tps.trans t ON
t.srce = m.srce AND
t.rec @> w.v
--------------------------break out array of regluar expressions in the map------------------------------------------------------------
LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'defn') WITH ORDINALITY e(v, rn) ON true
--------------------------each regex references a path to the target value, extract the target from the reference and do regex---------
LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
m.regex->'regex'->>'function' = 'extract'
--------------------------same as above but for a replacement type function------------------------------------------------------------
LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON
m.regex->'regex'->>'function' = 'replace'
WHERE
2018-05-25 02:25:36 -04:00
--t.allj IS NULL
t.srce = _srce AND
e.v @> '{"map":"y"}'::jsonb
--rec @> '{"Transaction":"ACH Credits","Transaction":"ACH Debits"}'
--rec @> '{"Description":"CHECK 93013270 086129935"}'::jsonb
ORDER BY
t.id DESC,
m.target,
e.rn,
COALESCE(mt.rn,rp.rn,1)
2018-05-25 02:25:36 -04:00
)
--SELECT * FROM rx LIMIT 100
, agg_to_target_items AS (
SELECT
srce
,id
,target
,seq
,map_intention
,regex_function
,target_item_number
,result_key_name
,target_json_path
,CASE WHEN map_key IS NULL
THEN
NULL
ELSE
jsonb_build_object(
map_key,
CASE WHEN max(result_number) = 1
THEN
jsonb_agg(map_val ORDER BY result_number) -> 0
ELSE
jsonb_agg(map_val ORDER BY result_number)
END
)
END map_val
,CASE WHEN retain_key IS NULL
THEN
NULL
ELSE
jsonb_build_object(
retain_key,
CASE WHEN max(result_number) = 1
THEN
jsonb_agg(retain_val ORDER BY result_number) -> 0
ELSE
jsonb_agg(retain_val ORDER BY result_number)
END
)
END retain_val
FROM
rx
GROUP BY
srce
,id
,target
,seq
,map_intention
,regex_function
,target_item_number
,result_key_name
,target_json_path
,map_key
,retain_key
)
--SELECT * FROM agg_to_target_items LIMIT 100
, agg_to_target AS (
SELECT
srce
,id
,target
,seq
,map_intention
,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
FROM
agg_to_target_items
GROUP BY
srce
,id
,target
,seq
,map_intention
)
, agg_to_ret AS (
SELECT
srce
,target
,seq
,map_intention
,map_val
,retain_val
,count(*) "count"
FROM
agg_to_target
GROUP BY
srce
,target
,seq
,map_intention
,map_val
,retain_val
)
, link_map AS (
SELECT
a.srce
,a.target
,a.seq
,a.map_intention
,a.map_val
,a."count"
,a.retain_val
,v.map mapped_val
FROM
agg_to_ret a
LEFT OUTER JOIN tps.map_rv v ON
v.srce = a.srce AND
v.target = a.target AND
v.retval = a.map_val
)
SELECT
l.srce
,l.target
,l.map_val
,l."count"
FROM
link_map l
WHERE
l.mapped_val IS NULL
ORDER BY
l.srce
,l.target
,l."count" desc;
END;
$f$;
-------------------create trigger to map imported items------------------------------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION tps.trans_insert_map() RETURNS TRIGGER
AS
$f$
DECLARE
_cnt INTEGER;
2018-05-25 02:25:36 -04:00
BEGIN
IF (TG_OP = 'INSERT') THEN
2018-05-25 02:25:36 -04:00
WITH
--------------------apply regex operations to transactions-----------------------------------------------------------------------------------
rx AS (
SELECT
t.srce,
t.id,
t.rec,
m.target,
m.seq,
regex->'regex'->>'function' regex_function,
e.v ->> 'field' result_key_name,
e.v ->> 'key' target_json_path,
e.v ->> 'flag' regex_options_flag,
e.v->>'map' map_intention,
e.v->>'retain' retain_result,
e.v->>'regex' regex_expression,
e.rn target_item_number,
COALESCE(mt.rn,rp.rn,1) result_number,
mt.mt rx_match,
rp.rp rx_replace,
CASE e.v->>'map'
WHEN 'y' THEN
e.v->>'field'
ELSE
null
END map_key,
CASE e.v->>'map'
WHEN 'y' THEN
CASE regex->'regex'->>'function'
WHEN 'extract' THEN
CASE WHEN array_upper(mt.mt,1)=1
THEN to_json(mt.mt[1])
ELSE array_to_json(mt.mt)
END::jsonb
WHEN 'replace' THEN
to_jsonb(rp.rp)
ELSE
'{}'::jsonb
END
ELSE
NULL
END map_val,
CASE e.v->>'retain'
WHEN 'y' THEN
e.v->>'field'
ELSE
NULL
END retain_key,
CASE e.v->>'retain'
WHEN 'y' THEN
CASE regex->'regex'->>'function'
WHEN 'extract' THEN
CASE WHEN array_upper(mt.mt,1)=1
THEN to_json(trim(mt.mt[1]))
ELSE array_to_json(mt.mt)
END::jsonb
WHEN 'replace' THEN
to_jsonb(rtrim(rp.rp))
ELSE
'{}'::jsonb
END
ELSE
NULL
END retain_val
FROM
--------------------------start with all regex maps------------------------------------------------------------------------------------
tps.map_rm m
--------------------------isolate matching basis to limit map to only look at certain json---------------------------------------------
LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'where') w(v) ON TRUE
--------------------------join to main transaction table but only certain key/values are included--------------------------------------
INNER JOIN new_table t ON
t.srce = m.srce AND
t.rec @> w.v
--------------------------break out array of regluar expressions in the map------------------------------------------------------------
LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'defn') WITH ORDINALITY e(v, rn) ON true
--------------------------each regex references a path to the target value, extract the target from the reference and do regex---------
LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
m.regex->'regex'->>'function' = 'extract'
--------------------------same as above but for a replacement type function------------------------------------------------------------
LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON
m.regex->'regex'->>'function' = 'replace'
ORDER BY
t.id DESC,
m.target,
e.rn,
COALESCE(mt.rn,rp.rn,1)
2018-05-25 02:25:36 -04:00
)
--SELECT count(*) FROM rx LIMIT 100
, agg_to_target_items AS (
SELECT
srce
,id
,target
,seq
,map_intention
,regex_function
,target_item_number
,result_key_name
,target_json_path
,CASE WHEN map_key IS NULL
THEN
NULL
ELSE
jsonb_build_object(
map_key,
CASE WHEN max(result_number) = 1
THEN
jsonb_agg(map_val ORDER BY result_number) -> 0
ELSE
jsonb_agg(map_val ORDER BY result_number)
END
)
END map_val
,CASE WHEN retain_key IS NULL
THEN
NULL
ELSE
jsonb_build_object(
retain_key,
CASE WHEN max(result_number) = 1
THEN
jsonb_agg(retain_val ORDER BY result_number) -> 0
ELSE
jsonb_agg(retain_val ORDER BY result_number)
END
)
END retain_val
FROM
rx
GROUP BY
srce
,id
,target
,seq
,map_intention
,regex_function
,target_item_number
,result_key_name
,target_json_path
,map_key
,retain_key
)
--SELECT * FROM agg_to_target_items LIMIT 100
, agg_to_target AS (
SELECT
srce
,id
,target
,seq
,map_intention
,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
FROM
agg_to_target_items
GROUP BY
srce
,id
,target
,seq
,map_intention
ORDER BY
id
)
--SELECT * FROM agg_to_target
, link_map AS (
SELECT
a.srce
,a.id
,a.target
,a.seq
,a.map_intention
,a.map_val
,a.retain_val retain_value
,v.map
FROM
agg_to_target a
LEFT OUTER JOIN tps.map_rv v ON
v.srce = a.srce AND
v.target = a.target AND
v.retval = a.map_val
)
--SELECT * FROM link_map
, agg_to_id AS (
SELECT
srce
,id
,tps.jsonb_concat_obj(COALESCE(retain_value,'{}'::jsonb) ORDER BY seq DESC) retain_val
,tps.jsonb_concat_obj(COALESCE(map,'{}'::jsonb)) map
FROM
link_map
GROUP BY
srce
,id
)
--SELECT agg_to_id.srce, agg_to_id.id, jsonb_pretty(agg_to_id.retain_val) , jsonb_pretty(agg_to_id.map) FROM agg_to_id ORDER BY id desc LIMIT 100
--create a complete list of all new inserts assuming some do not have maps (left join)
,join_all AS (
SELECT
n.srce
,n.id
,n.rec
,a.retain_val parse
,a.map
,n.rec||COALESCE(a.map||a.retain_val,'{}'::jsonb) allj
FROM
new_table n
LEFT OUTER JOIN agg_to_id a ON
a.id = n.id
)
2018-05-25 02:25:36 -04:00
--update trans with join_all recs
2018-05-25 02:25:36 -04:00
UPDATE
tps.trans t
SET
parse = a.parse
,map = a.map
,allj = a.allj
2018-05-25 02:25:36 -04:00
FROM
join_all a
2018-05-25 02:25:36 -04:00
WHERE
t.id = a.id;
2018-05-25 02:25:36 -04:00
END IF;
RETURN NULL;
END;
$f$ LANGUAGE plpgsql;
CREATE TRIGGER trans_insert
AFTER INSERT ON tps.trans
REFERENCING NEW TABLE AS new_table
FOR EACH STATEMENT EXECUTE PROCEDURE tps.trans_insert_map();
------------------import data-------------------------------------------------------------------------------------------------------------------------------------
DROP FUNCTION IF EXISTS tps.srce_import(text, jsonb);
CREATE OR REPLACE FUNCTION tps.srce_import(_srce text, _recs jsonb) RETURNS jsonb
/*--------------------------------------------------------
0. test if source exists
1. create pending list
2. get unqiue pending keys
3. see which keys not already in tps.trans
4. insert pending records associated with keys that are not already in trans
5. insert summary to log table
*/---------------------------------------------------------
--to-do
--return infomation to a client via json or composite type
AS $f$
DECLARE
_t text;
_c text;
_log_info jsonb;
_log_id text;
_cnt numeric;
_message jsonb;
--_recs jsonb;
--_srce text;
_defn jsonb;
_MESSAGE_TEXT text;
_PG_EXCEPTION_DETAIL text;
_PG_EXCEPTION_HINT text;
BEGIN
--_path := 'C:\users\fleet\downloads\discover-recentactivity-20171031.csv';
--_srce := 'dcard';
--_recs:= $$[{"Trans. Date":"1/2/2018","Post Date":"1/2/2018","Description":"GOOGLE *YOUTUBE VIDEOS G.CO/HELPPAY#CAP0H07TXV","Amount":4.26,"Category":"Services"},{"Trans. Date":"1/2/2018","Post Date":"1/2/2018","Description":"MICROSOFT *ONEDRIVE 800-642-7676 WA","Amount":4.26,"Category":"Services"},{"Trans. Date":"1/3/2018","Post Date":"1/3/2018","Description":"CLE CLINIC PT PMTS 216-445-6249 OHAK2C57F2F0B3","Amount":200,"Category":"Medical Services"},{"Trans. Date":"1/4/2018","Post Date":"1/4/2018","Description":"AT&T *PAYMENT 800-288-2020 TX","Amount":57.14,"Category":"Services"},{"Trans. Date":"1/4/2018","Post Date":"1/7/2018","Description":"WWW.KOHLS.COM #0873 MIDDLETOWN OH","Amount":-7.9,"Category":"Payments and Credits"},{"Trans. Date":"1/5/2018","Post Date":"1/7/2018","Description":"PIZZA HUT 007946 STOW OH","Amount":9.24,"Category":"Restaurants"},{"Trans. Date":"1/5/2018","Post Date":"1/7/2018","Description":"SUBWAY 00044289255 STOW OH","Amount":10.25,"Category":"Restaurants"},{"Trans. Date":"1/6/2018","Post Date":"1/7/2018","Description":"ACME NO. 17 STOW OH","Amount":103.98,"Category":"Supermarkets"},{"Trans. Date":"1/6/2018","Post Date":"1/7/2018","Description":"DISCOUNT DRUG MART 32 STOW OH","Amount":1.69,"Category":"Merchandise"},{"Trans. Date":"1/6/2018","Post Date":"1/7/2018","Description":"DISCOUNT DRUG MART 32 STOW OH","Amount":2.19,"Category":"Merchandise"},{"Trans. Date":"1/9/2018","Post Date":"1/9/2018","Description":"CIRCLE K 05416 STOW OH00947R","Amount":3.94,"Category":"Gasoline"},{"Trans. Date":"1/9/2018","Post Date":"1/9/2018","Description":"CIRCLE K 05416 STOW OH00915R","Amount":52.99,"Category":"Gasoline"},{"Trans. Date":"1/13/2018","Post Date":"1/13/2018","Description":"AUTOZONE #0722 STOW OH","Amount":85.36,"Category":"Automotive"},{"Trans. Date":"1/13/2018","Post Date":"1/13/2018","Description":"DISCOUNT DRUG MART 32 STOW OH","Amount":26.68,"Category":"Merchandise"},{"Trans. Date":"1/13/2018","Post Date":"1/13/2018","Description":"EL CAMPESINO STOW OH","Amount":6.5,"Category":"Restaurants"},{"Trans. Date":"1/13/2018","Post Date":"1/13/2018","Description":"TARGET STOW OH","Amount":197.9,"Category":"Merchandise"},{"Trans. Date":"1/14/2018","Post Date":"1/14/2018","Description":"DISCOUNT DRUG MART 32 STOW OH","Amount":13.48,"Category":"Merchandise"},{"Trans. Date":"1/15/2018","Post Date":"1/15/2018","Description":"TARGET.COM * 800-591-3869 MN","Amount":22.41,"Category":"Merchandise"},{"Trans. Date":"1/16/2018","Post Date":"1/16/2018","Description":"BUFFALO WILD WINGS KENT KENT OH","Amount":63.22,"Category":"Restaurants"},{"Trans. Date":"1/16/2018","Post Date":"1/16/2018","Description":"PARTA - KCG KENT OH","Amount":4,"Category":"Government Services"},{"Trans. Date":"1/16/2018","Post Date":"1/16/2018","Description":"REMEMBERNHU 402-935-7733 IA","Amount":60,"Category":"Services"},{"Trans. Date":"1/16/2018","Post Date":"1/16/2018","Description":"TARGET.COM * 800-591-3869 MN","Amount":44.81,"Category":"Merchandise"},{"Trans. Date":"1/16/2018","Post Date":"1/16/2018","Description":"TREE CITY COFFEE & PASTR KENT OH","Amount":17.75,"Category":"Restaurants"},{"Trans. Date":"1/17/2018","Post Date":"1/17/2018","Description":"BESTBUYCOM805526794885 888-BESTBUY MN","Amount":343.72,"Category":"Merchandise"},{"Trans. Date":"1/19/2018","Post Date":"1/19/2018","Description":"DISCOUNT DRUG MART 32 STOW OH","Amount":5.98,"Category":"Merchandise"},{"Trans. Date":"1/19/2018","Post Date":"1/19/2018","Description":"U-HAUL OF KENT-STOW KENT OH","Amount":15.88,"Category":"Travel/ Entertainment"},{"Trans. Date":"1/19/2018","Post Date":"1/19/2018","Description":"WALMART GROCERY 800-966-6546 AR","Amount":5.99,"Category":"Supermarkets"},{"Trans. Date":"1/19/2018","Post Date":"1/19/2018","Description":"WALMART GROCERY 800-966-6546 AR","Amount":17.16,"Category":"Supermarkets"},{"Trans. Date":"1/19/2018","Post Date":"1/19/2018","Description":"WALMART GROCERY 800-966-6546 AR","Amount":500.97,"Category":"Supermarkets"},{"Trans. Date":"1/20/2018","Post Date":"1/20/2018","Description":"GOOGLE *GOOGLE PLAY G.CO/HELPPAY#CAP0H
----------------------------------------------------test if source exists----------------------------------------------------------------------------------
SELECT
defn
INTO
_defn
FROM
tps.srce
WHERE
srce = _srce;
IF _defn IS NULL THEN
_message:=
format(
$$
{
"status":"fail",
"message":"source %L does not exists"
}
$$,
_srce
)::jsonb;
RETURN _message;
END IF;
-------------unwrap the json record and apply the path(s) of the constraint to build a constraint key per record-----------------------------------------------------------------------------------
WITH
pending_list AS (
SELECT
_srce srce
,j.rec
,j.id
--aggregate back to the record since multiple paths may be listed in the constraint
--it is unclear why the "->>0" is required to correctly extract the text array from the jsonb
,tps.jsonb_concat_obj(
jsonb_build_object(
--the new json key is the path itself
cons.path->>0
,j.rec#>((cons.path->>0)::text[])
)
) json_key
FROM
jsonb_array_elements(_recs) WITH ORDINALITY j(rec,id)
JOIN LATERAL jsonb_array_elements(_defn->'constraint') WITH ORDINALITY cons(path, seq) ON TRUE
GROUP BY
j.rec
,j.id
)
-----------create a unique list of keys from staged rows------------------------------------------------------------------------------------------
, pending_keys AS (
SELECT DISTINCT
json_key
FROM
pending_list
)
-----------list of keys already loaded to tps-----------------------------------------------------------------------------------------------------
, matched_keys AS (
SELECT DISTINCT
k.json_key
FROM
pending_keys k
INNER JOIN tps.trans t ON
t.ic = k.json_key
)
-----------return unique keys that are not already in tps.trans-----------------------------------------------------------------------------------
, unmatched_keys AS (
SELECT
json_key
FROM
pending_keys
EXCEPT
SELECT
json_key
FROM
matched_keys
)
--------build log record-------------------+------------------------------------------------------------------------------------------------
2018-05-25 02:25:36 -04:00
, logged AS (
INSERT INTO
tps.trans_log (info)
SELECT
JSONB_BUILD_OBJECT('time_stamp',CURRENT_TIMESTAMP)
||JSONB_BUILD_OBJECT('srce',_srce)
--||JSONB_BUILD_OBJECT('path',_path)
||JSONB_BUILD_OBJECT('not_inserted',
(
SELECT
jsonb_agg(json_key)
FROM
matched_keys
)
)
||JSONB_BUILD_OBJECT('inserted',
(
SELECT
jsonb_agg(json_key)
FROM
unmatched_keys
)
)
RETURNING *
)
-----------insert pending rows that have key with no trans match-----------------------------------------------------------------------------------
--need to look into mapping the transactions prior to loading
, inserted AS (
INSERT INTO
tps.trans (srce, rec, ic, logid)
SELECT
pl.srce
,pl.rec
,pl.json_key
,logged.id
FROM
pending_list pl
INNER JOIN unmatched_keys u ON
u.json_key = pl.json_key
CROSS JOIN logged
ORDER BY
pl.id ASC
----this conflict is only if an exact duplicate rec json happens, which will be rejected
----therefore, records may not be inserted due to ay matches with certain json fields, or if the entire json is a duplicate, reason is not specified
RETURNING *
)
2018-05-25 02:25:36 -04:00
SELECT
id
,info
INTO
_log_id
,_log_info
FROM
logged;
2018-05-25 09:12:53 -04:00
--RAISE NOTICE 'import logged under id# %, info: %', _log_id, _log_info;
2018-05-25 02:25:36 -04:00
_message:=
(
$$
{
"status":"complete"
}
$$::jsonb
)||jsonb_build_object('details',_log_info);
RETURN _message;
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS
_MESSAGE_TEXT = MESSAGE_TEXT,
_PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
_PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
_message:=
($$
{
"status":"fail",
"message":"error importing data"
}
$$::jsonb)
||jsonb_build_object('message_text',_MESSAGE_TEXT)
||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);
return _message;
END;
$f$
2018-05-25 10:09:12 -04:00
LANGUAGE plpgsql;
2018-05-25 02:25:36 -04:00
2018-05-25 09:54:00 -04:00
---------------overwrite maps--------------------------------------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION tps.srce_map_overwrite(_srce text) RETURNS jsonb
AS
$f$
DECLARE
_message jsonb;
_MESSAGE_TEXT text;
_PG_EXCEPTION_DETAIL text;
_PG_EXCEPTION_HINT text;
BEGIN
WITH
--------------------apply regex operations to transactions-----------------------------------------------------------------------------------
rx AS (
SELECT
t.srce,
t.id,
t.rec,
m.target,
m.seq,
regex->'regex'->>'function' regex_function,
2018-05-25 09:54:00 -04:00
e.v ->> 'field' result_key_name,
e.v ->> 'key' target_json_path,
e.v ->> 'flag' regex_options_flag,
e.v->>'map' map_intention,
e.v->>'retain' retain_result,
e.v->>'regex' regex_expression,
e.rn target_item_number,
COALESCE(mt.rn,rp.rn,1) result_number,
mt.mt rx_match,
rp.rp rx_replace,
CASE e.v->>'map'
WHEN 'y' THEN
e.v->>'field'
ELSE
null
END map_key,
CASE e.v->>'map'
WHEN 'y' THEN
CASE regex->'regex'->>'function'
2018-05-25 09:54:00 -04:00
WHEN 'extract' THEN
CASE WHEN array_upper(mt.mt,1)=1
THEN to_json(mt.mt[1])
ELSE array_to_json(mt.mt)
END::jsonb
WHEN 'replace' THEN
to_jsonb(rp.rp)
ELSE
'{}'::jsonb
END
ELSE
NULL
END map_val,
CASE e.v->>'retain'
WHEN 'y' THEN
e.v->>'field'
ELSE
NULL
END retain_key,
CASE e.v->>'retain'
WHEN 'y' THEN
CASE regex->'regex'->>'function'
2018-05-25 09:54:00 -04:00
WHEN 'extract' THEN
CASE WHEN array_upper(mt.mt,1)=1
THEN to_json(trim(mt.mt[1]))
ELSE array_to_json(mt.mt)
END::jsonb
WHEN 'replace' THEN
to_jsonb(rtrim(rp.rp))
ELSE
'{}'::jsonb
END
ELSE
NULL
END retain_val
FROM
--------------------------start with all regex maps------------------------------------------------------------------------------------
2018-05-25 09:54:00 -04:00
tps.map_rm m
--------------------------isolate matching basis to limit map to only look at certain json---------------------------------------------
LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'where') w(v) ON TRUE
--------------------------join to main transaction table but only certain key/values are included--------------------------------------
2018-05-25 09:54:00 -04:00
INNER JOIN tps.trans t ON
t.srce = m.srce AND
t.rec @> w.v
--------------------------break out array of regluar expressions in the map------------------------------------------------------------
LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'defn') WITH ORDINALITY e(v, rn) ON true
--------------------------each regex references a path to the target value, extract the target from the reference and do regex---------
2018-05-25 09:54:00 -04:00
LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
m.regex->'regex'->>'function' = 'extract'
--------------------------same as above but for a replacement type function------------------------------------------------------------
2018-05-25 09:54:00 -04:00
LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON
m.regex->'regex'->>'function' = 'replace'
2018-05-25 09:54:00 -04:00
WHERE
--t.allj IS NULL
t.srce = _srce
--rec @> '{"Transaction":"ACH Credits","Transaction":"ACH Debits"}'
--rec @> '{"Description":"CHECK 93013270 086129935"}'::jsonb
ORDER BY
t.id DESC,
m.target,
e.rn,
COALESCE(mt.rn,rp.rn,1)
)
--SELECT count(*) FROM rx LIMIT 100
, agg_to_target_items AS (
SELECT
srce
,id
,target
,seq
,map_intention
,regex_function
,target_item_number
,result_key_name
,target_json_path
,CASE WHEN map_key IS NULL
THEN
NULL
ELSE
jsonb_build_object(
map_key,
CASE WHEN max(result_number) = 1
THEN
jsonb_agg(map_val ORDER BY result_number) -> 0
ELSE
jsonb_agg(map_val ORDER BY result_number)
END
)
END map_val
,CASE WHEN retain_key IS NULL
THEN
NULL
ELSE
jsonb_build_object(
retain_key,
CASE WHEN max(result_number) = 1
THEN
jsonb_agg(retain_val ORDER BY result_number) -> 0
ELSE
jsonb_agg(retain_val ORDER BY result_number)
END
)
END retain_val
FROM
rx
GROUP BY
srce
,id
,target
,seq
,map_intention
,regex_function
,target_item_number
,result_key_name
,target_json_path
,map_key
,retain_key
)
--SELECT * FROM agg_to_target_items LIMIT 100
, agg_to_target AS (
SELECT
srce
,id
,target
,seq
,map_intention
,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
FROM
agg_to_target_items
GROUP BY
srce
,id
,target
,seq
,map_intention
ORDER BY
id
)
--SELECT * FROM agg_to_target
, link_map AS (
SELECT
a.srce
,a.id
,a.target
,a.seq
,a.map_intention
,a.map_val
,a.retain_val retain_value
,v.map
FROM
agg_to_target a
LEFT OUTER JOIN tps.map_rv v ON
v.srce = a.srce AND
v.target = a.target AND
v.retval = a.map_val
)
--SELECT * FROM link_map
, agg_to_id AS (
SELECT
srce
,id
,tps.jsonb_concat_obj(COALESCE(retain_value,'{}'::jsonb) ORDER BY seq DESC) retain_val
,tps.jsonb_concat_obj(COALESCE(map,'{}'::jsonb)) map
FROM
link_map
GROUP BY
srce
,id
)
--SELECT agg_to_id.srce, agg_to_id.id, jsonb_pretty(agg_to_id.retain_val) , jsonb_pretty(agg_to_id.map) FROM agg_to_id ORDER BY id desc LIMIT 100
UPDATE
tps.trans t
SET
map = o.map,
parse = o.retain_val,
allj = t.rec||o.map||o.retain_val
FROM
agg_to_id o
WHERE
o.id = t.id;
_message:= jsonb_build_object('status','complete');
RETURN _message;
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS
_MESSAGE_TEXT = MESSAGE_TEXT,
_PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
_PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
_message:=
($$
{
"status":"fail",
"message":"error setting map value"
}
$$::jsonb)
||jsonb_build_object('message_text',_MESSAGE_TEXT)
||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);
RETURN _message;
END;
$f$
2018-05-25 10:09:12 -04:00
language plpgsql;
---------------------set map values from json array of json objects-----------------------------------------------------
DROP FUNCTION IF EXISTS tps.map_rv_set;
CREATE OR REPLACE FUNCTION tps.map_rv_set(_defn jsonb) RETURNS jsonb
AS
$f$
DECLARE
_message jsonb;
_MESSAGE_TEXT text;
_PG_EXCEPTION_DETAIL text;
_PG_EXCEPTION_HINT text;
BEGIN
INSERT INTO
tps.map_rv (srce, target, retval, map, hist)
SELECT
r.source
,r.map
,r.ret_val
,r.mapped
,jsonb_build_object(
'hist_defn',mapped
,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
) || '[]'::jsonb
FROM
JSONB_ARRAY_ELEMENTS(_defn) WITH ORDINALITY ae(r,s)
JOIN LATERAL jsonb_to_record(ae.r) r(source TEXT,map TEXT, ret_val jsonb, mapped jsonb) ON TRUE
ON CONFLICT ON CONSTRAINT map_rv_pk DO UPDATE
SET
map = excluded.map
,hist =
--the new definition going to position -0-
jsonb_build_object(
'hist_defn',excluded.map
,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
)
--the previous definition, set upper bound of effective range which was previously null
|| jsonb_set(
map_rv.hist
,'{0,effective,1}'::text[]
,to_jsonb(CURRENT_TIMESTAMP)
);
-------return message--------------------------------------------------------------------------------------------------
_message:= jsonb_build_object('status','complete');
RETURN _message;
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS
_MESSAGE_TEXT = MESSAGE_TEXT,
_PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
_PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
_message:=
($$
{
"status":"fail",
"message":"error setting map value"
}
$$::jsonb)
||jsonb_build_object('message_text',_MESSAGE_TEXT)
||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);
RETURN _message;
END;
$f$
2018-05-31 02:57:59 -04:00
LANGUAGE plpgsql;
------------------------------test regex with only unique results--------------------------------------------------------------
DROP FUNCTION IF EXISTS tps.test_regex(jsonb);
CREATE FUNCTION tps.test_regex(_defn jsonb) RETURNS jsonb
LANGUAGE plpgsql
AS
$f$
DECLARE
_rslt jsonb;
BEGIN
WITH
--------------------apply regex operations to transactions---------------------------------------------------------------------------------
rx AS (
SELECT
t.srce,
t.id,
t.rec,
m.target,
m.seq,
regex->'regex'->>'function' regex_function,
e.v ->> 'field' result_key_name,
e.v ->> 'key' target_json_path,
e.v ->> 'flag' regex_options_flag,
e.v->>'map' map_intention,
e.v->>'retain' retain_result,
e.v->>'regex' regex_expression,
e.rn target_item_number,
COALESCE(mt.rn,rp.rn,1) result_number,
mt.mt rx_match,
rp.rp rx_replace,
CASE e.v->>'map'
WHEN 'y' THEN
e.v->>'field'
ELSE
null
END map_key,
CASE e.v->>'map'
WHEN 'y' THEN
CASE regex->'regex'->>'function'
WHEN 'extract' THEN
CASE WHEN array_upper(mt.mt,1)=1
THEN to_json(mt.mt[1])
ELSE array_to_json(mt.mt)
END::jsonb
WHEN 'replace' THEN
to_jsonb(rp.rp)
ELSE
'{}'::jsonb
END
ELSE
NULL
END map_val,
CASE e.v->>'retain'
WHEN 'y' THEN
e.v->>'field'
ELSE
NULL
END retain_key,
CASE e.v->>'retain'
WHEN 'y' THEN
CASE regex->'regex'->>'function'
WHEN 'extract' THEN
CASE WHEN array_upper(mt.mt,1)=1
THEN to_json(trim(mt.mt[1]))
ELSE array_to_json(mt.mt)
END::jsonb
WHEN 'replace' THEN
to_jsonb(rtrim(rp.rp))
ELSE
'{}'::jsonb
END
ELSE
NULL
END retain_val
FROM
--------------------------start with all regex maps------------------------------------------------------------------------------------
(SELECT _defn->>'srce' srce, _defn->>'name' target, _defn->'regex' regex, (_defn->>'sequence')::numeric seq) m
--------------------------isolate matching basis to limit map to only look at certain json---------------------------------------------
LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'where') w(v) ON TRUE
--------------------------break out array of regluar expressions in the map------------------------------------------------------------
LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'defn') WITH ORDINALITY e(v, rn) ON true
--------------------------join to main transaction table but only certain key/values are included--------------------------------------
INNER JOIN tps.trans t ON
t.srce = m.srce AND
t.rec @> w.v
--------------------------each regex references a path to the target value, extract the target from the reference and do regex---------
LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
m.regex->'regex'->>'function' = 'extract'
--------------------------same as above but for a replacement type function------------------------------------------------------------
LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON
m.regex->'regex'->>'function' = 'replace'
ORDER BY
t.id DESC,
m.target,
e.rn,
COALESCE(mt.rn,rp.rn,1)
2018-05-31 02:57:59 -04:00
)
--SELECT * FROM rx LIMIT 100
, agg_to_target_items AS (
SELECT
srce
,id
,target
,seq
,map_intention
,regex_function
,target_item_number
,result_key_name
,target_json_path
,CASE WHEN map_key IS NULL
THEN
NULL
ELSE
jsonb_build_object(
map_key,
CASE WHEN max(result_number) = 1
THEN
jsonb_agg(map_val ORDER BY result_number) -> 0
ELSE
jsonb_agg(map_val ORDER BY result_number)
END
)
END map_val
,CASE WHEN retain_key IS NULL
THEN
NULL
ELSE
jsonb_build_object(
retain_key,
CASE WHEN max(result_number) = 1
THEN
jsonb_agg(retain_val ORDER BY result_number) -> 0
ELSE
jsonb_agg(retain_val ORDER BY result_number)
END
)
END retain_val
FROM
rx
GROUP BY
srce
,id
,target
,seq
,map_intention
,regex_function
,target_item_number
,result_key_name
,target_json_path
,map_key
,retain_key
)
--SELECT * FROM agg_to_target_items LIMIT 100
, agg_to_target AS (
SELECT
srce
,id
,target
,seq
,map_intention
,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
FROM
agg_to_target_items
GROUP BY
srce
,id
,target
,seq
,map_intention
)
, agg_to_ret AS (
SELECT
srce
,target
,seq
,map_intention
,map_val
,retain_val
,count(*) "count"
FROM
agg_to_target
GROUP BY
srce
,target
,seq
,map_intention
,map_val
,retain_val
)
,agg_to_id AS (
SELECT
l.srce
,l.target
,l.map_val
,l."count"
FROM
agg_to_ret l
ORDER BY
l.srce
,l.target
,l."count" desc
)
SELECT
jsonb_agg(row_to_json(agg_to_id)::jsonb)
INTO
_rslt
FROM
agg_to_id;
RETURN _rslt;
END;
$f$;
------------------------------test regex with all original records--------------------------------------------------------------
DROP FUNCTION IF EXISTS tps.report_unmapped_recs;
CREATE FUNCTION tps.report_unmapped_recs(_srce text) RETURNS TABLE
(
source text,
map text,
ret_val jsonb,
"count" bigint,
recs jsonb
)
2018-05-31 02:57:59 -04:00
LANGUAGE plpgsql
AS
$f$
2018-05-31 02:57:59 -04:00
BEGIN
/*
first get distinct target json values
then apply regex
*/
RETURN QUERY
2018-05-31 02:57:59 -04:00
WITH
--------------------apply regex operations to transactions---------------------------------------------------------------------------------
rx AS (
SELECT
t.srce,
t.id,
t.rec,
m.target,
m.seq,
regex->'regex'->>'function' regex_function,
e.v ->> 'field' result_key_name,
e.v ->> 'key' target_json_path,
e.v ->> 'flag' regex_options_flag,
e.v->>'map' map_intention,
e.v->>'retain' retain_result,
e.v->>'regex' regex_expression,
e.rn target_item_number,
COALESCE(mt.rn,rp.rn,1) result_number,
mt.mt rx_match,
rp.rp rx_replace,
CASE e.v->>'map'
WHEN 'y' THEN
e.v->>'field'
2018-05-31 02:57:59 -04:00
ELSE
null
END map_key,
CASE e.v->>'map'
WHEN 'y' THEN
CASE regex->'regex'->>'function'
WHEN 'extract' THEN
CASE WHEN array_upper(mt.mt,1)=1
THEN to_json(mt.mt[1])
ELSE array_to_json(mt.mt)
END::jsonb
WHEN 'replace' THEN
to_jsonb(rp.rp)
ELSE
'{}'::jsonb
END
2018-05-31 02:57:59 -04:00
ELSE
NULL
END map_val,
CASE e.v->>'retain'
WHEN 'y' THEN
e.v->>'field'
ELSE
NULL
END retain_key,
CASE e.v->>'retain'
WHEN 'y' THEN
CASE regex->'regex'->>'function'
WHEN 'extract' THEN
CASE WHEN array_upper(mt.mt,1)=1
THEN to_json(trim(mt.mt[1]))
ELSE array_to_json(mt.mt)
END::jsonb
WHEN 'replace' THEN
to_jsonb(rtrim(rp.rp))
ELSE
'{}'::jsonb
END
ELSE
NULL
END retain_val
FROM
--------------------------start with all regex maps------------------------------------------------------------------------------------
tps.map_rm m
--------------------------isolate matching basis to limit map to only look at certain json---------------------------------------------
LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'where') w(v) ON TRUE
--------------------------join to main transaction table but only certain key/values are included--------------------------------------
INNER JOIN tps.trans t ON
t.srce = m.srce AND
t.rec @> w.v
--------------------------break out array of regluar expressions in the map------------------------------------------------------------
LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'defn') WITH ORDINALITY e(v, rn) ON true
--------------------------each regex references a path to the target value, extract the target from the reference and do regex---------
LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
m.regex->'regex'->>'function' = 'extract'
--------------------------same as above but for a replacement type function------------------------------------------------------------
LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON
m.regex->'regex'->>'function' = 'replace'
WHERE
--t.allj IS NULL
t.srce = _srce AND
e.v @> '{"map":"y"}'::jsonb
--rec @> '{"Transaction":"ACH Credits","Transaction":"ACH Debits"}'
--rec @> '{"Description":"CHECK 93013270 086129935"}'::jsonb
ORDER BY
t.id DESC,
m.target,
e.rn,
COALESCE(mt.rn,rp.rn,1)
2018-05-31 02:57:59 -04:00
)
--SELECT * FROM rx LIMIT 100
, agg_to_target_items AS (
SELECT
srce
,id
,rec
,target
,seq
,map_intention
,regex_function
,target_item_number
,result_key_name
,target_json_path
,CASE WHEN map_key IS NULL
THEN
NULL
ELSE
jsonb_build_object(
map_key,
CASE WHEN max(result_number) = 1
THEN
jsonb_agg(map_val ORDER BY result_number) -> 0
ELSE
jsonb_agg(map_val ORDER BY result_number)
END
)
END map_val
,CASE WHEN retain_key IS NULL
THEN
NULL
ELSE
jsonb_build_object(
retain_key,
CASE WHEN max(result_number) = 1
THEN
jsonb_agg(retain_val ORDER BY result_number) -> 0
ELSE
jsonb_agg(retain_val ORDER BY result_number)
END
)
END retain_val
FROM
rx
GROUP BY
srce
,id
,rec
,target
,seq
,map_intention
,regex_function
,target_item_number
,result_key_name
,target_json_path
,map_key
,retain_key
)
--SELECT * FROM agg_to_target_items LIMIT 100
, agg_to_target AS (
SELECT
srce
,id
,rec
,target
,seq
,map_intention
,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
FROM
agg_to_target_items
GROUP BY
srce
,id
,rec
,target
,seq
,map_intention
)
2018-05-31 02:57:59 -04:00
, agg_to_ret AS (
SELECT
srce
,target
,seq
,map_intention
,map_val
,retain_val
,count(*) "count"
,jsonb_agg(rec) rec
FROM
agg_to_target
GROUP BY
srce
,target
,seq
,map_intention
,map_val
,retain_val
)
, link_map AS (
SELECT
a.srce
,a.target
,a.seq
,a.map_intention
,a.map_val
,a."count"
,a.rec
,a.retain_val
,v.map mapped_val
FROM
agg_to_ret a
LEFT OUTER JOIN tps.map_rv v ON
v.srce = a.srce AND
v.target = a.target AND
v.retval = a.map_val
)
2018-05-31 02:57:59 -04:00
SELECT
l.srce
,l.target
,l.map_val
,l."count"
,l.rec
2018-05-31 02:57:59 -04:00
FROM
link_map l
WHERE
l.mapped_val IS NULL
ORDER BY
l.srce
,l.target
,l."count" desc;
2018-05-31 02:57:59 -04:00
END;
$f$;
--setup function to delete a single source
DROP FUNCTION IF EXISTS tps.srce_delete(jsonb);
CREATE FUNCTION tps.srce_delete(_defn jsonb) RETURNS jsonb
AS
$f$
DECLARE
_message jsonb;
_MESSAGE_TEXT text;
_PG_EXCEPTION_DETAIL text;
_PG_EXCEPTION_HINT text;
_rebuild BOOLEAN;
BEGIN
-------------------------------do delete---------------------------------
DELETE FROM tps.srce WHERE srce = _defn->>'name';
--could move this record to a "recycle bin" table for a certain period of time
--need to handle cascading record deletes
---------------------------set message-----------------------------------
_message:=
(
$$
{
"status":"complete",
"message":"source was permanently deleted"
}
$$::jsonb
);
RETURN _message;
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS
_MESSAGE_TEXT = MESSAGE_TEXT,
_PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
_PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
_message:=
($$
{
"status":"fail",
"message":"error dropping the source"
}
$$::jsonb)
||jsonb_build_object('message_text',_MESSAGE_TEXT)
||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);
RETURN _message;
END;
$f$
LANGUAGE plpgsql;
/*
This function takes and array of definition object where "name" object is the primary key
It will force the entire body of sources to match what is received
*/
DROP FUNCTION IF EXISTS tps.srce_overwrite_all(jsonb);
CREATE FUNCTION tps.srce_overwrite_all(_defn jsonb) RETURNS jsonb
AS
$f$
DECLARE
_message jsonb;
_MESSAGE_TEXT text;
_PG_EXCEPTION_DETAIL text;
_PG_EXCEPTION_HINT text;
_rebuild BOOLEAN;
_list text;
BEGIN
WITH
--retain the results of the update by srce
_set AS (
SELECT
j.rn rn
,j.e->>'name' srce
,j.e defn
FROM
jsonb_array_elements(_defn) WITH ORDINALITY j(e, rn)
)
--full join
,_full AS (
SELECT
COALESCE(_srce.srce,_set.srce) srce
,CASE COALESCE(_set.srce,'DELETE') WHEN 'DELETE' THEN 'DELETE' ELSE 'SET' END actn
,COALESCE(_set.defn,_srce.defn) defn
FROM
tps.srce _srce
FULL OUTER JOIN _set ON
_set.srce = _srce.srce
)
--call functions from list
,_do_set AS (
SELECT
f.srce
,f.actn
,setd.message
FROM
_full f
JOIN LATERAL tps.srce_set(defn) setd(message) ON f.actn = 'SET'
--dual left joins for functions that touch the same table causes the first left join actions to be undone
--LEFT JOIN LATERAL tps.srce_delete(defn) deld(message) ON f.actn = 'DELETE'
)
,_do_del AS (
SELECT
f.srce
,f.actn
,deld.message
FROM
_full f
JOIN LATERAL tps.srce_delete(defn) deld(message) ON f.actn = 'DELETE'
)
--aggregate all the messages into one message
----
---- should look at rolling back the whole thing if one of the function returns a fail. stored proc could do this.
----
SELECT
jsonb_agg(m)
INTO
_message
FROM
(
SELECT
jsonb_build_object('source',srce,'status',message->>'status','message',message->>'message') m
FROM
_do_set
UNION ALL
SELECT
jsonb_build_object('source',srce,'status',message->>'status','message',message->>'message') m
FROM
_do_del
) x;
SELECT string_agg(srce,',') INTO _list FROM tps.srce;
RAISE NOTICE 'multi source list: %', _list;
RETURN _message;
SELECT string_agg(srce,',') INTO _list FROM tps.srce;
RAISE NOTICE 'after return: %', _list;
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS
_MESSAGE_TEXT = MESSAGE_TEXT,
_PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
_PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
_message:=
($$
{
"status":"fail",
"message":"error updating sources"
}
$$::jsonb)
||jsonb_build_object('message_text',_MESSAGE_TEXT)
||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);
RETURN _message;
END;
$f$
LANGUAGE plpgsql