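-- tps/deploy/setup.sql
--
-- Hosting-page metadata: 1393 lines, 45 KiB, labelled "MySQL".
-- NOTE: the statements in this file are PostgreSQL (plpgsql, jsonb), not MySQL.
-- Last change shown for the first section: 2018-05-23 00:31:45 -04:00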
------create dev schema and api user-----------------------------------------------------------------------------------------------------------------
-- WARNING: destructive. Both schemas are dropped with CASCADE, so every object
-- inside them is removed before the rest of this script rebuilds it.
DROP SCHEMA IF EXISTS tps CASCADE;
DROP SCHEMA IF EXISTS tpsv CASCADE;

CREATE SCHEMA tps;
COMMENT ON SCHEMA tps IS 'third party source data';

CREATE SCHEMA tpsv;
-- fixed: this comment used to target tps, which overwrote the comment above
-- and left tpsv undocumented
COMMENT ON SCHEMA tpsv IS 'third party source views';

DROP ROLE IF EXISTS api;

-- NOTE(review): a password hash is committed here in source; consider setting
-- the password out-of-band (ALTER ROLE ... PASSWORD) at deploy time instead.
CREATE ROLE api WITH
    LOGIN
    NOSUPERUSER
    NOCREATEDB
    NOCREATEROLE
    INHERIT
    NOREPLICATION
    CONNECTION LIMIT -1
    ENCRYPTED PASSWORD 'md56da13b696f737097e0146e47cc0d0985';

-----grant privileges to api--------------------------------------------------------------------------------------------------------------------------
-- The "current object" grants below run before any table exists, so they are
-- effectively no-ops on a fresh deploy; the ALTER DEFAULT PRIVILEGES statements
-- are what cover the objects created later in this script. Default privileges
-- only apply to objects created by the role that runs this script.

--grant schema USAGE (required in addition to table privileges)
GRANT USAGE ON SCHEMA tps TO api;
-- fixed: tpsv was missing, which made the tpsv table grants below unusable
GRANT USAGE ON SCHEMA tpsv TO api;

--grant current table privileges
GRANT SELECT, UPDATE, INSERT, DELETE ON ALL TABLES IN SCHEMA tps TO api;
GRANT SELECT, UPDATE, INSERT, DELETE ON ALL TABLES IN SCHEMA tpsv TO api;

--grant current sequence privileges
GRANT USAGE ON ALL SEQUENCES IN SCHEMA tps TO api;
GRANT USAGE ON ALL SEQUENCES IN SCHEMA tpsv TO api;

--grant future table privileges
ALTER DEFAULT PRIVILEGES IN SCHEMA tps GRANT SELECT, UPDATE, INSERT, DELETE ON TABLES TO api;
ALTER DEFAULT PRIVILEGES IN SCHEMA tpsv GRANT SELECT, UPDATE, INSERT, DELETE ON TABLES TO api;

--grant future sequence privileges
ALTER DEFAULT PRIVILEGES IN SCHEMA tps GRANT USAGE ON SEQUENCES TO api;
ALTER DEFAULT PRIVILEGES IN SCHEMA tpsv GRANT USAGE ON SEQUENCES TO api;
-----create tables-----------------------------------------------------------------------------------------------------------------------------------
-----regex map instructions table
-- One row per regex map of a source. tps.srce_map_def_set() fills it:
--   srce / target  source name and map name
--   regex          the full map definition as json
--   seq            the map's aggregation sequence
--   hist           json history of definitions
CREATE TABLE tps.map_rm
(
    srce    TEXT    NOT NULL
   ,target  TEXT    NOT NULL
   ,regex   JSONB
   ,seq     INTEGER NOT NULL
   ,hist    JSONB
);

COMMENT ON TABLE tps.map_rm
    IS 'regex map instructions';
-----return value table
-- Lookup from a regex return value to its mapped json, per source and map.
-- tps.map_rv_set() fills it:
--   retval  the json value produced by the regex map
--   map     the json that value is mapped to
--   hist    json history of mapped values
CREATE TABLE tps.map_rv
(
    srce    TEXT  NOT NULL
   ,target  TEXT  NOT NULL
   ,retval  JSONB NOT NULL
   ,map     JSONB NOT NULL
   ,hist    JSONB NOT NULL
);

COMMENT ON TABLE tps.map_rv
    IS 'return value lookup table';
-----source definition table
-- One row per source. tps.srce_set() fills it:
--   srce  source name
--   defn  current json definition
--   hist  json history of definitions
CREATE TABLE tps.srce
(
    srce  TEXT NOT NULL
   ,defn  JSONB
   ,hist  JSONB
);

COMMENT ON TABLE tps.srce
    IS 'source master listing and definition';
-----source data table
-- Imported records. tps.srce_import() writes srce, rec, ic and logid; the
-- mapping trigger and tps.srce_map_overwrite() write parse, map and allj.
--   rec    the imported json record
--   parse  regex results flagged to be retained
--   map    mapped values found in tps.map_rv
--   allj   rec || map || parse
--   ic     input constraint value used to detect already-loaded records
--   logid  id of the tps.trans_log row for the import
CREATE TABLE tps.trans
(
    id     INTEGER NOT NULL
   ,srce   TEXT
   ,rec    JSONB
   ,parse  JSONB
   ,map    JSONB
   ,allj   JSONB
   ,ic     JSONB
   ,logid  INTEGER
);

COMMENT ON TABLE tps.trans
    IS 'source records';
COMMENT ON COLUMN tps.trans.ic
    IS 'input constraint value';

-- id is filled from an identity sequence unless a value is supplied
ALTER TABLE tps.trans
    ALTER COLUMN id
    ADD GENERATED BY DEFAULT AS IDENTITY
    (
        SEQUENCE NAME tps.trans_id_seq
        START WITH 1
        INCREMENT BY 1
        NO MINVALUE
        NO MAXVALUE
        CACHE 1
    );
-----import log table
-- One row per import run; tps.srce_import() stores its json summary in info.
CREATE TABLE tps.trans_log
(
    id    INTEGER NOT NULL
   ,info  JSONB
);

COMMENT ON TABLE tps.trans_log
    IS 'import event information';

-- id is filled from an identity sequence unless a value is supplied
ALTER TABLE tps.trans_log
    ALTER COLUMN id
    ADD GENERATED BY DEFAULT AS IDENTITY
    (
        SEQUENCE NAME tps.trans_log_id_seq
        START WITH 1
        INCREMENT BY 1
        NO MINVALUE
        NO MAXVALUE
        CACHE 1
    );
-------------primary keys----------------------------------------------------------------------------------------------------------------------------
-- The constraint names matter: the upsert functions in this script reference
-- srce_pkey, map_rm_pk and map_rv_pk in their ON CONFLICT clauses.
ALTER TABLE ONLY tps.map_rm     ADD CONSTRAINT map_rm_pk      PRIMARY KEY (srce, target);
ALTER TABLE ONLY tps.map_rv     ADD CONSTRAINT map_rv_pk      PRIMARY KEY (srce, target, retval);
ALTER TABLE ONLY tps.srce       ADD CONSTRAINT srce_pkey      PRIMARY KEY (srce);
ALTER TABLE ONLY tps.trans_log  ADD CONSTRAINT trans_log_pkey PRIMARY KEY (id);
ALTER TABLE ONLY tps.trans      ADD CONSTRAINT trans_pkey     PRIMARY KEY (id);
-------------indexes---------------------------------------------------------------------------------------------------------------------------------
-- gin indexes on the json columns, plus a btree on the source name
CREATE INDEX trans_allj
    ON tps.trans USING gin (allj);
CREATE INDEX trans_rec
    ON tps.trans USING gin (rec);
CREATE INDEX trans_srce
    ON tps.trans USING btree (srce);
-------------foreign keys----------------------------------------------------------------------------------------------------------------------------
-- every regex map belongs to a defined source
ALTER TABLE ONLY tps.map_rm
    ADD CONSTRAINT map_rm_fk_srce
    FOREIGN KEY (srce)
    REFERENCES tps.srce (srce);

-- every mapped return value belongs to a defined regex map
ALTER TABLE ONLY tps.map_rv
    ADD CONSTRAINT map_rv_fk_rm
    FOREIGN KEY (srce, target)
    REFERENCES tps.map_rm (srce, target);

-- every record belongs to a defined source ...
ALTER TABLE ONLY tps.trans
    ADD CONSTRAINT trans_srce_fkey
    FOREIGN KEY (srce)
    REFERENCES tps.srce (srce);

-- ... and points at the log row of the import that loaded it
ALTER TABLE ONLY tps.trans
    ADD CONSTRAINT trans_logid_fkey
    FOREIGN KEY (logid)
    REFERENCES tps.trans_log (id);
-------------create functions------------------------------------------------------------------------------------------------------------------------
-----set source
-- tps.srce_set(_defn jsonb) RETURNS jsonb
--
-- Creates or replaces a source definition in tps.srce.
--   _defn   json definition; its "name" key becomes the source name (srce).
-- On insert, hist is a one-element array holding the definition and an
-- "effective" range [now, null]. On update, the new definition is pushed to
-- position 0 and the upper bound of the previous entry is set to now.
-- Returns {"status":"complete","message":"source set"} on success. Any error
-- is caught and returned as {"status":"fail", ...} with the error text and
-- detail; the function itself does not raise.
DROP FUNCTION IF EXISTS tps.srce_set(jsonb);
CREATE FUNCTION tps.srce_set(_defn jsonb) RETURNS jsonb
AS
$f$
DECLARE
    _message                jsonb;
    _MESSAGE_TEXT           text;
    _PG_EXCEPTION_DETAIL    text;
    _PG_EXCEPTION_HINT      text;
BEGIN
    INSERT INTO
        tps.srce (srce, defn, hist)
    SELECT
        --extract name from definition
        _defn->>'name'
        --the definition itself
        ,_defn
        --history: a one-element array (object || empty array) stamped with the current time
        ,jsonb_build_object(
            'hist_defn',_defn
            ,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
        ) || '[]'::jsonb
    ON CONFLICT ON CONSTRAINT srce_pkey DO UPDATE
        SET
            defn = _defn
            ,hist =
                --the new definition going to position -0-
                jsonb_build_object(
                    'hist_defn',_defn
                    ,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
                )
                --the previous definition, set upper bound of effective range which was previously null
                --COALESCE: a row whose hist is NULL would otherwise turn the whole
                --expression NULL and the history would never start
                || COALESCE(
                    jsonb_set(
                        srce.hist
                        ,'{0,effective,1}'::text[]
                        ,to_jsonb(CURRENT_TIMESTAMP)
                    )
                    ,'[]'::jsonb
                );

    _message := jsonb_build_object(
        'status','complete'
        ,'message','source set'
    );
    RETURN _message;

EXCEPTION WHEN OTHERS THEN
    GET STACKED DIAGNOSTICS
        _MESSAGE_TEXT = MESSAGE_TEXT,
        _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
        _PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
    --fixed: the message used to read "error importing data" (copied from the import function)
    _message := jsonb_build_object(
        'status','fail'
        ,'message','error setting source'
        ,'message_text',_MESSAGE_TEXT
        ,'pg_exception_detail',_PG_EXCEPTION_DETAIL
    );
    RETURN _message;
END;
$f$
LANGUAGE plpgsql;
-----generate sql to create select based on schema
-- tps.build_srce_view_sql(_srce text, _schema text) RETURNS text
--
-- Builds (does not execute) a CREATE VIEW statement for tpsv.<_srce>_<_schema>.
-- The column list comes from the array at defn -> 'schemas' -> _schema of the
-- source row; each element supplies "path", "type" and "column_name", and
-- becomes (allj #>> path)::type AS column_name over tps.trans for that source.
-- Returns NULL when the source or the schema entry is not found.
--
-- Identifiers and literals go through format() %I / %L so names containing
-- quotes cannot break out of the generated statement. "type" is inserted
-- verbatim because a type name may be multi-word or carry modifiers.
-- NOTE(review): "type" therefore still has to come from a trusted definition.
DROP FUNCTION IF EXISTS tps.build_srce_view_sql(text, text);
CREATE OR REPLACE FUNCTION tps.build_srce_view_sql(_srce text, _schema text) RETURNS TEXT
AS
$f$
DECLARE
    _path   text[];
    _sql    text;
BEGIN
    _path := ARRAY['schemas',_schema]::text[];

    SELECT
        format(
            'CREATE VIEW tpsv.%I AS SELECT %s FROM tps.trans WHERE srce = %L'
            ,_srce || '_' || _schema
            ,string_agg(
                --an element missing any of its three keys is skipped (string_agg
                --ignores NULL), as it was with plain concatenation
                CASE
                    WHEN r.path IS NOT NULL AND r.type IS NOT NULL AND r.column_name IS NOT NULL
                    THEN format('(allj#>>%L)::%s AS %I', r.path::text, r.type, r.column_name)
                END
                ,', '
                --keep the view columns in the order the definition lists them
                ORDER BY ae.rn
            )
            ,_srce
        )
    INTO
        _sql
    FROM
        tps.srce s
        JOIN LATERAL jsonb_array_elements(s.defn #> _path) WITH ORDINALITY ae(v, rn) ON TRUE
        JOIN LATERAL jsonb_to_record(ae.v) AS r(path text[], "type" text, column_name text) ON TRUE
    WHERE
        s.srce = _srce
    GROUP BY
        s.srce;

    --the RAISE NOTICE that followed RETURN could never run and has been removed
    RETURN _sql;
END
$f$
LANGUAGE plpgsql;
-----set map definition from json argument
-- tps.srce_map_def_set(_defn jsonb) RETURNS jsonb
--
-- Creates or replaces one regex map in tps.map_rm.
--   _defn   json definition; "srce" and "name" form the key, "sequence" is
--           cast to integer for seq, and the whole object is stored in regex.
-- hist is maintained the same way as for sources: newest definition at
-- position 0, previous entry closed with the current timestamp.
-- Returns {"status":"complete","message":"definition has been set"} on
-- success; any error is caught and returned as {"status":"fail", ...}.
CREATE OR REPLACE FUNCTION tps.srce_map_def_set(_defn jsonb) RETURNS jsonb
AS
$f$
DECLARE
    _message                jsonb;
    _MESSAGE_TEXT           text;
    _PG_EXCEPTION_DETAIL    text;
    _PG_EXCEPTION_HINT      text;
BEGIN
    INSERT INTO
        tps.map_rm (srce, target, regex, seq, hist)
    SELECT
        --data source
        _defn->>'srce'
        --map name
        ,_defn->>'name'
        --map definition
        ,_defn
        --map aggregation sequence
        ,(_defn->>'sequence')::INTEGER
        --history definition (object || empty array gives a one-element array)
        ,jsonb_build_object(
            'hist_defn',_defn
            ,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
        ) || '[]'::jsonb
    --srce and target are the conflict key and so already equal the incoming
    --values; they are no longer rewritten here
    ON CONFLICT ON CONSTRAINT map_rm_pk DO UPDATE SET
        regex = _defn
        ,seq = (_defn->>'sequence')::INTEGER
        ,hist =
            --the new definition going to position -0-
            jsonb_build_object(
                'hist_defn',_defn
                ,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
            )
            --the previous definition, set upper bound of effective range which was previously null
            --COALESCE: a NULL hist would otherwise make the new history NULL as well
            || COALESCE(
                jsonb_set(
                    map_rm.hist
                    ,'{0,effective,1}'::text[]
                    ,to_jsonb(CURRENT_TIMESTAMP)
                )
                ,'[]'::jsonb
            );

    _message := jsonb_build_object('status','complete','message','definition has been set');
    RETURN _message;

EXCEPTION WHEN OTHERS THEN
    GET STACKED DIAGNOSTICS
        _MESSAGE_TEXT = MESSAGE_TEXT,
        _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
        _PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
    _message := jsonb_build_object(
        'status','fail'
        ,'message','error setting definition'
        ,'message_text',_MESSAGE_TEXT
        ,'pg_exception_detail',_PG_EXCEPTION_DETAIL
    );
    RETURN _message;
END;
$f$
LANGUAGE plpgsql;
------------build report for unmapped items---------------------------------------------------------------------------------------------------------------------------------------------
-- tps.jsonb_concat(state, concat): returns state || concat.
-- It is the transition function of the aggregate tps.jsonb_concat_obj below.
-- Not STRICT: a NULL on either side yields NULL, so callers of the aggregate
-- should COALESCE their input (the queries in this script do).
-- Declared IMMUTABLE (was VOLATILE): the result depends only on the arguments.
CREATE OR REPLACE FUNCTION tps.jsonb_concat(
    state jsonb,
    concat jsonb)
RETURNS jsonb AS
$BODY$
BEGIN
    RETURN state || concat;
END;
$BODY$
LANGUAGE plpgsql IMMUTABLE
COST 100;

-- tps.jsonb_concat_obj(jsonb): folds its inputs together with ||, starting
-- from an empty object. For two objects, || keeps the right-hand value of a
-- shared key, so the last row aggregated wins.
DROP AGGREGATE IF EXISTS tps.jsonb_concat_obj(jsonb);
CREATE AGGREGATE tps.jsonb_concat_obj(jsonb) (
    SFUNC=tps.jsonb_concat,
    STYPE=jsonb,
    INITCOND='{}'
);
-- tps.report_unmapped(_srce text)
--
-- For one source, applies every regex-map item flagged "map":"y" to the
-- records in tps.trans, builds the json return value per record and map,
-- counts the records per distinct return value, and returns the values that
-- have no row in tps.map_rv.
-- Result columns: source name, map name, return value, number of records;
-- ordered by source, map, count descending.
-- NOTE(review): regexp_replace is given the raw "flag" value, unlike
-- regexp_matches which gets COALESCE(flag,''); a replace item without a flag
-- therefore appears to yield NULL. Confirm before changing, and keep the
-- mapping trigger and tps.srce_map_overwrite in step.
DROP FUNCTION IF EXISTS tps.report_unmapped(text);
CREATE FUNCTION tps.report_unmapped(_srce text) RETURNS TABLE
(
    source text,
    map text,
    ret_val jsonb,
    "count" bigint
)
LANGUAGE plpgsql
AS
$f$
BEGIN
    RETURN QUERY
    WITH
    ----------one row per record, map item and regex result------------------------------------
    rx AS (
        SELECT
            t.srce
            ,t.id
            ,t.rec
            ,m.target
            ,m.seq
            ,m.regex->>'function'       AS regex_function
            ,e.v->>'field'              AS result_key_name
            ,e.v->>'key'                AS target_json_path
            ,e.v->>'flag'               AS regex_options_flag
            ,e.v->>'map'                AS map_intention
            ,e.v->>'retain'             AS retain_result
            ,e.v->>'regex'              AS regex_expression
            ,e.rn                       AS target_item_number
            ,COALESCE(mt.rn, rp.rn, 1)  AS result_number
            ,mt.mt                      AS rx_match
            ,rp.rp                      AS rx_replace
            --json key the mapped value is stored under
            ,CASE
                WHEN e.v->>'map' = 'y' THEN e.v->>'field'
            END AS map_key
            --json value produced by the regular expression
            ,CASE
                WHEN e.v->>'map' = 'y' THEN
                    CASE m.regex->>'function'
                        WHEN 'extract' THEN
                            CASE
                                WHEN array_upper(mt.mt, 1) = 1 THEN to_json(mt.mt[1])
                                ELSE array_to_json(mt.mt)
                            END::jsonb
                        WHEN 'replace' THEN
                            to_jsonb(rp.rp)
                        ELSE
                            '{}'::jsonb
                    END
            END AS map_val
            --json key the retained value is stored under
            ,CASE
                WHEN e.v->>'retain' = 'y' THEN e.v->>'field'
            END AS retain_key
            --same as map_val but trimmed, for items flagged to be retained
            ,CASE
                WHEN e.v->>'retain' = 'y' THEN
                    CASE m.regex->>'function'
                        WHEN 'extract' THEN
                            CASE
                                WHEN array_upper(mt.mt, 1) = 1 THEN to_json(trim(mt.mt[1]))
                                ELSE array_to_json(mt.mt)
                            END::jsonb
                        WHEN 'replace' THEN
                            to_jsonb(rtrim(rp.rp))
                        ELSE
                            '{}'::jsonb
                    END
            END AS retain_val
        FROM
            --all regex maps
            tps.map_rm AS m
            --one row per "where" filter of the map
            CROSS JOIN LATERAL jsonb_array_elements(m.regex->'where') AS w(v)
            --one row per regex item of the map, numbered
            CROSS JOIN LATERAL jsonb_array_elements(m.regex->'defn') WITH ORDINALITY AS e(v, rn)
            --records of the same source that contain the "where" filter
            INNER JOIN tps.trans AS t ON
                t.srce = m.srce
                AND t.rec @> w.v
            --extract maps: one row per regex match on the value at the item's path
            LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, COALESCE(e.v ->> 'flag','')) WITH ORDINALITY AS mt(mt, rn) ON
                m.regex->>'function' = 'extract'
            --replace maps: the replaced text
            LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text, e.v ->> 'flag') WITH ORDINALITY AS rp(rp, rn) ON
                m.regex->>'function' = 'replace'
        WHERE
            t.srce = _srce
            AND e.v @> '{"map":"y"}'::jsonb
    )
    ----------one {key: value} object per record and map item-----------------------------------
    , agg_to_target_items AS (
        SELECT
            srce
            ,id
            ,target
            ,seq
            ,map_intention
            ,regex_function
            ,target_item_number
            ,result_key_name
            ,target_json_path
            --a single result is stored as a scalar, several results as an array
            ,CASE
                WHEN map_key IS NOT NULL THEN
                    jsonb_build_object(
                        map_key
                        ,CASE
                            WHEN max(result_number) = 1 THEN jsonb_agg(map_val ORDER BY result_number) -> 0
                            ELSE jsonb_agg(map_val ORDER BY result_number)
                        END
                    )
            END AS map_val
            ,CASE
                WHEN retain_key IS NOT NULL THEN
                    jsonb_build_object(
                        retain_key
                        ,CASE
                            WHEN max(result_number) = 1 THEN jsonb_agg(retain_val ORDER BY result_number) -> 0
                            ELSE jsonb_agg(retain_val ORDER BY result_number)
                        END
                    )
            END AS retain_val
        FROM
            rx
        GROUP BY
            srce
            ,id
            ,target
            ,seq
            ,map_intention
            ,regex_function
            ,target_item_number
            ,result_key_name
            ,target_json_path
            ,map_key
            ,retain_key
    )
    ----------merge the item objects into one object per record and map-------------------------
    , agg_to_target AS (
        SELECT
            srce
            ,id
            ,target
            ,seq
            ,map_intention
            ,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) AS map_val
            ,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) AS retain_val
        FROM
            agg_to_target_items
        GROUP BY
            srce
            ,id
            ,target
            ,seq
            ,map_intention
    )
    ----------count records per distinct return value-------------------------------------------
    , agg_to_ret AS (
        SELECT
            srce
            ,target
            ,seq
            ,map_intention
            ,map_val
            ,retain_val
            ,count(*) AS "count"
        FROM
            agg_to_target
        GROUP BY
            srce
            ,target
            ,seq
            ,map_intention
            ,map_val
            ,retain_val
    )
    ----------look each return value up in the mapping table------------------------------------
    , link_map AS (
        SELECT
            a.srce
            ,a.target
            ,a.seq
            ,a.map_intention
            ,a.map_val
            ,a."count"
            ,a.retain_val
            ,v.map AS mapped_val
        FROM
            agg_to_ret AS a
            LEFT JOIN tps.map_rv AS v ON
                v.srce = a.srce
                AND v.target = a.target
                AND v.retval = a.map_val
    )
    ----------keep only the values without a mapping--------------------------------------------
    SELECT
        l.srce
        ,l.target
        ,l.map_val
        ,l."count"
    FROM
        link_map AS l
    WHERE
        l.mapped_val IS NULL
    ORDER BY
        l.srce
        ,l.target
        ,l."count" DESC;
END;
$f$;
-------------------create trigger to map imported items------------------------------------------------------------------------------------------------------
-- tps.trans_insert_map(): statement-level AFTER INSERT trigger function for
-- tps.trans. It reads the inserted rows through the transition table
-- new_table, applies every regex map of the row's source whose "where" filter
-- the record contains, looks the resulting values up in tps.map_rv, and
-- writes map, parse and allj back to the inserted rows.
-- Rows that match no regex map are not updated.
--
-- Fix: the mapped values of several maps are now merged in the same explicit
-- order as the retained values (seq descending, so on a shared key the map
-- with the lowest seq is applied last and wins). The merge previously had no
-- ORDER BY, which left the winner undefined.
-- NOTE(review): regexp_replace is given the raw "flag" value, unlike
-- regexp_matches which gets COALESCE(flag,''); a replace item without a flag
-- therefore appears to yield NULL. Confirm before changing, and keep
-- tps.report_unmapped and tps.srce_map_overwrite in step.
CREATE OR REPLACE FUNCTION tps.trans_insert_map() RETURNS TRIGGER AS $f$
BEGIN
    IF (TG_OP = 'INSERT') THEN
        WITH
        --------------------apply regex operations to transactions-----------------------------------------------------------------------------------
        rx AS (
            SELECT
                t.srce,
                t.id,
                t.rec,
                m.target,
                m.seq,
                regex->>'function' regex_function,
                e.v ->> 'field' result_key_name,
                e.v ->> 'key' target_json_path,
                e.v ->> 'flag' regex_options_flag,
                e.v->>'map' map_intention,
                e.v->>'retain' retain_result,
                e.v->>'regex' regex_expression,
                e.rn target_item_number,
                COALESCE(mt.rn,rp.rn,1) result_number,
                mt.mt rx_match,
                rp.rp rx_replace,
                --json key the mapped value is stored under
                CASE e.v->>'map'
                    WHEN 'y' THEN
                        e.v->>'field'
                    ELSE
                        null
                END map_key,
                --json value produced by the regular expression
                CASE e.v->>'map'
                    WHEN 'y' THEN
                        CASE regex->>'function'
                            WHEN 'extract' THEN
                                CASE WHEN array_upper(mt.mt,1)=1
                                    THEN to_json(mt.mt[1])
                                    ELSE array_to_json(mt.mt)
                                END::jsonb
                            WHEN 'replace' THEN
                                to_jsonb(rp.rp)
                            ELSE
                                '{}'::jsonb
                        END
                    ELSE
                        NULL
                END map_val,
                --json key the retained value is stored under
                CASE e.v->>'retain'
                    WHEN 'y' THEN
                        e.v->>'field'
                    ELSE
                        NULL
                END retain_key,
                --same as map_val but trimmed, for items flagged to be retained
                CASE e.v->>'retain'
                    WHEN 'y' THEN
                        CASE regex->>'function'
                            WHEN 'extract' THEN
                                CASE WHEN array_upper(mt.mt,1)=1
                                    THEN to_json(trim(mt.mt[1]))
                                    ELSE array_to_json(mt.mt)
                                END::jsonb
                            WHEN 'replace' THEN
                                to_jsonb(rtrim(rp.rp))
                            ELSE
                                '{}'::jsonb
                        END
                    ELSE
                        NULL
                END retain_val
            FROM
                tps.map_rm m
                LEFT JOIN LATERAL jsonb_array_elements(m.regex->'where') w(v) ON TRUE
                --only the rows inserted by the triggering statement
                INNER JOIN new_table t ON
                    t.srce = m.srce AND
                    t.rec @> w.v
                LEFT JOIN LATERAL jsonb_array_elements(m.regex->'defn') WITH ORDINALITY e(v, rn) ON true
                LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
                    m.regex->>'function' = 'extract'
                LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON
                    m.regex->>'function' = 'replace'
            ORDER BY
                t.id DESC,
                m.target,
                e.rn,
                COALESCE(mt.rn,rp.rn,1)
        )
        --------------------one {key: value} object per record and map item--------------------------------------------------------------------------
        , agg_to_target_items AS (
            SELECT
                srce
                ,id
                ,target
                ,seq
                ,map_intention
                ,regex_function
                ,target_item_number
                ,result_key_name
                ,target_json_path
                --a single result is stored as a scalar, several results as an array
                ,CASE WHEN map_key IS NULL
                    THEN
                        NULL
                    ELSE
                        jsonb_build_object(
                            map_key,
                            CASE WHEN max(result_number) = 1
                                THEN
                                    jsonb_agg(map_val ORDER BY result_number) -> 0
                                ELSE
                                    jsonb_agg(map_val ORDER BY result_number)
                            END
                        )
                END map_val
                ,CASE WHEN retain_key IS NULL
                    THEN
                        NULL
                    ELSE
                        jsonb_build_object(
                            retain_key,
                            CASE WHEN max(result_number) = 1
                                THEN
                                    jsonb_agg(retain_val ORDER BY result_number) -> 0
                                ELSE
                                    jsonb_agg(retain_val ORDER BY result_number)
                            END
                        )
                END retain_val
            FROM
                rx
            GROUP BY
                srce
                ,id
                ,target
                ,seq
                ,map_intention
                ,regex_function
                ,target_item_number
                ,result_key_name
                ,target_json_path
                ,map_key
                ,retain_key
        )
        --------------------merge the item objects into one object per record and map----------------------------------------------------------------
        , agg_to_target AS (
            SELECT
                srce
                ,id
                ,target
                ,seq
                ,map_intention
                ,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
                ,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
            FROM
                agg_to_target_items
            GROUP BY
                srce
                ,id
                ,target
                ,seq
                ,map_intention
            ORDER BY
                id
        )
        --------------------look each return value up in the mapping table---------------------------------------------------------------------------
        , link_map AS (
            SELECT
                a.srce
                ,a.id
                ,a.target
                ,a.seq
                ,a.map_intention
                ,a.map_val
                ,a.retain_val retain_value
                ,v.map
            FROM
                agg_to_target a
                LEFT OUTER JOIN tps.map_rv v ON
                    v.srce = a.srce AND
                    v.target = a.target AND
                    v.retval = a.map_val
        )
        --------------------merge everything into one row per record---------------------------------------------------------------------------------
        , agg_to_id AS (
            SELECT
                srce
                ,id
                ,tps.jsonb_concat_obj(COALESCE(retain_value,'{}'::jsonb) ORDER BY seq DESC) retain_val
                --fixed: same explicit order as retain_val, so key collisions resolve predictably
                ,tps.jsonb_concat_obj(COALESCE(map,'{}'::jsonb) ORDER BY seq DESC) map
            FROM
                link_map
            GROUP BY
                srce
                ,id
        )
        UPDATE
            tps.trans t
        SET
            map = o.map,
            parse = o.retain_val,
            allj = t.rec||o.map||o.retain_val
        FROM
            agg_to_id o
        WHERE
            o.id = t.id;
    END IF;
    --the return value of a statement-level AFTER trigger is ignored
    RETURN NULL;
END;
$f$ LANGUAGE plpgsql;

CREATE TRIGGER trans_insert
    AFTER INSERT ON tps.trans
    REFERENCING NEW TABLE AS new_table
    FOR EACH STATEMENT EXECUTE PROCEDURE tps.trans_insert_map();
------------------import data-------------------------------------------------------------------------------------------------------------------------------------
DROP FUNCTION IF EXISTS tps.srce_import(text, jsonb);
CREATE OR REPLACE FUNCTION tps.srce_import(_srce text, _recs jsonb) RETURNS jsonb
/*--------------------------------------------------------
Imports a json array of records for one source.
    _srce   source name; must have a row with a definition in tps.srce
    _recs   json array, one element per record

0. test if source exists
1. create pending list, with one constraint key per record built from the
   paths listed under "constraint" in the source definition
2. get unique pending keys
3. see which keys are already in tps.trans (column ic)
4. insert pending records whose key is not already in trans
5. insert summary to log table

Returns {"status":"complete","details":<log info>} on success, or a
{"status":"fail", ...} object when the source is unknown or an error is
caught; the function itself does not raise.

Changes: removed unused variables and a truncated, commented-out sample
payload; the "source does not exists" message is now built with
jsonb_build_object, so a source name containing a double quote or backslash
no longer breaks the json and falls through to the generic error handler.
*/
AS $f$
DECLARE
    _log_info               jsonb;
    _message                jsonb;
    _defn                   jsonb;
    _MESSAGE_TEXT           text;
    _PG_EXCEPTION_DETAIL    text;
    _PG_EXCEPTION_HINT      text;
BEGIN
    ----------------------------------------------------test if source exists----------------------------------------------------------------------------------
    SELECT
        defn
    INTO
        _defn
    FROM
        tps.srce
    WHERE
        srce = _srce;

    --a source whose defn is NULL is reported the same way as a missing source
    IF _defn IS NULL THEN
        _message := jsonb_build_object(
            'status','fail'
            ,'message',format('source %L does not exists', _srce)
        );
        RETURN _message;
    END IF;

    -------------unwrap the json record and apply the path(s) of the constraint to build a constraint key per record-----------------------------------------------------------------------------------
    WITH
    pending_list AS (
        SELECT
            _srce srce
            ,j.rec
            ,j.id
            --aggregate back to the record since multiple paths may be listed in the constraint
            --NOTE(review): the original author noted it is unclear why "->>0" is
            --needed to get the text array out of the jsonb path; left as is
            ,tps.jsonb_concat_obj(
                jsonb_build_object(
                    --the new json key is the path itself
                    cons.path->>0
                    ,j.rec#>((cons.path->>0)::text[])
                )
            ) json_key
        FROM
            jsonb_array_elements(_recs) WITH ORDINALITY j(rec,id)
            --a definition without "constraint" entries yields no pending rows
            JOIN LATERAL jsonb_array_elements(_defn->'constraint') WITH ORDINALITY cons(path, seq) ON TRUE
        GROUP BY
            j.rec
            ,j.id
    )
    -----------create a unique list of keys from staged rows------------------------------------------------------------------------------------------
    , pending_keys AS (
        SELECT DISTINCT
            json_key
        FROM
            pending_list
    )
    -----------list of keys already loaded to tps-----------------------------------------------------------------------------------------------------
    , matched_keys AS (
        SELECT DISTINCT
            k.json_key
        FROM
            pending_keys k
            INNER JOIN tps.trans t ON
                t.ic = k.json_key
    )
    -----------return unique keys that are not already in tps.trans-----------------------------------------------------------------------------------
    , unmatched_keys AS (
        SELECT
            json_key
        FROM
            pending_keys
        EXCEPT
        SELECT
            json_key
        FROM
            matched_keys
    )
    -----------build log record-------------------------------------------------------------------------------------------------------------------------
    , logged AS (
        INSERT INTO
            tps.trans_log (info)
        SELECT
            JSONB_BUILD_OBJECT('time_stamp',CURRENT_TIMESTAMP)
            ||JSONB_BUILD_OBJECT('srce',_srce)
            ||JSONB_BUILD_OBJECT('not_inserted',
                (
                    SELECT
                        jsonb_agg(json_key)
                    FROM
                        matched_keys
                )
            )
            ||JSONB_BUILD_OBJECT('inserted',
                (
                    SELECT
                        jsonb_agg(json_key)
                    FROM
                        unmatched_keys
                )
            )
        RETURNING id, info
    )
    -----------insert pending rows that have key with no trans match-----------------------------------------------------------------------------------
    --need to look into mapping the transactions prior to loading
    --this data-modifying CTE runs even though the final SELECT does not read it
    , inserted AS (
        INSERT INTO
            tps.trans (srce, rec, ic, logid)
        SELECT
            pl.srce
            ,pl.rec
            ,pl.json_key
            ,logged.id
        FROM
            pending_list pl
            INNER JOIN unmatched_keys u ON
                u.json_key = pl.json_key
            CROSS JOIN logged
        ORDER BY
            pl.id ASC
        RETURNING id
    )
    SELECT
        info
    INTO
        _log_info
    FROM
        logged;

    _message := jsonb_build_object(
        'status','complete'
        ,'details',_log_info
    );
    RETURN _message;

EXCEPTION WHEN OTHERS THEN
    GET STACKED DIAGNOSTICS
        _MESSAGE_TEXT = MESSAGE_TEXT,
        _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
        _PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
    _message := jsonb_build_object(
        'status','fail'
        ,'message','error importing data'
        ,'message_text',_MESSAGE_TEXT
        ,'pg_exception_detail',_PG_EXCEPTION_DETAIL
    );
    RETURN _message;
END;
$f$
LANGUAGE plpgsql;
---------------overwrite maps--------------------------------------------------------------------------------------------------------------
-- tps.srce_map_overwrite(_srce text) RETURNS jsonb
--
-- Re-applies every regex map of one source to all of its records in
-- tps.trans and overwrites map, parse and allj with the result. Records that
-- match no regex map are not updated.
-- Returns {"status":"complete"} on success; any error is caught and returned
-- as {"status":"fail", ...}.
--
-- Fix: the mapped values of several maps are now merged in the same explicit
-- order as the retained values (seq descending, so on a shared key the map
-- with the lowest seq is applied last and wins). The merge previously had no
-- ORDER BY, which left the winner undefined.
-- NOTE(review): regexp_replace is given the raw "flag" value, unlike
-- regexp_matches which gets COALESCE(flag,''); a replace item without a flag
-- therefore appears to yield NULL. Confirm before changing, and keep
-- tps.report_unmapped and the mapping trigger in step.
CREATE OR REPLACE FUNCTION tps.srce_map_overwrite(_srce text) RETURNS jsonb
AS
$f$
DECLARE
    _message                jsonb;
    _MESSAGE_TEXT           text;
    _PG_EXCEPTION_DETAIL    text;
    _PG_EXCEPTION_HINT      text;
BEGIN
    WITH
    --------------------apply regex operations to transactions-----------------------------------------------------------------------------------
    rx AS (
        SELECT
            t.srce,
            t.id,
            t.rec,
            m.target,
            m.seq,
            regex->>'function' regex_function,
            e.v ->> 'field' result_key_name,
            e.v ->> 'key' target_json_path,
            e.v ->> 'flag' regex_options_flag,
            e.v->>'map' map_intention,
            e.v->>'retain' retain_result,
            e.v->>'regex' regex_expression,
            e.rn target_item_number,
            COALESCE(mt.rn,rp.rn,1) result_number,
            mt.mt rx_match,
            rp.rp rx_replace,
            --json key the mapped value is stored under
            CASE e.v->>'map'
                WHEN 'y' THEN
                    e.v->>'field'
                ELSE
                    null
            END map_key,
            --json value produced by the regular expression
            CASE e.v->>'map'
                WHEN 'y' THEN
                    CASE regex->>'function'
                        WHEN 'extract' THEN
                            CASE WHEN array_upper(mt.mt,1)=1
                                THEN to_json(mt.mt[1])
                                ELSE array_to_json(mt.mt)
                            END::jsonb
                        WHEN 'replace' THEN
                            to_jsonb(rp.rp)
                        ELSE
                            '{}'::jsonb
                    END
                ELSE
                    NULL
            END map_val,
            --json key the retained value is stored under
            CASE e.v->>'retain'
                WHEN 'y' THEN
                    e.v->>'field'
                ELSE
                    NULL
            END retain_key,
            --same as map_val but trimmed, for items flagged to be retained
            CASE e.v->>'retain'
                WHEN 'y' THEN
                    CASE regex->>'function'
                        WHEN 'extract' THEN
                            CASE WHEN array_upper(mt.mt,1)=1
                                THEN to_json(trim(mt.mt[1]))
                                ELSE array_to_json(mt.mt)
                            END::jsonb
                        WHEN 'replace' THEN
                            to_jsonb(rtrim(rp.rp))
                        ELSE
                            '{}'::jsonb
                    END
                ELSE
                    NULL
            END retain_val
        FROM
            tps.map_rm m
            LEFT JOIN LATERAL jsonb_array_elements(m.regex->'where') w(v) ON TRUE
            INNER JOIN tps.trans t ON
                t.srce = m.srce AND
                t.rec @> w.v
            LEFT JOIN LATERAL jsonb_array_elements(m.regex->'defn') WITH ORDINALITY e(v, rn) ON true
            LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
                m.regex->>'function' = 'extract'
            LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON
                m.regex->>'function' = 'replace'
        WHERE
            t.srce = _srce
        ORDER BY
            t.id DESC,
            m.target,
            e.rn,
            COALESCE(mt.rn,rp.rn,1)
    )
    --------------------one {key: value} object per record and map item--------------------------------------------------------------------------
    , agg_to_target_items AS (
        SELECT
            srce
            ,id
            ,target
            ,seq
            ,map_intention
            ,regex_function
            ,target_item_number
            ,result_key_name
            ,target_json_path
            --a single result is stored as a scalar, several results as an array
            ,CASE WHEN map_key IS NULL
                THEN
                    NULL
                ELSE
                    jsonb_build_object(
                        map_key,
                        CASE WHEN max(result_number) = 1
                            THEN
                                jsonb_agg(map_val ORDER BY result_number) -> 0
                            ELSE
                                jsonb_agg(map_val ORDER BY result_number)
                        END
                    )
            END map_val
            ,CASE WHEN retain_key IS NULL
                THEN
                    NULL
                ELSE
                    jsonb_build_object(
                        retain_key,
                        CASE WHEN max(result_number) = 1
                            THEN
                                jsonb_agg(retain_val ORDER BY result_number) -> 0
                            ELSE
                                jsonb_agg(retain_val ORDER BY result_number)
                        END
                    )
            END retain_val
        FROM
            rx
        GROUP BY
            srce
            ,id
            ,target
            ,seq
            ,map_intention
            ,regex_function
            ,target_item_number
            ,result_key_name
            ,target_json_path
            ,map_key
            ,retain_key
    )
    --------------------merge the item objects into one object per record and map----------------------------------------------------------------
    , agg_to_target AS (
        SELECT
            srce
            ,id
            ,target
            ,seq
            ,map_intention
            ,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
            ,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
        FROM
            agg_to_target_items
        GROUP BY
            srce
            ,id
            ,target
            ,seq
            ,map_intention
        ORDER BY
            id
    )
    --------------------look each return value up in the mapping table---------------------------------------------------------------------------
    , link_map AS (
        SELECT
            a.srce
            ,a.id
            ,a.target
            ,a.seq
            ,a.map_intention
            ,a.map_val
            ,a.retain_val retain_value
            ,v.map
        FROM
            agg_to_target a
            LEFT OUTER JOIN tps.map_rv v ON
                v.srce = a.srce AND
                v.target = a.target AND
                v.retval = a.map_val
    )
    --------------------merge everything into one row per record---------------------------------------------------------------------------------
    , agg_to_id AS (
        SELECT
            srce
            ,id
            ,tps.jsonb_concat_obj(COALESCE(retain_value,'{}'::jsonb) ORDER BY seq DESC) retain_val
            --fixed: same explicit order as retain_val, so key collisions resolve predictably
            ,tps.jsonb_concat_obj(COALESCE(map,'{}'::jsonb) ORDER BY seq DESC) map
        FROM
            link_map
        GROUP BY
            srce
            ,id
    )
    UPDATE
        tps.trans t
    SET
        map = o.map,
        parse = o.retain_val,
        allj = t.rec||o.map||o.retain_val
    FROM
        agg_to_id o
    WHERE
        o.id = t.id;

    _message := jsonb_build_object('status','complete');
    RETURN _message;

EXCEPTION WHEN OTHERS THEN
    GET STACKED DIAGNOSTICS
        _MESSAGE_TEXT = MESSAGE_TEXT,
        _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
        _PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
    _message := jsonb_build_object(
        'status','fail'
        ,'message','error setting map value'
        ,'message_text',_MESSAGE_TEXT
        ,'pg_exception_detail',_PG_EXCEPTION_DETAIL
    );
    RETURN _message;
END;
$f$
LANGUAGE plpgsql;
---------------------set map values from json array of json objects-----------------------------------------------------
-- tps.map_rv_set(_defn jsonb) RETURNS jsonb
--
-- Upserts mapped return values into tps.map_rv.
--   _defn   json array of objects with keys "source", "map", "ret_val" and
--           "mapped"; these feed srce, target, retval and map respectively.
-- A new key gets a one-entry history; an existing key gets its map replaced,
-- the new value pushed to history position 0, and the previous entry closed
-- with the current timestamp.
-- Returns {"status":"complete"} on success; any error is caught and returned
-- as {"status":"fail", ...}.
DROP FUNCTION IF EXISTS tps.map_rv_set;
CREATE OR REPLACE FUNCTION tps.map_rv_set(_defn jsonb) RETURNS jsonb
AS
$f$
DECLARE
    _message                jsonb;
    _MESSAGE_TEXT           text;
    _PG_EXCEPTION_DETAIL    text;
    _PG_EXCEPTION_HINT      text;
BEGIN
    INSERT INTO tps.map_rv
        (srce, target, retval, map, hist)
    SELECT
        item.source
        ,item.map
        ,item.ret_val
        ,item.mapped
        --object || empty array gives a one-element history array
        ,jsonb_build_object(
            'hist_defn', item.mapped
            ,'effective', jsonb_build_array(CURRENT_TIMESTAMP, null::timestamptz)
        ) || '[]'::jsonb
    FROM
        jsonb_array_elements(_defn) AS elem(obj)
        CROSS JOIN LATERAL jsonb_to_record(elem.obj) AS item(source TEXT, map TEXT, ret_val jsonb, mapped jsonb)
    ON CONFLICT ON CONSTRAINT map_rv_pk DO UPDATE
        SET
            map = excluded.map
            ,hist =
                --newest value first
                jsonb_build_object(
                    'hist_defn', excluded.map
                    ,'effective', jsonb_build_array(CURRENT_TIMESTAMP, null::timestamptz)
                )
                --then the existing history, with the open end of its first entry closed now
                || jsonb_set(
                    map_rv.hist
                    ,'{0,effective,1}'::text[]
                    ,to_jsonb(CURRENT_TIMESTAMP)
                );

    -------return message--------------------------------------------------------------------------------------------------
    RETURN jsonb_build_object('status','complete');

EXCEPTION WHEN OTHERS THEN
    GET STACKED DIAGNOSTICS
        _MESSAGE_TEXT = MESSAGE_TEXT,
        _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
        _PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
    _message := jsonb_build_object(
        'status','fail'
        ,'message','error setting map value'
        ,'message_text',_MESSAGE_TEXT
        ,'pg_exception_detail',_PG_EXCEPTION_DETAIL
    );
    RETURN _message;
END;
$f$
LANGUAGE plpgsql;