2123 lines
68 KiB
PL/PgSQL
2123 lines
68 KiB
PL/PgSQL
------create dev schema and api user-----------------------------------------------------------------------------------------------------------------
|
|
|
|
-- Rebuild both schemas from scratch. CASCADE also drops every object they contain.
DROP SCHEMA IF EXISTS tps CASCADE;
DROP SCHEMA IF EXISTS tpsv CASCADE;

CREATE SCHEMA tps;
COMMENT ON SCHEMA tps IS 'third party source data';

CREATE SCHEMA tpsv;
-- the comment belongs to tpsv (it was previously applied to tps, overwriting the comment above)
COMMENT ON SCHEMA tpsv IS 'third party source views';

-- Recreate the login role that is granted access to both schemas further down.
DROP USER IF EXISTS api;

-- NOTE(review): the password hash is committed in this script; consider supplying it at deploy time.
CREATE ROLE api WITH
    LOGIN
    NOSUPERUSER
    NOCREATEDB
    NOCREATEROLE
    INHERIT
    NOREPLICATION
    CONNECTION LIMIT -1
    ENCRYPTED PASSWORD 'md56da13b696f737097e0146e47cc0d0985';
|
|
|
|
-----need to set up all database objects and then grant privileges to api-----------------------------------------------------------------------------
|
|
|
|
--schema usage for the api role
GRANT USAGE ON SCHEMA tps, tpsv TO api;

--privileges on tables that already exist in either schema
GRANT SELECT, UPDATE, INSERT, DELETE ON ALL TABLES IN SCHEMA tps, tpsv TO api;

--privileges on sequences that already exist in either schema
GRANT USAGE ON ALL SEQUENCES IN SCHEMA tps, tpsv TO api;

--privileges on tables created in either schema from here on
--NOTE(review): no FOR ROLE clause is given, so this presumably only covers objects created by the role running this script
ALTER DEFAULT PRIVILEGES IN SCHEMA tps, tpsv GRANT SELECT, UPDATE, INSERT, DELETE ON TABLES TO api;

--privileges on sequences created in either schema from here on
ALTER DEFAULT PRIVILEGES IN SCHEMA tps, tpsv GRANT USAGE ON SEQUENCES TO api;
|
|
|
|
|
|
-----create tables-----------------------------------------------------------------------------------------------------------------------------------
|
|
|
|
-----regex map instructions table
CREATE TABLE tps.map_rm (
    srce     text     NOT NULL
    ,target  text     NOT NULL
    ,regex   jsonb
    ,seq     integer  NOT NULL
    ,hist    jsonb
);
COMMENT ON TABLE tps.map_rm IS 'regex map instructions';

-----return value table
CREATE TABLE tps.map_rv (
    srce     text   NOT NULL
    ,target  text   NOT NULL
    ,retval  jsonb  NOT NULL
    ,map     jsonb  NOT NULL
    ,hist    jsonb  NOT NULL
);
COMMENT ON TABLE tps.map_rv IS 'return value lookup table';

-----source definition table
CREATE TABLE tps.srce (
    srce   text  NOT NULL
    ,defn  jsonb
    ,hist  jsonb
);
COMMENT ON TABLE tps.srce IS 'source master listing and definition';

-----source data table
--id is an identity column backed by the explicitly named sequence tps.trans_id_seq
CREATE TABLE tps.trans (
    id integer NOT NULL GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME tps.trans_id_seq
        START WITH 1
        INCREMENT BY 1
        NO MINVALUE
        NO MAXVALUE
        CACHE 1
    )
    ,srce   text
    ,rec    jsonb
    ,parse  jsonb
    ,map    jsonb
    ,allj   jsonb
    ,ic     jsonb
    ,logid  INTEGER
);
COMMENT ON TABLE tps.trans IS 'source records';
COMMENT ON COLUMN tps.trans.ic IS 'input constraint value';

-----import log table
--id is an identity column backed by the explicitly named sequence tps.trans_log_id_seq
CREATE TABLE tps.trans_log (
    id integer NOT NULL GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME tps.trans_log_id_seq
        START WITH 1
        INCREMENT BY 1
        NO MINVALUE
        NO MAXVALUE
        CACHE 1
    )
    ,info  jsonb
);
COMMENT ON TABLE tps.trans_log IS 'import event information';
|
|
|
|
|
|
-------------primary keys----------------------------------------------------------------------------------------------------------------------------
|
|
|
|
--one regex map per (source, target name)
ALTER TABLE ONLY tps.map_rm ADD CONSTRAINT map_rm_pk PRIMARY KEY (srce, target);

--one lookup row per (source, target name, extracted value)
ALTER TABLE ONLY tps.map_rv ADD CONSTRAINT map_rv_pk PRIMARY KEY (srce, target, retval);

--sources are keyed by name
ALTER TABLE ONLY tps.srce ADD CONSTRAINT srce_pkey PRIMARY KEY (srce);

--log entries and transactions are keyed by their identity column
ALTER TABLE ONLY tps.trans_log ADD CONSTRAINT trans_log_pkey PRIMARY KEY (id);
ALTER TABLE ONLY tps.trans ADD CONSTRAINT trans_pkey PRIMARY KEY (id);
|
|
|
|
-------------indexes---------------------------------------------------------------------------------------------------------------------------------
|
|
|
|
--GIN indexes on the jsonb columns of tps.trans
CREATE INDEX trans_allj ON tps.trans USING gin (allj);
CREATE INDEX trans_rec ON tps.trans USING gin (rec);

--b-tree (the default access method) on the source name
CREATE INDEX trans_srce ON tps.trans (srce);
|
|
|
|
-------------foreign keys----------------------------------------------------------------------------------------------------------------------------
|
|
|
|
--a regex map must belong to a defined source
ALTER TABLE ONLY tps.map_rm
    ADD CONSTRAINT map_rm_fk_srce FOREIGN KEY (srce) REFERENCES tps.srce(srce);

--a return value must belong to a defined regex map
ALTER TABLE ONLY tps.map_rv
    ADD CONSTRAINT map_rv_fk_rm FOREIGN KEY (srce, target) REFERENCES tps.map_rm(srce, target);

--a transaction references its source and the import log entry that loaded it
ALTER TABLE ONLY tps.trans
    ADD CONSTRAINT trans_srce_fkey FOREIGN KEY (srce) REFERENCES tps.srce(srce),
    ADD CONSTRAINT trans_logid_fkey FOREIGN KEY (logid) REFERENCES tps.trans_log(id);
|
|
|
|
-------------create functions------------------------------------------------------------------------------------------------------------------------
|
|
|
|
-----set source
|
|
DROP FUNCTION IF EXISTS tps.srce_set(jsonb);

-- tps.srce_set(_defn jsonb) RETURNS jsonb
--
-- Inserts or updates the row of tps.srce whose key is _defn->>'name'.
--   * defn is replaced by _defn
--   * hist receives a new element at position 0 holding _defn and an open-ended
--     effective range; the element previously at position 0 has the upper bound
--     of its effective range set to the current timestamp
--   * when the source already exists and _defn->'constraint' differs from the
--     stored one, tps.trans.ic is recomputed for every record of the source
--
-- Returns a jsonb object with "status" ("complete" or "fail") and "message".
-- On failure the object also carries "message_text" and "pg_exception_detail".
-- Errors are caught and reported through the return value, not re-raised.
CREATE FUNCTION tps.srce_set(_defn jsonb) RETURNS jsonb
AS
$f$
DECLARE
    _message                jsonb;
    _MESSAGE_TEXT           text;
    _PG_EXCEPTION_DETAIL    text;
    _prev_defn              jsonb;
    _srce_exists            BOOLEAN;
    _rebuild                BOOLEAN;
BEGIN

    ---------fetch the stored definition (if any)-------------------------------------------------------------------------------------

    SELECT
        defn
    INTO
        _prev_defn
    FROM
        tps.srce
    WHERE
        srce = _defn->>'name';

    _srce_exists := FOUND;

    ---------test if anything is changing---------------------------------------------------------------------------------------------

    IF _defn = _prev_defn THEN
        RETURN jsonb_build_object(
            'status','complete'
            ,'message','source was not different no action taken'
        );
    END IF;

    ---------if the constraint definition is changing, rebuild for existing records---------------------------------------------------
    --IS DISTINCT FROM is used so that adding or removing the "constraint" key counts as a change
    --(a plain "=" yields NULL when either side is missing, which skipped the rebuild)

    _rebuild := _srce_exists
        AND (_defn->'constraint') IS DISTINCT FROM (_prev_defn->'constraint');

    ---------do merge-----------------------------------------------------------------------------------------------------------------

    INSERT INTO
        tps.srce (srce, defn, hist)
    SELECT
        --extract name from definition
        _defn->>'name'
        --the definition itself
        ,_defn
        --history starts as a one-element array holding the definition and an open-ended effective range
        ,jsonb_build_object(
            'hist_defn',_defn
            ,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
        ) || '[]'::jsonb
    ON CONFLICT ON CONSTRAINT srce_pkey DO UPDATE
        SET
            defn = _defn
            ,hist =
                --the new definition going to position -0-
                jsonb_build_object(
                    'hist_defn',_defn
                    ,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
                )
                --the previous definition, set upper bound of effective range which was previously null
                --(a NULL history is treated as empty so the new entry is not lost)
                || COALESCE(
                    jsonb_set(
                        srce.hist
                        ,'{0,effective,1}'::text[]
                        ,to_jsonb(CURRENT_TIMESTAMP)
                    )
                    ,'[]'::jsonb
                );

    --rebuild constraint key if necessary---------------------------------------------------------------------------------------

    IF _rebuild THEN

        WITH
        rebuild AS (
            SELECT
                j.id
                --aggregate back to the record since multiple paths may be listed in the constraint
                ,tps.jsonb_concat_obj(
                    jsonb_build_object(
                        --the new json key is the path itself
                        cons.path->>0
                        ,j.rec#>((cons.path->>0)::text[])
                    )
                ) json_key
            FROM
                tps.trans j
                --_defn is what was just stored in tps.srce.defn for this source
                JOIN LATERAL jsonb_array_elements(_defn->'constraint') WITH ORDINALITY cons(path, seq) ON TRUE
            WHERE
                j.srce = _defn->>'name'
            GROUP BY
                j.id
        )
        UPDATE
            tps.trans t
        SET
            ic = r.json_key
        FROM
            rebuild r
        WHERE
            t.id = r.id;

        _message := jsonb_build_object(
            'status','complete'
            ,'message','source set and constraint rebuilt on existing records'
        );

    ELSE

        _message := jsonb_build_object(
            'status','complete'
            ,'message','source set'
        );

    END IF;

    RETURN _message;

EXCEPTION WHEN OTHERS THEN

    GET STACKED DIAGNOSTICS
        _MESSAGE_TEXT = MESSAGE_TEXT,
        _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL;

    --this routine sets a source definition; the message previously read "error importing data"
    _message :=
        jsonb_build_object('status','fail','message','error setting source')
        ||jsonb_build_object('message_text',_MESSAGE_TEXT)
        ||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);

    RETURN _message;

END;
$f$
LANGUAGE plpgsql;
|
|
|
|
-----generate sql to create select based on schema
|
|
DROP FUNCTION IF EXISTS tps.build_srce_view_sql(text, text);

-- tps.build_srce_view_sql(_srce text, _schema text) RETURNS text
--
-- Builds (but does not execute) a "DROP VIEW IF EXISTS ...; CREATE VIEW ..." script for
-- the view tpsv.<srce>_<schema name>. The view selects id, logid, allj and one typed
-- column per entry of the "columns" array of the matching element of defn->'schemas'
-- for the given source. Each column entry supplies "path", "type" and "column_name".
--
-- Returns NULL when the source or the named schema is not found.
--
-- The view name and column names are passed through quote_ident and the path and source
-- name through quote_literal, so stored names cannot break out of the generated statement.
-- NOTE(review): quote_ident preserves case, so a source or schema name containing
-- upper-case letters now yields a case-sensitive view name instead of a folded one.
-- NOTE(review): "type" is emitted verbatim because a type expression cannot be quoted as
-- an identifier; it must come from a trusted source definition.
CREATE OR REPLACE FUNCTION tps.build_srce_view_sql(_srce text, _schema text) RETURNS TEXT
AS
$f$
DECLARE
    _sql text;
BEGIN

    SELECT
        'DROP VIEW IF EXISTS tpsv.'||quote_ident(s.srce||'_'||(list.e->>'name'))
        ||'; CREATE VIEW tpsv.'||quote_ident(s.srce||'_'||(list.e->>'name'))
        ||' AS SELECT id, logid, allj, '
        ||string_agg(
            '(allj#>>'||quote_literal(rec.path::text)||')::'||rec.type||' AS '||quote_ident(rec.column_name)
            ,', '
            --keep the columns in the order they are listed in the definition
            ORDER BY cols.ord
        )
        ||' FROM tps.trans WHERE srce = '||quote_literal(s.srce)||';'
    INTO
        _sql
    FROM
        tps.srce s
        JOIN LATERAL jsonb_array_elements(s.defn->'schemas') list (e) ON TRUE
        JOIN LATERAL jsonb_array_elements(list.e->'columns') WITH ORDINALITY AS cols(e, ord) ON TRUE
        JOIN LATERAL jsonb_to_record (cols.e) AS rec( PATH text[], "type" text, column_name text) ON TRUE
    WHERE
        s.srce = _srce
        AND list.e->>'name' = _schema
    GROUP BY
        s.srce
        ,list.e;

    RETURN _sql;

END
$f$
LANGUAGE plpgsql;
|
|
|
|
-----set map definition from json argument
|
|
-- tps.srce_map_def_set(_defn jsonb) RETURNS jsonb
--
-- Upserts one row of tps.map_rm per element of the json array _defn. Each element
-- supplies "srce", "name" (stored as target) and "sequence" (stored as seq); the whole
-- element is stored in regex. hist gets a new entry at position 0 and the entry
-- previously at position 0 has the upper bound of its effective range closed.
--
-- Returns {"status":"complete",...} on success, or {"status":"fail",...} with
-- "message_text" and "pg_exception_detail" when any error is raised.
CREATE OR REPLACE FUNCTION tps.srce_map_def_set(_defn jsonb) RETURNS jsonb
AS
$f$
DECLARE
    _err_text    text;
    _err_detail  text;
BEGIN

    INSERT INTO
        tps.map_rm (srce, target, regex, seq, hist)
    SELECT
        ae.r->>'srce'                       --data source
        ,ae.r->>'name'                      --map name
        ,ae.r                               --map definition
        ,(ae.r->>'sequence')::INTEGER       --map aggregation sequence
        --history definition
        ,jsonb_build_object(
            'hist_defn',ae.r
            ,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
        ) || '[]'::jsonb
    FROM
        jsonb_array_elements(_defn) ae(r)
    ON CONFLICT ON CONSTRAINT map_rm_pk DO UPDATE SET
        srce = excluded.srce
        ,target = excluded.target
        ,regex = excluded.regex
        ,seq = excluded.seq
        ,hist =
            --the new definition going to position -0-
            jsonb_build_object(
                'hist_defn',excluded.regex
                ,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
            )
            --the previous definition, set upper bound of effective range which was previously null
            || jsonb_set(
                map_rm.hist
                ,'{0,effective,1}'::text[]
                ,to_jsonb(CURRENT_TIMESTAMP)
            );

    RETURN jsonb_build_object('status','complete','message','definition has been set');

EXCEPTION WHEN OTHERS THEN

    GET STACKED DIAGNOSTICS
        _err_text = MESSAGE_TEXT,
        _err_detail = PG_EXCEPTION_DETAIL;

    RETURN
        jsonb_build_object('status','fail','message','error setting definition')
        ||jsonb_build_object('message_text',_err_text)
        ||jsonb_build_object('pg_exception_detail',_err_detail);

END;
$f$
language plpgsql;
|
|
|
|
|
|
------------build report for unmapped items---------------------------------------------------------------------------------------------------------------------------------------------
|
|
|
|
-- tps.jsonb_concat(state, concat) RETURNS jsonb
--
-- Transition function for the aggregate tps.jsonb_concat_obj: returns state || concat.
-- The result depends only on the two arguments, so the function is declared IMMUTABLE
-- (it was VOLATILE) and written as a plain SQL expression.
-- It is intentionally not STRICT: a NULL argument yields NULL, as before.
CREATE OR REPLACE FUNCTION tps.jsonb_concat(
    state jsonb,
    concat jsonb)
RETURNS jsonb AS
$BODY$
    SELECT $1 || $2;
$BODY$
LANGUAGE sql IMMUTABLE
COST 100;

-- tps.jsonb_concat_obj(jsonb): folds its inputs together with ||, starting from '{}'.
DROP AGGREGATE IF EXISTS tps.jsonb_concat_obj(jsonb);
CREATE AGGREGATE tps.jsonb_concat_obj(jsonb) (
    SFUNC=tps.jsonb_concat,
    STYPE=jsonb,
    INITCOND='{}'
);
|
|
|
|
|
|
DROP FUNCTION IF EXISTS tps.report_unmapped;

-- tps.report_unmapped(_srce text)
--
-- Re-applies the regex maps of tps.map_rm to the records of tps.trans for one source
-- and lists the extracted values that have no matching row in tps.map_rv.
--
-- Only map elements flagged {"map":"y"} are considered.
--
-- Returns one row per (source, map target, extracted value, retained value) group:
--   source   source name
--   map      map target name
--   ret_val  extracted value object that is missing from tps.map_rv
--   count    number of transactions in the group
--
-- NOTE(review): groups are split by retain_val as well as map_val, but retain_val is
-- not returned, so the same ret_val can appear on several rows; confirm this is intended.
CREATE FUNCTION tps.report_unmapped(_srce text) RETURNS TABLE
(
    source text,
    map text,
    ret_val jsonb,
    "count" bigint
)
LANGUAGE plpgsql
AS
$f$
BEGIN

/*
first get distinct target json values
then apply regex
*/

RETURN QUERY
WITH

--------------------apply regex operations to transactions---------------------------------------------------------------------------------

rx AS (
SELECT
    t.srce,
    t.id,
    t.rec,
    m.target,
    m.seq,
    m.regex->'regex'->>'function' regex_function,
    e.v ->> 'field' result_key_name,
    e.v ->> 'key' target_json_path,
    e.v ->> 'flag' regex_options_flag,
    e.v->>'map' map_intention,
    e.v->>'retain' retain_result,
    e.v->>'regex' regex_expression,
    e.rn target_item_number,
    COALESCE(mt.rn,rp.rn,1) result_number,
    mt.mt rx_match,
    rp.rp rx_replace,
    --key under which the extracted value is looked up; only for elements flagged map = y
    CASE e.v->>'map'
        WHEN 'y' THEN
            e.v->>'field'
        ELSE
            null
    END map_key,
    --a single capture group is stored as a scalar, several as an array
    CASE e.v->>'map'
        WHEN 'y' THEN
            CASE m.regex->'regex'->>'function'
                WHEN 'extract' THEN
                    CASE WHEN array_upper(mt.mt,1)=1
                        THEN to_json(mt.mt[1])
                        ELSE array_to_json(mt.mt)
                    END::jsonb
                WHEN 'replace' THEN
                    to_jsonb(rp.rp)
                ELSE
                    '{}'::jsonb
            END
        ELSE
            NULL
    END map_val,
    --key under which the value is retained; only for elements flagged retain = y
    CASE e.v->>'retain'
        WHEN 'y' THEN
            e.v->>'field'
        ELSE
            NULL
    END retain_key,
    --same as map_val, but the scalar forms are trimmed
    CASE e.v->>'retain'
        WHEN 'y' THEN
            CASE m.regex->'regex'->>'function'
                WHEN 'extract' THEN
                    CASE WHEN array_upper(mt.mt,1)=1
                        THEN to_json(trim(mt.mt[1]))
                        ELSE array_to_json(mt.mt)
                    END::jsonb
                WHEN 'replace' THEN
                    to_jsonb(rtrim(rp.rp))
                ELSE
                    '{}'::jsonb
            END
        ELSE
            NULL
    END retain_val
FROM
    --------------------------start with all regex maps------------------------------------------------------------------------------------
    tps.map_rm m
    --------------------------isolate matching basis to limit map to only look at certain json---------------------------------------------
    LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'where') w(v) ON TRUE
    --------------------------join to main transaction table but only certain key/values are included--------------------------------------
    INNER JOIN tps.trans t ON
        t.srce = m.srce AND
        t.rec @> w.v
    --------------------------break out array of regular expressions in the map------------------------------------------------------------
    LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'defn') WITH ORDINALITY e(v, rn) ON true
    --------------------------each regex references a path to the target value, extract the target from the reference and do regex---------
    LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
        m.regex->'regex'->>'function' = 'extract'
    --------------------------same as above but for a replacement type function------------------------------------------------------------
    --the flag is coalesced to '' exactly as for regexp_matches above: regexp_replace returns NULL when any argument is NULL
    LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY rp(rp, rn) ON
        m.regex->'regex'->>'function' = 'replace'
WHERE
    t.srce = _srce AND
    e.v @> '{"map":"y"}'::jsonb
ORDER BY
    t.id DESC,
    m.target,
    e.rn,
    COALESCE(mt.rn,rp.rn,1)
)

--------------------collapse multiple regex results into one value per map element---------------------------------------------------------

, agg_to_target_items AS (
SELECT
    srce
    ,id
    ,target
    ,seq
    ,map_intention
    ,regex_function
    ,target_item_number
    ,result_key_name
    ,target_json_path
    --a single result is stored as a scalar, several results as an array
    ,CASE WHEN map_key IS NULL
        THEN
            NULL
        ELSE
            jsonb_build_object(
                map_key,
                CASE WHEN max(result_number) = 1
                    THEN
                        jsonb_agg(map_val ORDER BY result_number) -> 0
                    ELSE
                        jsonb_agg(map_val ORDER BY result_number)
                END
            )
    END map_val
    ,CASE WHEN retain_key IS NULL
        THEN
            NULL
        ELSE
            jsonb_build_object(
                retain_key,
                CASE WHEN max(result_number) = 1
                    THEN
                        jsonb_agg(retain_val ORDER BY result_number) -> 0
                    ELSE
                        jsonb_agg(retain_val ORDER BY result_number)
                END
            )
    END retain_val
FROM
    rx
GROUP BY
    srce
    ,id
    ,target
    ,seq
    ,map_intention
    ,regex_function
    ,target_item_number
    ,result_key_name
    ,target_json_path
    ,map_key
    ,retain_key
)

--------------------merge the elements of a map into one object per transaction and target-------------------------------------------------

, agg_to_target AS (
SELECT
    srce
    ,id
    ,target
    ,seq
    ,map_intention
    ,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
    ,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
FROM
    agg_to_target_items
GROUP BY
    srce
    ,id
    ,target
    ,seq
    ,map_intention
)

--------------------count transactions per distinct extracted value------------------------------------------------------------------------

, agg_to_ret AS (
SELECT
    srce
    ,target
    ,seq
    ,map_intention
    ,map_val
    ,retain_val
    ,count(*) "count"
FROM
    agg_to_target
GROUP BY
    srce
    ,target
    ,seq
    ,map_intention
    ,map_val
    ,retain_val
)

--------------------look up each extracted value in the return value table-----------------------------------------------------------------

, link_map AS (
SELECT
    a.srce
    ,a.target
    ,a.seq
    ,a.map_intention
    ,a.map_val
    ,a."count"
    ,a.retain_val
    ,v.map mapped_val
FROM
    agg_to_ret a
    LEFT OUTER JOIN tps.map_rv v ON
        v.srce = a.srce AND
        v.target = a.target AND
        v.retval = a.map_val
)

--------------------keep only the values without a lookup row------------------------------------------------------------------------------

SELECT
    l.srce
    ,l.target
    ,l.map_val
    ,l."count"
FROM
    link_map l
WHERE
    l.mapped_val IS NULL
ORDER BY
    l.srce
    ,l.target
    ,l."count" desc;

END;
$f$;
|
|
|
|
|
|
-------------------create trigger to map imported items------------------------------------------------------------------------------------------------------
|
|
|
|
-- tps.trans_insert_map() RETURNS trigger
--
-- Statement-level AFTER INSERT trigger function for tps.trans. It reads the inserted
-- rows from the transition table new_table, applies every regex map of tps.map_rm that
-- belongs to the row's source, looks the extracted values up in tps.map_rv, and writes
-- the results back to tps.trans:
--   parse  retained values, merged per transaction in descending seq order
--   map    mapped values from tps.map_rv, merged per transaction
--   allj   rec || map || parse (just rec when no map applied to the row)
--
-- Always returns NULL (the return value of an AFTER statement trigger is ignored).
--
-- NOTE(review): the merge into "map" has no ORDER BY, unlike the merge into "parse";
-- confirm whether overlapping keys should also be resolved by seq.
CREATE OR REPLACE FUNCTION tps.trans_insert_map() RETURNS TRIGGER
AS
$f$
BEGIN
    IF (TG_OP = 'INSERT') THEN

        WITH
        --------------------apply regex operations to transactions-----------------------------------------------------------------------------------

        rx AS (
        SELECT
            t.srce,
            t.id,
            t.rec,
            m.target,
            m.seq,
            m.regex->'regex'->>'function' regex_function,
            e.v ->> 'field' result_key_name,
            e.v ->> 'key' target_json_path,
            e.v ->> 'flag' regex_options_flag,
            e.v->>'map' map_intention,
            e.v->>'retain' retain_result,
            e.v->>'regex' regex_expression,
            e.rn target_item_number,
            COALESCE(mt.rn,rp.rn,1) result_number,
            mt.mt rx_match,
            rp.rp rx_replace,
            --key under which the extracted value is looked up; only for elements flagged map = y
            CASE e.v->>'map'
                WHEN 'y' THEN
                    e.v->>'field'
                ELSE
                    null
            END map_key,
            --a single capture group is stored as a scalar, several as an array
            CASE e.v->>'map'
                WHEN 'y' THEN
                    CASE m.regex->'regex'->>'function'
                        WHEN 'extract' THEN
                            CASE WHEN array_upper(mt.mt,1)=1
                                THEN to_json(mt.mt[1])
                                ELSE array_to_json(mt.mt)
                            END::jsonb
                        WHEN 'replace' THEN
                            to_jsonb(rp.rp)
                        ELSE
                            '{}'::jsonb
                    END
                ELSE
                    NULL
            END map_val,
            --key under which the value is retained; only for elements flagged retain = y
            CASE e.v->>'retain'
                WHEN 'y' THEN
                    e.v->>'field'
                ELSE
                    NULL
            END retain_key,
            --same as map_val, but the scalar forms are trimmed
            CASE e.v->>'retain'
                WHEN 'y' THEN
                    CASE m.regex->'regex'->>'function'
                        WHEN 'extract' THEN
                            CASE WHEN array_upper(mt.mt,1)=1
                                THEN to_json(trim(mt.mt[1]))
                                ELSE array_to_json(mt.mt)
                            END::jsonb
                        WHEN 'replace' THEN
                            to_jsonb(rtrim(rp.rp))
                        ELSE
                            '{}'::jsonb
                    END
                ELSE
                    NULL
            END retain_val
        FROM
            --------------------------start with all regex maps------------------------------------------------------------------------------------
            tps.map_rm m
            --------------------------isolate matching basis to limit map to only look at certain json---------------------------------------------
            LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'where') w(v) ON TRUE
            --------------------------join to the newly inserted rows but only certain key/values are included-------------------------------------
            INNER JOIN new_table t ON
                t.srce = m.srce AND
                t.rec @> w.v
            --------------------------break out array of regular expressions in the map------------------------------------------------------------
            LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'defn') WITH ORDINALITY e(v, rn) ON true
            --------------------------each regex references a path to the target value, extract the target from the reference and do regex---------
            LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
                m.regex->'regex'->>'function' = 'extract'
            --------------------------same as above but for a replacement type function------------------------------------------------------------
            --the flag is coalesced to '' exactly as for regexp_matches above: regexp_replace returns NULL when any argument is NULL
            LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY rp(rp, rn) ON
                m.regex->'regex'->>'function' = 'replace'
        ORDER BY
            t.id DESC,
            m.target,
            e.rn,
            COALESCE(mt.rn,rp.rn,1)
        )

        --------------------collapse multiple regex results into one value per map element-----------------------------------------------------------

        , agg_to_target_items AS (
        SELECT
            srce
            ,id
            ,target
            ,seq
            ,map_intention
            ,regex_function
            ,target_item_number
            ,result_key_name
            ,target_json_path
            --a single result is stored as a scalar, several results as an array
            ,CASE WHEN map_key IS NULL
                THEN
                    NULL
                ELSE
                    jsonb_build_object(
                        map_key,
                        CASE WHEN max(result_number) = 1
                            THEN
                                jsonb_agg(map_val ORDER BY result_number) -> 0
                            ELSE
                                jsonb_agg(map_val ORDER BY result_number)
                        END
                    )
            END map_val
            ,CASE WHEN retain_key IS NULL
                THEN
                    NULL
                ELSE
                    jsonb_build_object(
                        retain_key,
                        CASE WHEN max(result_number) = 1
                            THEN
                                jsonb_agg(retain_val ORDER BY result_number) -> 0
                            ELSE
                                jsonb_agg(retain_val ORDER BY result_number)
                        END
                    )
            END retain_val
        FROM
            rx
        GROUP BY
            srce
            ,id
            ,target
            ,seq
            ,map_intention
            ,regex_function
            ,target_item_number
            ,result_key_name
            ,target_json_path
            ,map_key
            ,retain_key
        )

        --------------------merge the elements of a map into one object per transaction and target---------------------------------------------------

        , agg_to_target AS (
        SELECT
            srce
            ,id
            ,target
            ,seq
            ,map_intention
            ,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
            ,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
        FROM
            agg_to_target_items
        GROUP BY
            srce
            ,id
            ,target
            ,seq
            ,map_intention
        ORDER BY
            id
        )

        --------------------look up each extracted value in the return value table-------------------------------------------------------------------

        , link_map AS (
        SELECT
            a.srce
            ,a.id
            ,a.target
            ,a.seq
            ,a.map_intention
            ,a.map_val
            ,a.retain_val retain_value
            ,v.map
        FROM
            agg_to_target a
            LEFT OUTER JOIN tps.map_rv v ON
                v.srce = a.srce AND
                v.target = a.target AND
                v.retval = a.map_val
        )

        --------------------merge all targets into one retained object and one mapped object per transaction----------------------------------------

        , agg_to_id AS (
        SELECT
            srce
            ,id
            ,tps.jsonb_concat_obj(COALESCE(retain_value,'{}'::jsonb) ORDER BY seq DESC) retain_val
            ,tps.jsonb_concat_obj(COALESCE(map,'{}'::jsonb)) map
        FROM
            link_map
        GROUP BY
            srce
            ,id
        )

        --create a complete list of all new inserts assuming some do not have maps (left join)
        ,join_all AS (
        SELECT
            n.srce
            ,n.id
            ,n.rec
            ,a.retain_val parse
            ,a.map
            ,n.rec||COALESCE(a.map||a.retain_val,'{}'::jsonb) allj
        FROM
            new_table n
            LEFT OUTER JOIN agg_to_id a ON
                a.id = n.id
        )

        --update trans with join_all recs
        UPDATE
            tps.trans t
        SET
            parse = a.parse
            ,map = a.map
            ,allj = a.allj
        FROM
            join_all a
        WHERE
            t.id = a.id;

    END IF;
    RETURN NULL;
END;
$f$ LANGUAGE plpgsql;

-- Run the mapping once per INSERT statement, exposing the inserted rows as new_table.
CREATE TRIGGER trans_insert
    AFTER INSERT ON tps.trans
    REFERENCING NEW TABLE AS new_table
    FOR EACH STATEMENT EXECUTE PROCEDURE tps.trans_insert_map();
|
|
|
|
|
|
------------------import data-------------------------------------------------------------------------------------------------------------------------------------
|
|
|
|
DROP FUNCTION IF EXISTS tps.srce_import(text, jsonb);
|
|
CREATE OR REPLACE FUNCTION tps.srce_import(_srce text, _recs jsonb) RETURNS jsonb
|
|
|
|
/*--------------------------------------------------------
|
|
0. test if source exists
|
|
1. create pending list
|
|
2. get unqiue pending keys
|
|
3. see which keys not already in tps.trans
|
|
4. insert pending records associated with keys that are not already in trans
|
|
5. insert summary to log table
|
|
*/---------------------------------------------------------
|
|
|
|
--to-do
|
|
--return infomation to a client via json or composite type
|
|
|
|
|
|
AS $f$
|
|
DECLARE
|
|
_t text;
|
|
_c text;
|
|
_log_info jsonb;
|
|
_log_id text;
|
|
_cnt numeric;
|
|
_message jsonb;
|
|
--_recs jsonb;
|
|
--_srce text;
|
|
_defn jsonb;
|
|
_MESSAGE_TEXT text;
|
|
_PG_EXCEPTION_DETAIL text;
|
|
_PG_EXCEPTION_HINT text;
|
|
|
|
BEGIN
|
|
|
|
--_path := 'C:\users\fleet\downloads\discover-recentactivity-20171031.csv';
|
|
--_srce := 'dcard';
|
|
--_recs:= $$[{"Trans. Date":"1/2/2018","Post Date":"1/2/2018","Description":"GOOGLE *YOUTUBE VIDEOS G.CO/HELPPAY#CAP0H07TXV","Amount":4.26,"Category":"Services"},{"Trans. Date":"1/2/2018","Post Date":"1/2/2018","Description":"MICROSOFT *ONEDRIVE 800-642-7676 WA","Amount":4.26,"Category":"Services"},{"Trans. Date":"1/3/2018","Post Date":"1/3/2018","Description":"CLE CLINIC PT PMTS 216-445-6249 OHAK2C57F2F0B3","Amount":200,"Category":"Medical Services"},{"Trans. Date":"1/4/2018","Post Date":"1/4/2018","Description":"AT&T *PAYMENT 800-288-2020 TX","Amount":57.14,"Category":"Services"},{"Trans. Date":"1/4/2018","Post Date":"1/7/2018","Description":"WWW.KOHLS.COM #0873 MIDDLETOWN OH","Amount":-7.9,"Category":"Payments and Credits"},{"Trans. Date":"1/5/2018","Post Date":"1/7/2018","Description":"PIZZA HUT 007946 STOW OH","Amount":9.24,"Category":"Restaurants"},{"Trans. Date":"1/5/2018","Post Date":"1/7/2018","Description":"SUBWAY 00044289255 STOW OH","Amount":10.25,"Category":"Restaurants"},{"Trans. Date":"1/6/2018","Post Date":"1/7/2018","Description":"ACME NO. 17 STOW OH","Amount":103.98,"Category":"Supermarkets"},{"Trans. Date":"1/6/2018","Post Date":"1/7/2018","Description":"DISCOUNT DRUG MART 32 STOW OH","Amount":1.69,"Category":"Merchandise"},{"Trans. Date":"1/6/2018","Post Date":"1/7/2018","Description":"DISCOUNT DRUG MART 32 STOW OH","Amount":2.19,"Category":"Merchandise"},{"Trans. Date":"1/9/2018","Post Date":"1/9/2018","Description":"CIRCLE K 05416 STOW OH00947R","Amount":3.94,"Category":"Gasoline"},{"Trans. Date":"1/9/2018","Post Date":"1/9/2018","Description":"CIRCLE K 05416 STOW OH00915R","Amount":52.99,"Category":"Gasoline"},{"Trans. Date":"1/13/2018","Post Date":"1/13/2018","Description":"AUTOZONE #0722 STOW OH","Amount":85.36,"Category":"Automotive"},{"Trans. Date":"1/13/2018","Post Date":"1/13/2018","Description":"DISCOUNT DRUG MART 32 STOW OH","Amount":26.68,"Category":"Merchandise"},{"Trans. 
Date":"1/13/2018","Post Date":"1/13/2018","Description":"EL CAMPESINO STOW OH","Amount":6.5,"Category":"Restaurants"},{"Trans. Date":"1/13/2018","Post Date":"1/13/2018","Description":"TARGET STOW OH","Amount":197.9,"Category":"Merchandise"},{"Trans. Date":"1/14/2018","Post Date":"1/14/2018","Description":"DISCOUNT DRUG MART 32 STOW OH","Amount":13.48,"Category":"Merchandise"},{"Trans. Date":"1/15/2018","Post Date":"1/15/2018","Description":"TARGET.COM * 800-591-3869 MN","Amount":22.41,"Category":"Merchandise"},{"Trans. Date":"1/16/2018","Post Date":"1/16/2018","Description":"BUFFALO WILD WINGS KENT KENT OH","Amount":63.22,"Category":"Restaurants"},{"Trans. Date":"1/16/2018","Post Date":"1/16/2018","Description":"PARTA - KCG KENT OH","Amount":4,"Category":"Government Services"},{"Trans. Date":"1/16/2018","Post Date":"1/16/2018","Description":"REMEMBERNHU 402-935-7733 IA","Amount":60,"Category":"Services"},{"Trans. Date":"1/16/2018","Post Date":"1/16/2018","Description":"TARGET.COM * 800-591-3869 MN","Amount":44.81,"Category":"Merchandise"},{"Trans. Date":"1/16/2018","Post Date":"1/16/2018","Description":"TREE CITY COFFEE & PASTR KENT OH","Amount":17.75,"Category":"Restaurants"},{"Trans. Date":"1/17/2018","Post Date":"1/17/2018","Description":"BESTBUYCOM805526794885 888-BESTBUY MN","Amount":343.72,"Category":"Merchandise"},{"Trans. Date":"1/19/2018","Post Date":"1/19/2018","Description":"DISCOUNT DRUG MART 32 STOW OH","Amount":5.98,"Category":"Merchandise"},{"Trans. Date":"1/19/2018","Post Date":"1/19/2018","Description":"U-HAUL OF KENT-STOW KENT OH","Amount":15.88,"Category":"Travel/ Entertainment"},{"Trans. Date":"1/19/2018","Post Date":"1/19/2018","Description":"WALMART GROCERY 800-966-6546 AR","Amount":5.99,"Category":"Supermarkets"},{"Trans. Date":"1/19/2018","Post Date":"1/19/2018","Description":"WALMART GROCERY 800-966-6546 AR","Amount":17.16,"Category":"Supermarkets"},{"Trans. 
Date":"1/19/2018","Post Date":"1/19/2018","Description":"WALMART GROCERY 800-966-6546 AR","Amount":500.97,"Category":"Supermarkets"},{"Trans. Date":"1/20/2018","Post Date":"1/20/2018","Description":"GOOGLE *GOOGLE PLAY G.CO/HELPPAY#CAP0HFFS7W","Amount":2.12,"Category":"Services"},{"Trans. Date":"1/20/2018","Post Date":"1/20/2018","Description":"LOWE'S OF STOW, OH. STOW OH","Amount":256.48,"Category":"Home Improvement"},{"Trans. Date":"1/23/2018","Post Date":"1/23/2018","Description":"CASHBACK BONUS REDEMPTION PYMT/STMT CRDT","Amount":-32.2,"Category":"Awards and Rebate Credits"},{"Trans. Date":"1/23/2018","Post Date":"1/23/2018","Description":"INTERNET PAYMENT - THANK YOU","Amount":-2394.51,"Category":"Payments and Credits"},{"Trans. Date":"1/27/2018","Post Date":"1/27/2018","Description":"GIANT-EAGLE #4096 STOW OH","Amount":67.81,"Category":"Supermarkets"},{"Trans. Date":"1/27/2018","Post Date":"1/27/2018","Description":"OFFICEMAX/OFFICE DEPOT63 STOW OH","Amount":21.06,"Category":"Merchandise"},{"Trans. Date":"1/27/2018","Post Date":"1/27/2018","Description":"TARGET STOW OH","Amount":71,"Category":"Merchandise"},{"Trans. Date":"1/29/2018","Post Date":"1/29/2018","Description":"NETFLIX.COM NETFLIX.COM CA19899514437","Amount":14.93,"Category":"Services"},{"Trans. Date":"1/30/2018","Post Date":"1/30/2018","Description":"SQ *TWISTED MELTZ KENT OH0002305843011416898511","Amount":16.87,"Category":"Restaurants"},{"Trans. Date":"1/30/2018","Post Date":"1/30/2018","Description":"TARGET STOW OH","Amount":49.37,"Category":"Merchandise"}]$$::jsonb;
|
|
|
|
----------------------------------------------------test if source exists----------------------------------------------------------------------------------
|
|
|
|
SELECT
|
|
defn
|
|
INTO
|
|
_defn
|
|
FROM
|
|
tps.srce
|
|
WHERE
|
|
srce = _srce;
|
|
|
|
IF _defn IS NULL THEN
|
|
_message:=
|
|
format(
|
|
$$
|
|
{
|
|
"status":"fail",
|
|
"message":"source %L does not exists"
|
|
}
|
|
$$,
|
|
_srce
|
|
)::jsonb;
|
|
RETURN _message;
|
|
END IF;
|
|
|
|
-------------unwrap the json record and apply the path(s) of the constraint to build a constraint key per record-----------------------------------------------------------------------------------
|
|
|
|
WITH
|
|
pending_list AS (
|
|
SELECT
|
|
_srce srce
|
|
,j.rec
|
|
,j.id
|
|
--aggregate back to the record since multiple paths may be listed in the constraint
|
|
--it is unclear why the "->>0" is required to correctly extract the text array from the jsonb
|
|
,tps.jsonb_concat_obj(
|
|
jsonb_build_object(
|
|
--the new json key is the path itself
|
|
cons.path->>0
|
|
,j.rec#>((cons.path->>0)::text[])
|
|
)
|
|
) json_key
|
|
FROM
|
|
jsonb_array_elements(_recs) WITH ORDINALITY j(rec,id)
|
|
JOIN LATERAL jsonb_array_elements(_defn->'constraint') WITH ORDINALITY cons(path, seq) ON TRUE
|
|
GROUP BY
|
|
j.rec
|
|
,j.id
|
|
)
|
|
|
|
-----------create a unique list of keys from staged rows------------------------------------------------------------------------------------------
|
|
|
|
, pending_keys AS (
|
|
SELECT DISTINCT
|
|
json_key
|
|
FROM
|
|
pending_list
|
|
)
|
|
|
|
-----------list of keys already loaded to tps-----------------------------------------------------------------------------------------------------
|
|
|
|
, matched_keys AS (
|
|
SELECT DISTINCT
|
|
k.json_key
|
|
FROM
|
|
pending_keys k
|
|
INNER JOIN tps.trans t ON
|
|
t.ic = k.json_key
|
|
)
|
|
|
|
-----------return unique keys that are not already in tps.trans-----------------------------------------------------------------------------------
|
|
|
|
, unmatched_keys AS (
|
|
SELECT
|
|
json_key
|
|
FROM
|
|
pending_keys
|
|
|
|
EXCEPT
|
|
|
|
SELECT
|
|
json_key
|
|
FROM
|
|
matched_keys
|
|
)
|
|
|
|
--------build log record-------------------+------------------------------------------------------------------------------------------------
|
|
|
|
, logged AS (
|
|
INSERT INTO
|
|
tps.trans_log (info)
|
|
SELECT
|
|
JSONB_BUILD_OBJECT('time_stamp',CURRENT_TIMESTAMP)
|
|
||JSONB_BUILD_OBJECT('srce',_srce)
|
|
--||JSONB_BUILD_OBJECT('path',_path)
|
|
||JSONB_BUILD_OBJECT('not_inserted',
|
|
(
|
|
SELECT
|
|
jsonb_agg(json_key)
|
|
FROM
|
|
matched_keys
|
|
)
|
|
)
|
|
||JSONB_BUILD_OBJECT('inserted',
|
|
(
|
|
SELECT
|
|
jsonb_agg(json_key)
|
|
FROM
|
|
unmatched_keys
|
|
)
|
|
)
|
|
RETURNING *
|
|
)
|
|
|
|
-----------insert pending rows that have key with no trans match-----------------------------------------------------------------------------------
|
|
--need to look into mapping the transactions prior to loading
|
|
|
|
, inserted AS (
|
|
INSERT INTO
|
|
tps.trans (srce, rec, ic, logid)
|
|
SELECT
|
|
pl.srce
|
|
,pl.rec
|
|
,pl.json_key
|
|
,logged.id
|
|
FROM
|
|
pending_list pl
|
|
INNER JOIN unmatched_keys u ON
|
|
u.json_key = pl.json_key
|
|
CROSS JOIN logged
|
|
ORDER BY
|
|
pl.id ASC
|
|
----this conflict is only if an exact duplicate rec json happens, which will be rejected
|
|
----therefore, records may not be inserted due to any matches with certain json fields, or if the entire json is a duplicate, reason is not specified
|
|
RETURNING *
|
|
)
|
|
|
|
SELECT
|
|
id
|
|
,info
|
|
INTO
|
|
_log_id
|
|
,_log_info
|
|
FROM
|
|
logged;
|
|
|
|
--RAISE NOTICE 'import logged under id# %, info: %', _log_id, _log_info;
|
|
|
|
_message:=
|
|
(
|
|
$$
|
|
{
|
|
"status":"complete"
|
|
}
|
|
$$::jsonb
|
|
)||jsonb_build_object('details',_log_info);
|
|
|
|
RETURN _message;
|
|
|
|
EXCEPTION WHEN OTHERS THEN
|
|
GET STACKED DIAGNOSTICS
|
|
_MESSAGE_TEXT = MESSAGE_TEXT,
|
|
_PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
|
|
_PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
|
|
_message:=
|
|
($$
|
|
{
|
|
"status":"fail",
|
|
"message":"error importing data"
|
|
}
|
|
$$::jsonb)
|
|
||jsonb_build_object('message_text',_MESSAGE_TEXT)
|
|
||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);
|
|
return _message;
|
|
END;
|
|
$f$
|
|
LANGUAGE plpgsql;
|
|
|
|
|
|
---------------overwrite maps--------------------------------------------------------------------------------------------------------------
|
|
|
|
----re-apply every regex map of one source to its transactions and overwrite the stored results
----    _srce   : source name; only tps.trans rows of this source are processed
----    returns : {"status":"complete"} on success, otherwise a "fail" object that carries
----              message_text and pg_exception_detail of the trapped error
----    note    : only transactions that satisfy at least one map's "where" filter are updated
CREATE OR REPLACE FUNCTION tps.srce_map_overwrite(_srce text) RETURNS jsonb
AS
$f$
DECLARE
    _message                jsonb;
    _MESSAGE_TEXT           text;
    _PG_EXCEPTION_DETAIL    text;

BEGIN

    WITH

    --------------------apply regex operations to transactions-----------------------------------------------------------------------------------

    rx AS (
        SELECT
            t.srce,
            t.id,
            t.rec,
            m.target,
            m.seq,
            m.regex->'regex'->>'function' regex_function,
            e.v ->> 'field' result_key_name,
            e.v ->> 'key' target_json_path,
            e.v ->> 'flag' regex_options_flag,
            e.v->>'map' map_intention,
            e.v->>'retain' retain_result,
            e.v->>'regex' regex_expression,
            e.rn target_item_number,
            COALESCE(mt.rn,rp.rn,1) result_number,
            mt.mt rx_match,
            rp.rp rx_replace,
            --key under which the result is mapped, only for items flagged "map":"y"
            CASE e.v->>'map'
                WHEN 'y' THEN
                    e.v->>'field'
                ELSE
                    NULL
            END map_key,
            --mapped value: a single capture is kept as a scalar, several captures as an array
            CASE e.v->>'map'
                WHEN 'y' THEN
                    CASE m.regex->'regex'->>'function'
                        WHEN 'extract' THEN
                            CASE WHEN array_upper(mt.mt,1)=1
                                THEN to_json(mt.mt[1])
                                ELSE array_to_json(mt.mt)
                            END::jsonb
                        WHEN 'replace' THEN
                            to_jsonb(rp.rp)
                        ELSE
                            '{}'::jsonb
                    END
                ELSE
                    NULL
            END map_val,
            --key under which the result is retained, only for items flagged "retain":"y"
            CASE e.v->>'retain'
                WHEN 'y' THEN
                    e.v->>'field'
                ELSE
                    NULL
            END retain_key,
            --retained value: same shape as the mapped value, but scalar results are whitespace-trimmed
            CASE e.v->>'retain'
                WHEN 'y' THEN
                    CASE m.regex->'regex'->>'function'
                        WHEN 'extract' THEN
                            CASE WHEN array_upper(mt.mt,1)=1
                                THEN to_json(trim(mt.mt[1]))
                                ELSE array_to_json(mt.mt)
                            END::jsonb
                        WHEN 'replace' THEN
                            to_jsonb(rtrim(rp.rp))
                        ELSE
                            '{}'::jsonb
                    END
                ELSE
                    NULL
            END retain_val
        FROM
            --------------------------start with all regex maps------------------------------------------------------------------------------------
            tps.map_rm m
            --------------------------isolate matching basis to limit map to only look at certain json---------------------------------------------
            LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'where') w(v) ON TRUE
            --------------------------join to main transaction table but only certain key/values are included--------------------------------------
            INNER JOIN tps.trans t ON
                t.srce = m.srce AND
                t.rec @> w.v
            --------------------------break out array of regular expressions in the map------------------------------------------------------------
            LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'defn') WITH ORDINALITY e(v, rn) ON true
            --------------------------each regex references a path to the target value, extract the target from the reference and do regex---------
            LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
                m.regex->'regex'->>'function' = 'extract'
            --------------------------same as above but for a replacement type function------------------------------------------------------------
            --regexp_replace is strict: a missing "flag" key would turn the whole replacement into NULL, so default it to '' like the extract branch
            LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY rp(rp, rn) ON
                m.regex->'regex'->>'function' = 'replace'
        WHERE
            t.srce = _srce
        ORDER BY
            t.id DESC,
            m.target,
            e.rn,
            COALESCE(mt.rn,rp.rn,1)
    )

    --------------------collapse multiple regex results of one defn item into a single {key: value} object-------------------------------------

    , agg_to_target_items AS (
        SELECT
            srce
            ,id
            ,target
            ,seq
            ,map_intention
            ,regex_function
            ,target_item_number
            ,result_key_name
            ,target_json_path
            ,CASE WHEN map_key IS NULL
                THEN
                    NULL
                ELSE
                    jsonb_build_object(
                        map_key,
                        --a single result is unwrapped from the aggregate array
                        CASE WHEN max(result_number) = 1
                            THEN
                                jsonb_agg(map_val ORDER BY result_number) -> 0
                            ELSE
                                jsonb_agg(map_val ORDER BY result_number)
                        END
                    )
            END map_val
            ,CASE WHEN retain_key IS NULL
                THEN
                    NULL
                ELSE
                    jsonb_build_object(
                        retain_key,
                        CASE WHEN max(result_number) = 1
                            THEN
                                jsonb_agg(retain_val ORDER BY result_number) -> 0
                            ELSE
                                jsonb_agg(retain_val ORDER BY result_number)
                        END
                    )
            END retain_val
        FROM
            rx
        GROUP BY
            srce
            ,id
            ,target
            ,seq
            ,map_intention
            ,regex_function
            ,target_item_number
            ,result_key_name
            ,target_json_path
            ,map_key
            ,retain_key
    )

    --------------------merge the item objects of each map target into one object per transaction and target-----------------------------------

    , agg_to_target AS (
        SELECT
            srce
            ,id
            ,target
            ,seq
            ,map_intention
            ,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
            ,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
        FROM
            agg_to_target_items
        GROUP BY
            srce
            ,id
            ,target
            ,seq
            ,map_intention
        ORDER BY
            id
    )

    --------------------look up the mapped value for each regex result in tps.map_rv----------------------------------------------------------

    , link_map AS (
        SELECT
            a.srce
            ,a.id
            ,a.target
            ,a.seq
            ,a.map_intention
            ,a.map_val
            ,a.retain_val retain_value
            ,v.map
        FROM
            agg_to_target a
            LEFT OUTER JOIN tps.map_rv v ON
                v.srce = a.srce AND
                v.target = a.target AND
                v.retval = a.map_val
    )

    --------------------merge all targets back to one row per transaction----------------------------------------------------------------------

    , agg_to_id AS (
        SELECT
            srce
            ,id
            ,tps.jsonb_concat_obj(COALESCE(retain_value,'{}'::jsonb) ORDER BY seq DESC) retain_val
            ,tps.jsonb_concat_obj(COALESCE(map,'{}'::jsonb)) map
        FROM
            link_map
        GROUP BY
            srce
            ,id
    )

    --------------------overwrite the stored map results---------------------------------------------------------------------------------------

    UPDATE
        tps.trans t
    SET
        map = o.map,
        parse = o.retain_val,
        allj = t.rec||o.map||o.retain_val
    FROM
        agg_to_id o
    WHERE
        o.id = t.id;

    _message:= jsonb_build_object('status','complete');
    RETURN _message;

EXCEPTION WHEN OTHERS THEN

    GET STACKED DIAGNOSTICS
        _MESSAGE_TEXT = MESSAGE_TEXT,
        _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL;
    _message:=
    ($$
        {
            "status":"fail",
            "message":"error setting map value"
        }
    $$::jsonb)
    ||jsonb_build_object('message_text',_MESSAGE_TEXT)
    ||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);

    RETURN _message;
END;
$f$
LANGUAGE plpgsql;
|
|
|
|
---------------------set map values from json array of json objects-----------------------------------------------------
|
|
|
|
|
|
DROP FUNCTION IF EXISTS tps.map_rv_set;

----upsert mapped values from a json array of objects
----    _defn   : json array; each element is read as {source, map, ret_val, mapped}
----    returns : {"status":"complete"} on success, otherwise a "fail" object that carries
----              message_text and pg_exception_detail of the trapped error
CREATE OR REPLACE FUNCTION tps.map_rv_set(_defn jsonb) RETURNS jsonb
AS
$f$
DECLARE
    _message                jsonb;
    _MESSAGE_TEXT           text;
    _PG_EXCEPTION_DETAIL    text;
    _PG_EXCEPTION_HINT      text;
BEGIN

    INSERT INTO tps.map_rv (srce, target, retval, map, hist)
    SELECT
        item.source
        ,item.map
        ,item.ret_val
        ,item.mapped
        --a new key starts its history as a one-element array whose effective range is still open
        ,jsonb_build_object(
            'hist_defn', item.mapped
            ,'effective', jsonb_build_array(CURRENT_TIMESTAMP, null::timestamptz)
        ) || '[]'::jsonb
    FROM
        jsonb_array_elements(_defn) WITH ORDINALITY elem(obj, pos)
        CROSS JOIN LATERAL jsonb_to_record(elem.obj) item(source text, map text, ret_val jsonb, mapped jsonb)
    ON CONFLICT ON CONSTRAINT map_rv_pk DO UPDATE
    SET
        map = excluded.map
        ,hist =
            --the incoming definition is placed at position 0
            jsonb_build_object(
                'hist_defn', excluded.map
                ,'effective', jsonb_build_array(CURRENT_TIMESTAMP, null::timestamptz)
            )
            --the former position 0 gets its open upper bound closed with the current time
            || jsonb_set(
                map_rv.hist
                ,'{0,effective,1}'::text[]
                ,to_jsonb(CURRENT_TIMESTAMP)
            );

    -------return message--------------------------------------------------------------------------------------------------
    RETURN jsonb_build_object('status','complete');

EXCEPTION WHEN OTHERS THEN

    GET STACKED DIAGNOSTICS
        _MESSAGE_TEXT = MESSAGE_TEXT,
        _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
        _PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;

    _message := jsonb_build_object(
        'status', 'fail'
        ,'message', 'error setting map value'
        ,'message_text', _MESSAGE_TEXT
        ,'pg_exception_detail', _PG_EXCEPTION_DETAIL
    );

    RETURN _message;
END;
$f$
LANGUAGE plpgsql;
|
|
|
|
------------------------------test regex with only unique results--------------------------------------------------------------
|
|
|
|
DROP FUNCTION IF EXISTS tps.test_regex(jsonb);

----try out a single regex map definition against the stored transactions without saving anything
----    _defn   : map definition; the keys read here are "srce", "name", "regex" and "sequence"
----    returns : json array of {srce, target, map_val, count}, one element per distinct
----              combination of map_intention, map_val and retain_val, ordered by srce, target
----              and descending count; NULL when no transaction matches
CREATE FUNCTION tps.test_regex(_defn jsonb) RETURNS jsonb
LANGUAGE plpgsql
AS
$f$
DECLARE
    _rslt jsonb;
BEGIN

    WITH

    --------------------apply regex operations to transactions---------------------------------------------------------------------------------

    rx AS (
        SELECT
            t.srce,
            t.id,
            t.rec,
            m.target,
            m.seq,
            m.regex->'regex'->>'function' regex_function,
            e.v ->> 'field' result_key_name,
            e.v ->> 'key' target_json_path,
            e.v ->> 'flag' regex_options_flag,
            e.v->>'map' map_intention,
            e.v->>'retain' retain_result,
            e.v->>'regex' regex_expression,
            e.rn target_item_number,
            COALESCE(mt.rn,rp.rn,1) result_number,
            mt.mt rx_match,
            rp.rp rx_replace,
            --key under which the result is mapped, only for items flagged "map":"y"
            CASE e.v->>'map'
                WHEN 'y' THEN
                    e.v->>'field'
                ELSE
                    NULL
            END map_key,
            --mapped value: a single capture is kept as a scalar, several captures as an array
            CASE e.v->>'map'
                WHEN 'y' THEN
                    CASE m.regex->'regex'->>'function'
                        WHEN 'extract' THEN
                            CASE WHEN array_upper(mt.mt,1)=1
                                THEN to_json(mt.mt[1])
                                ELSE array_to_json(mt.mt)
                            END::jsonb
                        WHEN 'replace' THEN
                            to_jsonb(rp.rp)
                        ELSE
                            '{}'::jsonb
                    END
                ELSE
                    NULL
            END map_val,
            --key under which the result is retained, only for items flagged "retain":"y"
            CASE e.v->>'retain'
                WHEN 'y' THEN
                    e.v->>'field'
                ELSE
                    NULL
            END retain_key,
            --retained value: same shape as the mapped value, but scalar results are whitespace-trimmed
            CASE e.v->>'retain'
                WHEN 'y' THEN
                    CASE m.regex->'regex'->>'function'
                        WHEN 'extract' THEN
                            CASE WHEN array_upper(mt.mt,1)=1
                                THEN to_json(trim(mt.mt[1]))
                                ELSE array_to_json(mt.mt)
                            END::jsonb
                        WHEN 'replace' THEN
                            to_jsonb(rtrim(rp.rp))
                        ELSE
                            '{}'::jsonb
                    END
                ELSE
                    NULL
            END retain_val
        FROM
            --------------------------start with the single map definition that was passed in------------------------------------------------------
            (SELECT _defn->>'srce' srce, _defn->>'name' target, _defn->'regex' regex, (_defn->>'sequence')::numeric seq) m
            --------------------------isolate matching basis to limit map to only look at certain json---------------------------------------------
            LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'where') w(v) ON TRUE
            --------------------------break out array of regular expressions in the map------------------------------------------------------------
            LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'defn') WITH ORDINALITY e(v, rn) ON true
            --------------------------join to main transaction table but only certain key/values are included--------------------------------------
            INNER JOIN tps.trans t ON
                t.srce = m.srce AND
                t.rec @> w.v
            --------------------------each regex references a path to the target value, extract the target from the reference and do regex---------
            LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
                m.regex->'regex'->>'function' = 'extract'
            --------------------------same as above but for a replacement type function------------------------------------------------------------
            --regexp_replace is strict: a missing "flag" key would turn the whole replacement into NULL, so default it to '' like the extract branch
            LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY rp(rp, rn) ON
                m.regex->'regex'->>'function' = 'replace'
        ORDER BY
            t.id DESC,
            m.target,
            e.rn,
            COALESCE(mt.rn,rp.rn,1)
    )

    --------------------collapse multiple regex results of one defn item into a single {key: value} object-------------------------------------

    , agg_to_target_items AS (
        SELECT
            srce
            ,id
            ,target
            ,seq
            ,map_intention
            ,regex_function
            ,target_item_number
            ,result_key_name
            ,target_json_path
            ,CASE WHEN map_key IS NULL
                THEN
                    NULL
                ELSE
                    jsonb_build_object(
                        map_key,
                        --a single result is unwrapped from the aggregate array
                        CASE WHEN max(result_number) = 1
                            THEN
                                jsonb_agg(map_val ORDER BY result_number) -> 0
                            ELSE
                                jsonb_agg(map_val ORDER BY result_number)
                        END
                    )
            END map_val
            ,CASE WHEN retain_key IS NULL
                THEN
                    NULL
                ELSE
                    jsonb_build_object(
                        retain_key,
                        CASE WHEN max(result_number) = 1
                            THEN
                                jsonb_agg(retain_val ORDER BY result_number) -> 0
                            ELSE
                                jsonb_agg(retain_val ORDER BY result_number)
                        END
                    )
            END retain_val
        FROM
            rx
        GROUP BY
            srce
            ,id
            ,target
            ,seq
            ,map_intention
            ,regex_function
            ,target_item_number
            ,result_key_name
            ,target_json_path
            ,map_key
            ,retain_key
    )

    --------------------merge the item objects into one object per transaction-----------------------------------------------------------------

    , agg_to_target AS (
        SELECT
            srce
            ,id
            ,target
            ,seq
            ,map_intention
            ,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
            ,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
        FROM
            agg_to_target_items
        GROUP BY
            srce
            ,id
            ,target
            ,seq
            ,map_intention
    )

    --------------------count the transactions behind every distinct result--------------------------------------------------------------------

    , agg_to_ret AS (
        SELECT
            srce
            ,target
            ,seq
            ,map_intention
            ,map_val
            ,retain_val
            ,count(*) "count"
        FROM
            agg_to_target
        GROUP BY
            srce
            ,target
            ,seq
            ,map_intention
            ,map_val
            ,retain_val
    )

    --------------------columns that are reported back-----------------------------------------------------------------------------------------

    ,agg_to_id AS (
        SELECT
            l.srce
            ,l.target
            ,l.map_val
            ,l."count"
        FROM
            agg_to_ret l
    )

    SELECT
        --the sort is stated inside the aggregate: an ORDER BY on the feeding CTE is not guaranteed to survive into jsonb_agg
        jsonb_agg(
            row_to_json(agg_to_id)::jsonb
            ORDER BY
                agg_to_id.srce
                ,agg_to_id.target
                ,agg_to_id."count" DESC
        )
    INTO
        _rslt
    FROM
        agg_to_id;

    RETURN _rslt;
END;
$f$;
|
|
|
|
------------------------------report map results that have no mapped value, along with their original records------------------
|
|
|
|
DROP FUNCTION IF EXISTS tps.report_unmapped_recs;

----list the regex results of one source that have no mapped value in tps.map_rv
----    _srce   : source name; only tps.trans rows of this source are evaluated
----    returns : one row per unmapped result with
----                source  - source name
----                map     - target name of the regex map
----                ret_val - the regex result that has no entry in tps.map_rv
----                count   - number of transactions that produced this result
----                recs    - json array of the original records of those transactions
----              ordered by source, map and descending count
CREATE FUNCTION tps.report_unmapped_recs(_srce text) RETURNS TABLE
(
    source text,
    map text,
    ret_val jsonb,
    "count" bigint,
    recs jsonb
)
LANGUAGE plpgsql
AS
$f$
BEGIN

    /*
    first get distinct target json values
    then apply regex
    */

    RETURN QUERY
    WITH

    --------------------apply regex operations to transactions---------------------------------------------------------------------------------

    rx AS (
        SELECT
            t.srce,
            t.id,
            t.rec,
            m.target,
            m.seq,
            m.regex->'regex'->>'function' regex_function,
            e.v ->> 'field' result_key_name,
            e.v ->> 'key' target_json_path,
            e.v ->> 'flag' regex_options_flag,
            e.v->>'map' map_intention,
            e.v->>'retain' retain_result,
            e.v->>'regex' regex_expression,
            e.rn target_item_number,
            COALESCE(mt.rn,rp.rn,1) result_number,
            mt.mt rx_match,
            rp.rp rx_replace,
            --key under which the result is mapped, only for items flagged "map":"y"
            CASE e.v->>'map'
                WHEN 'y' THEN
                    e.v->>'field'
                ELSE
                    NULL
            END map_key,
            --mapped value: a single capture is kept as a scalar, several captures as an array
            CASE e.v->>'map'
                WHEN 'y' THEN
                    CASE m.regex->'regex'->>'function'
                        WHEN 'extract' THEN
                            CASE WHEN array_upper(mt.mt,1)=1
                                THEN to_json(mt.mt[1])
                                ELSE array_to_json(mt.mt)
                            END::jsonb
                        WHEN 'replace' THEN
                            to_jsonb(rp.rp)
                        ELSE
                            '{}'::jsonb
                    END
                ELSE
                    NULL
            END map_val,
            --key under which the result is retained, only for items flagged "retain":"y"
            CASE e.v->>'retain'
                WHEN 'y' THEN
                    e.v->>'field'
                ELSE
                    NULL
            END retain_key,
            --retained value: same shape as the mapped value, but scalar results are whitespace-trimmed
            CASE e.v->>'retain'
                WHEN 'y' THEN
                    CASE m.regex->'regex'->>'function'
                        WHEN 'extract' THEN
                            CASE WHEN array_upper(mt.mt,1)=1
                                THEN to_json(trim(mt.mt[1]))
                                ELSE array_to_json(mt.mt)
                            END::jsonb
                        WHEN 'replace' THEN
                            to_jsonb(rtrim(rp.rp))
                        ELSE
                            '{}'::jsonb
                    END
                ELSE
                    NULL
            END retain_val
        FROM
            --------------------------start with all regex maps------------------------------------------------------------------------------------
            tps.map_rm m
            --------------------------isolate matching basis to limit map to only look at certain json---------------------------------------------
            LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'where') w(v) ON TRUE
            --------------------------join to main transaction table but only certain key/values are included--------------------------------------
            INNER JOIN tps.trans t ON
                t.srce = m.srce AND
                t.rec @> w.v
            --------------------------break out array of regular expressions in the map------------------------------------------------------------
            LEFT JOIN LATERAL jsonb_array_elements(m.regex->'regex'->'defn') WITH ORDINALITY e(v, rn) ON true
            --------------------------each regex references a path to the target value, extract the target from the reference and do regex---------
            LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
                m.regex->'regex'->>'function' = 'extract'
            --------------------------same as above but for a replacement type function------------------------------------------------------------
            --regexp_replace is strict: a missing "flag" key would turn the whole replacement into NULL, so default it to '' like the extract branch
            LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY rp(rp, rn) ON
                m.regex->'regex'->>'function' = 'replace'
        WHERE
            t.srce = _srce AND
            --only regex items that are meant to be mapped can be reported as unmapped
            e.v @> '{"map":"y"}'::jsonb
        ORDER BY
            t.id DESC,
            m.target,
            e.rn,
            COALESCE(mt.rn,rp.rn,1)
    )

    --------------------collapse multiple regex results of one defn item into a single {key: value} object-------------------------------------

    , agg_to_target_items AS (
        SELECT
            srce
            ,id
            ,rec
            ,target
            ,seq
            ,map_intention
            ,regex_function
            ,target_item_number
            ,result_key_name
            ,target_json_path
            ,CASE WHEN map_key IS NULL
                THEN
                    NULL
                ELSE
                    jsonb_build_object(
                        map_key,
                        --a single result is unwrapped from the aggregate array
                        CASE WHEN max(result_number) = 1
                            THEN
                                jsonb_agg(map_val ORDER BY result_number) -> 0
                            ELSE
                                jsonb_agg(map_val ORDER BY result_number)
                        END
                    )
            END map_val
            ,CASE WHEN retain_key IS NULL
                THEN
                    NULL
                ELSE
                    jsonb_build_object(
                        retain_key,
                        CASE WHEN max(result_number) = 1
                            THEN
                                jsonb_agg(retain_val ORDER BY result_number) -> 0
                            ELSE
                                jsonb_agg(retain_val ORDER BY result_number)
                        END
                    )
            END retain_val
        FROM
            rx
        GROUP BY
            srce
            ,id
            ,rec
            ,target
            ,seq
            ,map_intention
            ,regex_function
            ,target_item_number
            ,result_key_name
            ,target_json_path
            ,map_key
            ,retain_key
    )

    --------------------merge the item objects of each map target into one object per transaction and target-----------------------------------

    , agg_to_target AS (
        SELECT
            srce
            ,id
            ,rec
            ,target
            ,seq
            ,map_intention
            ,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
            ,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
        FROM
            agg_to_target_items
        GROUP BY
            srce
            ,id
            ,rec
            ,target
            ,seq
            ,map_intention
    )

    --------------------count the transactions behind every distinct result and keep their records---------------------------------------------

    , agg_to_ret AS (
        SELECT
            srce
            ,target
            ,seq
            ,map_intention
            ,map_val
            ,retain_val
            ,count(*) "count"
            ,jsonb_agg(rec) rec
        FROM
            agg_to_target
        GROUP BY
            srce
            ,target
            ,seq
            ,map_intention
            ,map_val
            ,retain_val
    )

    --------------------look up the mapped value for each regex result in tps.map_rv----------------------------------------------------------

    , link_map AS (
        SELECT
            a.srce
            ,a.target
            ,a.seq
            ,a.map_intention
            ,a.map_val
            ,a."count"
            ,a.rec
            ,a.retain_val
            ,v.map mapped_val
        FROM
            agg_to_ret a
            LEFT OUTER JOIN tps.map_rv v ON
                v.srce = a.srce AND
                v.target = a.target AND
                v.retval = a.map_val
    )

    --------------------keep only the results that found no mapped value-----------------------------------------------------------------------

    SELECT
        l.srce
        ,l.target
        ,l.map_val
        ,l."count"
        ,l.rec
    FROM
        link_map l
    WHERE
        l.mapped_val IS NULL
    ORDER BY
        l.srce
        ,l.target
        ,l."count" desc;
END;
$f$;
|
|
|
|
|
|
--setup function to delete a single source
|
|
DROP FUNCTION IF EXISTS tps.srce_delete(jsonb);

----delete a single source
----    _defn   : source definition; only its "name" key is read, it identifies the tps.srce row
----    returns : {"status":"complete", ...} when a row was deleted
----              {"status":"fail", ...} when no source of that name exists, or when the delete
----              raised an error (then message_text and pg_exception_detail are included)
CREATE FUNCTION tps.srce_delete(_defn jsonb) RETURNS jsonb
AS
$f$
DECLARE
    _message                jsonb;
    _MESSAGE_TEXT           text;
    _PG_EXCEPTION_DETAIL    text;
BEGIN

    -------------------------------do delete---------------------------------

    DELETE FROM tps.srce WHERE srce = _defn->>'name';
    --could move this record to a "recycle bin" table for a certain period of time
    --need to handle cascading record deletes

    ---------------------------nothing matched-------------------------------

    --FOUND is false when the DELETE affected no row (unknown name, or "name" key missing);
    --reporting a permanent delete in that case would be misleading
    IF NOT FOUND THEN
        _message:= jsonb_build_object(
            'status','fail'
            ,'message',format('source %L does not exist', _defn->>'name')
        );
        RETURN _message;
    END IF;

    ---------------------------set message-----------------------------------

    _message:=
    (
        $$
        {
            "status":"complete",
            "message":"source was permanently deleted"
        }
        $$::jsonb
    );

    RETURN _message;

EXCEPTION WHEN OTHERS THEN

    GET STACKED DIAGNOSTICS
        _MESSAGE_TEXT = MESSAGE_TEXT,
        _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL;
    _message:=
    ($$
        {
            "status":"fail",
            "message":"error dropping the source"
        }
    $$::jsonb)
    ||jsonb_build_object('message_text',_MESSAGE_TEXT)
    ||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);

    RETURN _message;
END;
$f$
LANGUAGE plpgsql;
|
|
|
|
/*
|
|
This function takes an array of definition objects where the "name" key of each object is the primary key
|
|
It will force the entire body of sources to match what is received
|
|
*/
|
|
DROP FUNCTION IF EXISTS tps.srce_overwrite_all(jsonb);

----force the full body of sources to match the array that is received
----    _defn   : json array of source definitions, keyed by their "name"
----              every element is passed to tps.srce_set
----              every row of tps.srce whose name is not in the array is passed to tps.srce_delete
----    returns : json array of {source, status, message}, one element per source that was handled
----              (an empty array when there was nothing to do), or a single "fail" object with
----              message_text and pg_exception_detail when an error was raised
CREATE FUNCTION tps.srce_overwrite_all(_defn jsonb) RETURNS jsonb
AS
$f$
DECLARE
    _message                jsonb;
    _MESSAGE_TEXT           text;
    _PG_EXCEPTION_DETAIL    text;
BEGIN

    WITH
    --incoming definitions, one row per array element
    _set AS (
        SELECT
            j.rn rn
            ,j.e->>'name' srce
            ,j.e defn
        FROM
            jsonb_array_elements(_defn) WITH ORDINALITY j(e, rn)
    )
    --full join of existing sources and incoming definitions
    ,_full AS (
        SELECT
            COALESCE(_srce.srce,_set.srce) srce
            --a row without an incoming element is an existing source that must go;
            --tested on rn (never null for an incoming element) so that neither a source
            --literally named 'DELETE' nor an element without "name" is mistaken for a delete
            ,CASE WHEN _set.rn IS NULL THEN 'DELETE' ELSE 'SET' END actn
            ,COALESCE(_set.defn,_srce.defn) defn
        FROM
            tps.srce _srce
            FULL OUTER JOIN _set ON
                _set.srce = _srce.srce
    )
    --call functions from list
    ,_do AS (
        SELECT
            f.srce
            ,f.actn
            --CASE evaluates only the branch that is selected, so exactly one of the two
            --functions runs per source; a lateral join gated by an ON filter would not
            --guarantee that, since the function can be invoked before the filter is applied
            ,CASE f.actn
                WHEN 'SET' THEN tps.srce_set(f.defn)
                ELSE tps.srce_delete(f.defn)
            END message
        FROM
            _full f
    )
    --aggregate all the messages into one message
    ----
    ---- should look at rolling back the whole thing if one of the function returns a fail. stored proc could do this.
    ----
    SELECT
        --jsonb_agg over zero rows is NULL; report an empty list instead
        COALESCE(
            jsonb_agg(jsonb_build_object('source',srce,'status',message->>'status','message',message->>'message'))
            ,'[]'::jsonb
        )
    INTO
        _message
    FROM
        _do;

    RETURN _message;

EXCEPTION WHEN OTHERS THEN

    GET STACKED DIAGNOSTICS
        _MESSAGE_TEXT = MESSAGE_TEXT,
        _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL;
    _message:=
    ($$
        {
            "status":"fail",
            "message":"error updating sources"
        }
    $$::jsonb)
    ||jsonb_build_object('message_text',_MESSAGE_TEXT)
    ||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);

    RETURN _message;
END;
$f$
LANGUAGE plpgsql;