get rid of old functions
This commit is contained in:
parent
8641b286e6
commit
48c425ab86
@ -1,223 +0,0 @@
|
|||||||
\timing
|
|
||||||
WITH
|
|
||||||
|
|
||||||
--------------------apply regex operations to transactions-----------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
rx AS (
|
|
||||||
SELECT
|
|
||||||
t.srce,
|
|
||||||
t.id,
|
|
||||||
t.rec,
|
|
||||||
m.target,
|
|
||||||
m.seq,
|
|
||||||
regex->>'function' regex_function,
|
|
||||||
e.v ->> 'field' result_key_name,
|
|
||||||
e.v ->> 'key' target_json_path,
|
|
||||||
e.v ->> 'flag' regex_options_flag,
|
|
||||||
e.v->>'map' map_intention,
|
|
||||||
e.v->>'retain' retain_result,
|
|
||||||
e.v->>'regex' regex_expression,
|
|
||||||
e.rn target_item_number,
|
|
||||||
COALESCE(mt.rn,rp.rn,1) result_number,
|
|
||||||
mt.mt rx_match,
|
|
||||||
rp.rp rx_replace,
|
|
||||||
CASE e.v->>'map'
|
|
||||||
WHEN 'y' THEN
|
|
||||||
e.v->>'field'
|
|
||||||
ELSE
|
|
||||||
null
|
|
||||||
END map_key,
|
|
||||||
CASE e.v->>'map'
|
|
||||||
WHEN 'y' THEN
|
|
||||||
CASE regex->>'function'
|
|
||||||
WHEN 'extract' THEN
|
|
||||||
CASE WHEN array_upper(mt.mt,1)=1
|
|
||||||
THEN to_json(mt.mt[1])
|
|
||||||
ELSE array_to_json(mt.mt)
|
|
||||||
END::jsonb
|
|
||||||
WHEN 'replace' THEN
|
|
||||||
to_jsonb(rp.rp)
|
|
||||||
ELSE
|
|
||||||
'{}'::jsonb
|
|
||||||
END
|
|
||||||
ELSE
|
|
||||||
NULL
|
|
||||||
END map_val,
|
|
||||||
CASE e.v->>'retain'
|
|
||||||
WHEN 'y' THEN
|
|
||||||
e.v->>'field'
|
|
||||||
ELSE
|
|
||||||
NULL
|
|
||||||
END retain_key,
|
|
||||||
CASE e.v->>'retain'
|
|
||||||
WHEN 'y' THEN
|
|
||||||
CASE regex->>'function'
|
|
||||||
WHEN 'extract' THEN
|
|
||||||
CASE WHEN array_upper(mt.mt,1)=1
|
|
||||||
THEN to_json(trim(mt.mt[1]))
|
|
||||||
ELSE array_to_json(mt.mt)
|
|
||||||
END::jsonb
|
|
||||||
WHEN 'replace' THEN
|
|
||||||
to_jsonb(rtrim(rp.rp))
|
|
||||||
ELSE
|
|
||||||
'{}'::jsonb
|
|
||||||
END
|
|
||||||
ELSE
|
|
||||||
NULL
|
|
||||||
END retain_val
|
|
||||||
FROM
|
|
||||||
tps.map_rm m
|
|
||||||
LEFT JOIN LATERAL jsonb_array_elements(m.regex->'where') w(v) ON TRUE
|
|
||||||
INNER JOIN tps.trans t ON
|
|
||||||
t.srce = m.srce AND
|
|
||||||
t.rec @> w.v
|
|
||||||
LEFT JOIN LATERAL jsonb_array_elements(m.regex->'defn') WITH ORDINALITY e(v, rn) ON true
|
|
||||||
LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
|
|
||||||
m.regex->>'function' = 'extract'
|
|
||||||
LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON
|
|
||||||
m.regex->>'function' = 'replace'
|
|
||||||
WHERE
|
|
||||||
--t.allj IS NULL
|
|
||||||
t.srce = 'DCARD'
|
|
||||||
--rec @> '{"Transaction":"ACH Credits","Transaction":"ACH Debits"}'
|
|
||||||
--rec @> '{"Description":"CHECK 93013270 086129935"}'::jsonb
|
|
||||||
ORDER BY
|
|
||||||
t.id DESC,
|
|
||||||
m.target,
|
|
||||||
e.rn,
|
|
||||||
COALESCE(mt.rn,rp.rn,1)
|
|
||||||
)
|
|
||||||
|
|
||||||
--SELECT count(*) FROM rx LIMIT 100
|
|
||||||
|
|
||||||
|
|
||||||
, agg_to_target_items AS (
|
|
||||||
SELECT
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
,regex_function
|
|
||||||
,target_item_number
|
|
||||||
,result_key_name
|
|
||||||
,target_json_path
|
|
||||||
,CASE WHEN map_key IS NULL
|
|
||||||
THEN
|
|
||||||
NULL
|
|
||||||
ELSE
|
|
||||||
jsonb_build_object(
|
|
||||||
map_key,
|
|
||||||
CASE WHEN max(result_number) = 1
|
|
||||||
THEN
|
|
||||||
jsonb_agg(map_val ORDER BY result_number) -> 0
|
|
||||||
ELSE
|
|
||||||
jsonb_agg(map_val ORDER BY result_number)
|
|
||||||
END
|
|
||||||
)
|
|
||||||
END map_val
|
|
||||||
,CASE WHEN retain_key IS NULL
|
|
||||||
THEN
|
|
||||||
NULL
|
|
||||||
ELSE
|
|
||||||
jsonb_build_object(
|
|
||||||
retain_key,
|
|
||||||
CASE WHEN max(result_number) = 1
|
|
||||||
THEN
|
|
||||||
jsonb_agg(retain_val ORDER BY result_number) -> 0
|
|
||||||
ELSE
|
|
||||||
jsonb_agg(retain_val ORDER BY result_number)
|
|
||||||
END
|
|
||||||
)
|
|
||||||
END retain_val
|
|
||||||
FROM
|
|
||||||
rx
|
|
||||||
GROUP BY
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
,regex_function
|
|
||||||
,target_item_number
|
|
||||||
,result_key_name
|
|
||||||
,target_json_path
|
|
||||||
,map_key
|
|
||||||
,retain_key
|
|
||||||
)
|
|
||||||
|
|
||||||
--SELECT * FROM agg_to_target_items LIMIT 100
|
|
||||||
|
|
||||||
|
|
||||||
, agg_to_target AS (
|
|
||||||
SELECT
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
|
|
||||||
,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
|
|
||||||
FROM
|
|
||||||
agg_to_target_items
|
|
||||||
GROUP BY
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
ORDER BY
|
|
||||||
id
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
--SELECT * FROM agg_to_target
|
|
||||||
|
|
||||||
|
|
||||||
, link_map AS (
|
|
||||||
SELECT
|
|
||||||
a.srce
|
|
||||||
,a.id
|
|
||||||
,a.target
|
|
||||||
,a.seq
|
|
||||||
,a.map_intention
|
|
||||||
,a.map_val
|
|
||||||
,a.retain_val retain_value
|
|
||||||
,v.map
|
|
||||||
FROM
|
|
||||||
agg_to_target a
|
|
||||||
LEFT OUTER JOIN tps.map_rv v ON
|
|
||||||
v.srce = a.srce AND
|
|
||||||
v.target = a.target AND
|
|
||||||
v.retval = a.map_val
|
|
||||||
)
|
|
||||||
|
|
||||||
--SELECT * FROM link_map
|
|
||||||
|
|
||||||
, agg_to_id AS (
|
|
||||||
SELECT
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,tps.jsonb_concat_obj(COALESCE(retain_value,'{}'::jsonb) ORDER BY seq DESC) retain_val
|
|
||||||
,tps.jsonb_concat_obj(COALESCE(map,'{}'::jsonb)) map
|
|
||||||
FROM
|
|
||||||
link_map
|
|
||||||
GROUP BY
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
)
|
|
||||||
|
|
||||||
--SELECT agg_to_id.srce, agg_to_id.id, jsonb_pretty(agg_to_id.retain_val) , jsonb_pretty(agg_to_id.map) FROM agg_to_id ORDER BY id desc LIMIT 100
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
UPDATE
|
|
||||||
tps.trans t
|
|
||||||
SET
|
|
||||||
map = o.map,
|
|
||||||
parse = o.retain_val,
|
|
||||||
allj = t.rec||o.map||o.retain_val
|
|
||||||
FROM
|
|
||||||
agg_to_id o
|
|
||||||
WHERE
|
|
||||||
o.id = t.id;
|
|
@ -1,16 +0,0 @@
|
|||||||
/*---------------------------------------------------------------------------
|
|
||||||
turns a single json object into a table suitable for insert to tps.map_rv
|
|
||||||
this could facilitate a call to a function for inserting many rows from ui
|
|
||||||
----------------------------------------------------------------------------*/
|
|
||||||
WITH j AS (
|
|
||||||
select
|
|
||||||
$$
|
|
||||||
[{"source":"DCARD","map":"First 20","ret_val":{"f20": "DISCOUNT DRUG MART 3"},"mapped":{"party":"Discount Drug Mart","reason":"groceries"}},{"source":"DCARD","map":"First 20","ret_val":{"f20": "TARGET STOW OH"},"mapped":{"party":"Target","reason":"groceries"}},{"source":"DCARD","map":"First 20","ret_val":{"f20": "WALMART GROCERY 800-"},"mapped":{"party":"Walmart","reason":"groceries"}},{"source":"DCARD","map":"First 20","ret_val":{"f20": "CIRCLE K 05416 STOW "},"mapped":{"party":"Circle K","reason":"gasoline"}},{"source":"DCARD","map":"First 20","ret_val":{"f20": "TARGET.COM * 800-591"},"mapped":{"party":"Target","reason":"home supplies"}},{"source":"DCARD","map":"First 20","ret_val":{"f20": "ACME NO. 17 STOW OH"},"mapped":{"party":"Acme","reason":"groceries"}},{"source":"DCARD","map":"First 20","ret_val":{"f20": "AT&T *PAYMENT 800-28"},"mapped":{"party":"AT&T","reason":"internet"}},{"source":"DCARD","map":"First 20","ret_val":{"f20": "AUTOZONE #0722 STOW "},"mapped":{"party":"Autozone","reason":"auto maint"}},{"source":"DCARD","map":"First 20","ret_val":{"f20": "BESTBUYCOM8055267948"},"mapped":{"party":"BestBuy","reason":"home supplies"}},{"source":"DCARD","map":"First 20","ret_val":{"f20": "BUFFALO WILD WINGS K"},"mapped":{"party":"Buffalo Wild Wings","reason":"restaurante"}},{"source":"DCARD","map":"First 20","ret_val":{"f20": "CASHBACK BONUS REDEM"},"mapped":{"party":"Discover Card","reason":"financing"}},{"source":"DCARD","map":"First 20","ret_val":{"f20": "CLE CLINIC PT PMTS 2"},"mapped":{"party":"Cleveland Clinic","reason":"medical"}}]
|
|
||||||
$$::jsonb x
|
|
||||||
)
|
|
||||||
SELECT
|
|
||||||
jtr.*
|
|
||||||
FROM
|
|
||||||
j
|
|
||||||
LEFT JOIN LATERAL jsonb_array_elements(j.x) ae(v) ON TRUE
|
|
||||||
LEFT JOIN LATERAL jsonb_to_record(ae.v) AS jtr(source text, map text, ret_val jsonb, mapped jsonb) ON TRUE
|
|
@ -1,196 +0,0 @@
|
|||||||
\timing
|
|
||||||
|
|
||||||
/*--------------------------------------------------------
|
|
||||||
0. load target import to temp table
|
|
||||||
1. create pending list
|
|
||||||
2. get unqiue pending keys
|
|
||||||
3. see which keys not already in tps.trans
|
|
||||||
4. insert pending records associated with keys that are not already in trans
|
|
||||||
5. insert summary to log table
|
|
||||||
*/---------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
DO $$
|
|
||||||
|
|
||||||
DECLARE _t text;
|
|
||||||
DECLARE _c text;
|
|
||||||
DECLARE _path text;
|
|
||||||
DECLARE _srce text;
|
|
||||||
DECLARE _log_info text;
|
|
||||||
DECLARE _log_id text;
|
|
||||||
|
|
||||||
BEGIN
|
|
||||||
|
|
||||||
_path := 'C:\users\ptrowbridge\documents\tps_etl\sample_discovercard\data.csv';
|
|
||||||
_srce := 'DCARD';
|
|
||||||
|
|
||||||
----------------------------------------------------build the column list of the temp table----------------------------------------------------------------
|
|
||||||
|
|
||||||
SELECT
|
|
||||||
string_agg(quote_ident(prs.key)||' '||prs.type,','),
|
|
||||||
string_agg(quote_ident(prs.key),',')
|
|
||||||
INTO
|
|
||||||
_t,
|
|
||||||
_c
|
|
||||||
FROM
|
|
||||||
tps.srce
|
|
||||||
--unwrap the schema definition array
|
|
||||||
LEFT JOIN LATERAL jsonb_populate_recordset(null::tps.srce_defn_schema, defn->'schema') prs ON TRUE
|
|
||||||
WHERE
|
|
||||||
srce = _srce
|
|
||||||
GROUP BY
|
|
||||||
srce;
|
|
||||||
|
|
||||||
----------------------------------------------------add create table verbage in front of column list--------------------------------------------------------
|
|
||||||
|
|
||||||
_t := format('CREATE TEMP TABLE csv_i (%s, id SERIAL)', _t);
|
|
||||||
--RAISE NOTICE '%', _t;
|
|
||||||
--RAISE NOTICE '%', _c;
|
|
||||||
|
|
||||||
DROP TABLE IF EXISTS csv_i;
|
|
||||||
|
|
||||||
EXECUTE _t;
|
|
||||||
|
|
||||||
----------------------------------------------------do the insert-------------------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
--the column list needs to be dynamic forcing this whole line to be dynamic
|
|
||||||
_t := format('COPY csv_i (%s) FROM %L WITH (HEADER TRUE,DELIMITER '','', FORMAT CSV, ENCODING ''SQL_ASCII'',QUOTE ''"'');',_c,_path);
|
|
||||||
|
|
||||||
--RAISE NOTICE '%', _t;
|
|
||||||
|
|
||||||
EXECUTE _t;
|
|
||||||
|
|
||||||
WITH
|
|
||||||
|
|
||||||
-------------extract the limiter fields to one row per source----------------------------------
|
|
||||||
|
|
||||||
ext AS (
|
|
||||||
SELECT
|
|
||||||
srce
|
|
||||||
,defn->'unique_constraint'->>'fields'
|
|
||||||
,ARRAY(SELECT ae.e::text[] FROM jsonb_array_elements_text(defn->'unique_constraint'->'fields') ae(e)) text_array
|
|
||||||
FROM
|
|
||||||
tps.srce
|
|
||||||
WHERE
|
|
||||||
srce = _srce
|
|
||||||
--add where clause for targeted source
|
|
||||||
)
|
|
||||||
|
|
||||||
-------------for each imported row in the COPY table, genereate the json rec, and a column for the json key specified in the srce.defn-----------
|
|
||||||
|
|
||||||
,pending_list AS (
|
|
||||||
SELECT
|
|
||||||
tps.jsonb_extract(
|
|
||||||
row_to_json(i)::jsonb
|
|
||||||
,ext.text_array
|
|
||||||
) json_key,
|
|
||||||
row_to_json(i)::JSONB rec,
|
|
||||||
srce,
|
|
||||||
--ae.rn,
|
|
||||||
id
|
|
||||||
FROM
|
|
||||||
csv_i i
|
|
||||||
INNER JOIN ext ON
|
|
||||||
ext.srce = _srce
|
|
||||||
ORDER BY
|
|
||||||
id ASC
|
|
||||||
)
|
|
||||||
|
|
||||||
-----------create a unique list of keys from staged rows------------------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
, pending_keys AS (
|
|
||||||
SELECT DISTINCT
|
|
||||||
json_key
|
|
||||||
FROM
|
|
||||||
pending_list
|
|
||||||
)
|
|
||||||
|
|
||||||
-----------list of keys already loaded to tps-----------------------------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
, matched_keys AS (
|
|
||||||
SELECT DISTINCT
|
|
||||||
k.json_key
|
|
||||||
FROM
|
|
||||||
pending_keys k
|
|
||||||
INNER JOIN tps.trans t ON
|
|
||||||
t.rec @> k.json_key
|
|
||||||
)
|
|
||||||
|
|
||||||
-----------return unique keys that are not already in tps.trans-----------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
, unmatched_keys AS (
|
|
||||||
SELECT
|
|
||||||
json_key
|
|
||||||
FROM
|
|
||||||
pending_keys
|
|
||||||
|
|
||||||
EXCEPT
|
|
||||||
|
|
||||||
SELECT
|
|
||||||
json_key
|
|
||||||
FROM
|
|
||||||
matched_keys
|
|
||||||
)
|
|
||||||
|
|
||||||
-----------insert pending rows that have key with no trans match-----------------------------------------------------------------------------------
|
|
||||||
--need to look into mapping the transactions prior to loading
|
|
||||||
|
|
||||||
, inserted AS (
|
|
||||||
INSERT INTO
|
|
||||||
tps.trans (srce, rec)
|
|
||||||
SELECT
|
|
||||||
pl.srce
|
|
||||||
,pl.rec
|
|
||||||
FROM
|
|
||||||
pending_list pl
|
|
||||||
INNER JOIN unmatched_keys u ON
|
|
||||||
u.json_key = pl.json_key
|
|
||||||
ORDER BY
|
|
||||||
pl.id ASC
|
|
||||||
----this conflict is only if an exact duplicate rec json happens, which will be rejected
|
|
||||||
----therefore, records may not be inserted due to ay matches with certain json fields, or if the entire json is a duplicate, reason is not specified
|
|
||||||
RETURNING *
|
|
||||||
)
|
|
||||||
|
|
||||||
--------summarize records not inserted-------------------+------------------------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
, logged AS (
|
|
||||||
INSERT INTO
|
|
||||||
tps.trans_log (info)
|
|
||||||
SELECT
|
|
||||||
JSONB_BUILD_OBJECT('time_stamp',CURRENT_TIMESTAMP)
|
|
||||||
||JSONB_BUILD_OBJECT('srce',_srce)
|
|
||||||
||JSONB_BUILD_OBJECT('path',_path)
|
|
||||||
||JSONB_BUILD_OBJECT('not_inserted',
|
|
||||||
(
|
|
||||||
SELECT
|
|
||||||
jsonb_agg(json_key)
|
|
||||||
FROM
|
|
||||||
matched_keys
|
|
||||||
)
|
|
||||||
)
|
|
||||||
||JSONB_BUILD_OBJECT('inserted',
|
|
||||||
(
|
|
||||||
SELECT
|
|
||||||
jsonb_agg(json_key)
|
|
||||||
FROM
|
|
||||||
unmatched_keys
|
|
||||||
)
|
|
||||||
)
|
|
||||||
RETURNING *
|
|
||||||
)
|
|
||||||
|
|
||||||
SELECT
|
|
||||||
id
|
|
||||||
,info
|
|
||||||
INTO
|
|
||||||
_log_id
|
|
||||||
,_log_info
|
|
||||||
FROM
|
|
||||||
logged;
|
|
||||||
|
|
||||||
RAISE NOTICE 'import logged under id# %, info: %', _log_id, _log_info;
|
|
||||||
|
|
||||||
END
|
|
||||||
$$;
|
|
||||||
|
|
@ -1,254 +0,0 @@
|
|||||||
DROP FUNCTION tps.report_unmapped;
|
|
||||||
CREATE FUNCTION tps.report_unmapped(_srce text) RETURNS TABLE
|
|
||||||
(
|
|
||||||
source text,
|
|
||||||
map text,
|
|
||||||
ret_val jsonb,
|
|
||||||
"count" bigint
|
|
||||||
)
|
|
||||||
LANGUAGE plpgsql
|
|
||||||
AS
|
|
||||||
$f$
|
|
||||||
BEGIN
|
|
||||||
|
|
||||||
/*
|
|
||||||
first get distinct target json values
|
|
||||||
then apply regex
|
|
||||||
*/
|
|
||||||
|
|
||||||
RETURN QUERY
|
|
||||||
WITH
|
|
||||||
|
|
||||||
--------------------apply regex operations to transactions---------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
rx AS (
|
|
||||||
SELECT
|
|
||||||
t.srce,
|
|
||||||
t.id,
|
|
||||||
t.rec,
|
|
||||||
m.target,
|
|
||||||
m.seq,
|
|
||||||
regex->>'function' regex_function,
|
|
||||||
e.v ->> 'field' result_key_name,
|
|
||||||
e.v ->> 'key' target_json_path,
|
|
||||||
e.v ->> 'flag' regex_options_flag,
|
|
||||||
e.v->>'map' map_intention,
|
|
||||||
e.v->>'retain' retain_result,
|
|
||||||
e.v->>'regex' regex_expression,
|
|
||||||
e.rn target_item_number,
|
|
||||||
COALESCE(mt.rn,rp.rn,1) result_number,
|
|
||||||
mt.mt rx_match,
|
|
||||||
rp.rp rx_replace,
|
|
||||||
--------------------------json key name assigned to return value-----------------------------------------------------------------------
|
|
||||||
CASE e.v->>'map'
|
|
||||||
WHEN 'y' THEN
|
|
||||||
e.v->>'field'
|
|
||||||
ELSE
|
|
||||||
null
|
|
||||||
END map_key,
|
|
||||||
--------------------------json value resulting from regular expression-----------------------------------------------------------------
|
|
||||||
CASE e.v->>'map'
|
|
||||||
WHEN 'y' THEN
|
|
||||||
CASE regex->>'function'
|
|
||||||
WHEN 'extract' THEN
|
|
||||||
CASE WHEN array_upper(mt.mt,1)=1
|
|
||||||
THEN to_json(mt.mt[1])
|
|
||||||
ELSE array_to_json(mt.mt)
|
|
||||||
END::jsonb
|
|
||||||
WHEN 'replace' THEN
|
|
||||||
to_jsonb(rp.rp)
|
|
||||||
ELSE
|
|
||||||
'{}'::jsonb
|
|
||||||
END
|
|
||||||
ELSE
|
|
||||||
NULL
|
|
||||||
END map_val,
|
|
||||||
--------------------------flag for if retruned regex result is stored as a new part of the final json output---------------------------
|
|
||||||
CASE e.v->>'retain'
|
|
||||||
WHEN 'y' THEN
|
|
||||||
e.v->>'field'
|
|
||||||
ELSE
|
|
||||||
NULL
|
|
||||||
END retain_key,
|
|
||||||
--------------------------push regex result into json object---------------------------------------------------------------------------
|
|
||||||
CASE e.v->>'retain'
|
|
||||||
WHEN 'y' THEN
|
|
||||||
CASE regex->>'function'
|
|
||||||
WHEN 'extract' THEN
|
|
||||||
CASE WHEN array_upper(mt.mt,1)=1
|
|
||||||
THEN to_json(trim(mt.mt[1]))
|
|
||||||
ELSE array_to_json(mt.mt)
|
|
||||||
END::jsonb
|
|
||||||
WHEN 'replace' THEN
|
|
||||||
to_jsonb(rtrim(rp.rp))
|
|
||||||
ELSE
|
|
||||||
'{}'::jsonb
|
|
||||||
END
|
|
||||||
ELSE
|
|
||||||
NULL
|
|
||||||
END retain_val
|
|
||||||
FROM
|
|
||||||
--------------------------start with all regex maps------------------------------------------------------------------------------------
|
|
||||||
tps.map_rm m
|
|
||||||
--------------------------isolate matching basis to limit map to only look at certain json---------------------------------------------
|
|
||||||
JOIN LATERAL jsonb_array_elements(m.regex->'where') w(v) ON TRUE
|
|
||||||
--------------------------break out array of regluar expressions in the map------------------------------------------------------------
|
|
||||||
JOIN LATERAL jsonb_array_elements(m.regex->'defn') WITH ORDINALITY e(v, rn) ON true
|
|
||||||
--------------------------join to main transaction table but only certain key/values are included--------------------------------------
|
|
||||||
INNER JOIN tps.trans t ON
|
|
||||||
t.srce = m.srce AND
|
|
||||||
t.rec @> w.v
|
|
||||||
--------------------------each regex references a path to the target value, extract the target from the reference and do regex---------
|
|
||||||
LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
|
|
||||||
m.regex->>'function' = 'extract'
|
|
||||||
--------------------------same as above but for a replacement type function------------------------------------------------------------
|
|
||||||
LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON
|
|
||||||
m.regex->>'function' = 'replace'
|
|
||||||
WHERE
|
|
||||||
--t.allj IS NULL
|
|
||||||
t.srce = _srce AND
|
|
||||||
e.v @> '{"map":"y"}'::jsonb
|
|
||||||
--rec @> '{"Transaction":"ACH Credits","Transaction":"ACH Debits"}'
|
|
||||||
--rec @> '{"Description":"CHECK 93013270 086129935"}'::jsonb
|
|
||||||
/*
|
|
||||||
ORDER BY
|
|
||||||
t.id DESC,
|
|
||||||
m.target,
|
|
||||||
e.rn,
|
|
||||||
COALESCE(mt.rn,rp.rn,1)
|
|
||||||
*/
|
|
||||||
)
|
|
||||||
|
|
||||||
--SELECT * FROM rx LIMIT 100
|
|
||||||
|
|
||||||
|
|
||||||
, agg_to_target_items AS (
|
|
||||||
SELECT
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
,regex_function
|
|
||||||
,target_item_number
|
|
||||||
,result_key_name
|
|
||||||
,target_json_path
|
|
||||||
,CASE WHEN map_key IS NULL
|
|
||||||
THEN
|
|
||||||
NULL
|
|
||||||
ELSE
|
|
||||||
jsonb_build_object(
|
|
||||||
map_key,
|
|
||||||
CASE WHEN max(result_number) = 1
|
|
||||||
THEN
|
|
||||||
jsonb_agg(map_val ORDER BY result_number) -> 0
|
|
||||||
ELSE
|
|
||||||
jsonb_agg(map_val ORDER BY result_number)
|
|
||||||
END
|
|
||||||
)
|
|
||||||
END map_val
|
|
||||||
,CASE WHEN retain_key IS NULL
|
|
||||||
THEN
|
|
||||||
NULL
|
|
||||||
ELSE
|
|
||||||
jsonb_build_object(
|
|
||||||
retain_key,
|
|
||||||
CASE WHEN max(result_number) = 1
|
|
||||||
THEN
|
|
||||||
jsonb_agg(retain_val ORDER BY result_number) -> 0
|
|
||||||
ELSE
|
|
||||||
jsonb_agg(retain_val ORDER BY result_number)
|
|
||||||
END
|
|
||||||
)
|
|
||||||
END retain_val
|
|
||||||
FROM
|
|
||||||
rx
|
|
||||||
GROUP BY
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
,regex_function
|
|
||||||
,target_item_number
|
|
||||||
,result_key_name
|
|
||||||
,target_json_path
|
|
||||||
,map_key
|
|
||||||
,retain_key
|
|
||||||
)
|
|
||||||
|
|
||||||
--SELECT * FROM agg_to_target_items LIMIT 100
|
|
||||||
|
|
||||||
|
|
||||||
, agg_to_target AS (
|
|
||||||
SELECT
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
|
|
||||||
,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
|
|
||||||
FROM
|
|
||||||
agg_to_target_items
|
|
||||||
GROUP BY
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
, agg_to_ret AS (
|
|
||||||
SELECT
|
|
||||||
srce
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
,map_val
|
|
||||||
,retain_val
|
|
||||||
,count(*) "count"
|
|
||||||
FROM
|
|
||||||
agg_to_target
|
|
||||||
GROUP BY
|
|
||||||
srce
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
,map_val
|
|
||||||
,retain_val
|
|
||||||
)
|
|
||||||
|
|
||||||
, link_map AS (
|
|
||||||
SELECT
|
|
||||||
a.srce
|
|
||||||
,a.target
|
|
||||||
,a.seq
|
|
||||||
,a.map_intention
|
|
||||||
,a.map_val
|
|
||||||
,a."count"
|
|
||||||
,a.retain_val
|
|
||||||
,v.map mapped_val
|
|
||||||
FROM
|
|
||||||
agg_to_ret a
|
|
||||||
LEFT OUTER JOIN tps.map_rv v ON
|
|
||||||
v.srce = a.srce AND
|
|
||||||
v.target = a.target AND
|
|
||||||
v.retval = a.map_val
|
|
||||||
)
|
|
||||||
SELECT
|
|
||||||
l.srce
|
|
||||||
,l.target
|
|
||||||
,l.map_val
|
|
||||||
,l."count"
|
|
||||||
FROM
|
|
||||||
link_map l
|
|
||||||
WHERE
|
|
||||||
l.mapped_val IS NULL
|
|
||||||
ORDER BY
|
|
||||||
l.srce
|
|
||||||
,l.target
|
|
||||||
,l."count" desc;
|
|
||||||
END;
|
|
||||||
$f$
|
|
@ -1,263 +0,0 @@
|
|||||||
DROP FUNCTION IF EXISTS tps.report_unmapped_recs;
|
|
||||||
CREATE FUNCTION tps.report_unmapped_recs(_srce text) RETURNS TABLE
|
|
||||||
(
|
|
||||||
source text,
|
|
||||||
map text,
|
|
||||||
ret_val jsonb,
|
|
||||||
"count" bigint,
|
|
||||||
recs jsonb
|
|
||||||
|
|
||||||
)
|
|
||||||
LANGUAGE plpgsql
|
|
||||||
AS
|
|
||||||
$f$
|
|
||||||
BEGIN
|
|
||||||
|
|
||||||
/*
|
|
||||||
first get distinct target json values
|
|
||||||
then apply regex
|
|
||||||
*/
|
|
||||||
|
|
||||||
RETURN QUERY
|
|
||||||
WITH
|
|
||||||
|
|
||||||
--------------------apply regex operations to transactions---------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
rx AS (
|
|
||||||
SELECT
|
|
||||||
t.srce,
|
|
||||||
t.id,
|
|
||||||
t.rec,
|
|
||||||
m.target,
|
|
||||||
m.seq,
|
|
||||||
regex->>'function' regex_function,
|
|
||||||
e.v ->> 'field' result_key_name,
|
|
||||||
e.v ->> 'key' target_json_path,
|
|
||||||
e.v ->> 'flag' regex_options_flag,
|
|
||||||
e.v->>'map' map_intention,
|
|
||||||
e.v->>'retain' retain_result,
|
|
||||||
e.v->>'regex' regex_expression,
|
|
||||||
e.rn target_item_number,
|
|
||||||
COALESCE(mt.rn,rp.rn,1) result_number,
|
|
||||||
mt.mt rx_match,
|
|
||||||
rp.rp rx_replace,
|
|
||||||
--------------------------json key name assigned to return value-----------------------------------------------------------------------
|
|
||||||
CASE e.v->>'map'
|
|
||||||
WHEN 'y' THEN
|
|
||||||
e.v->>'field'
|
|
||||||
ELSE
|
|
||||||
null
|
|
||||||
END map_key,
|
|
||||||
--------------------------json value resulting from regular expression-----------------------------------------------------------------
|
|
||||||
CASE e.v->>'map'
|
|
||||||
WHEN 'y' THEN
|
|
||||||
CASE regex->>'function'
|
|
||||||
WHEN 'extract' THEN
|
|
||||||
CASE WHEN array_upper(mt.mt,1)=1
|
|
||||||
THEN to_json(mt.mt[1])
|
|
||||||
ELSE array_to_json(mt.mt)
|
|
||||||
END::jsonb
|
|
||||||
WHEN 'replace' THEN
|
|
||||||
to_jsonb(rp.rp)
|
|
||||||
ELSE
|
|
||||||
'{}'::jsonb
|
|
||||||
END
|
|
||||||
ELSE
|
|
||||||
NULL
|
|
||||||
END map_val,
|
|
||||||
--------------------------flag for if retruned regex result is stored as a new part of the final json output---------------------------
|
|
||||||
CASE e.v->>'retain'
|
|
||||||
WHEN 'y' THEN
|
|
||||||
e.v->>'field'
|
|
||||||
ELSE
|
|
||||||
NULL
|
|
||||||
END retain_key,
|
|
||||||
--------------------------push regex result into json object---------------------------------------------------------------------------
|
|
||||||
CASE e.v->>'retain'
|
|
||||||
WHEN 'y' THEN
|
|
||||||
CASE regex->>'function'
|
|
||||||
WHEN 'extract' THEN
|
|
||||||
CASE WHEN array_upper(mt.mt,1)=1
|
|
||||||
THEN to_json(trim(mt.mt[1]))
|
|
||||||
ELSE array_to_json(mt.mt)
|
|
||||||
END::jsonb
|
|
||||||
WHEN 'replace' THEN
|
|
||||||
to_jsonb(rtrim(rp.rp))
|
|
||||||
ELSE
|
|
||||||
'{}'::jsonb
|
|
||||||
END
|
|
||||||
ELSE
|
|
||||||
NULL
|
|
||||||
END retain_val
|
|
||||||
FROM
|
|
||||||
--------------------------start with all regex maps------------------------------------------------------------------------------------
|
|
||||||
tps.map_rm m
|
|
||||||
--------------------------isolate matching basis to limit map to only look at certain json---------------------------------------------
|
|
||||||
JOIN LATERAL jsonb_array_elements(m.regex->'where') w(v) ON TRUE
|
|
||||||
--------------------------break out array of regluar expressions in the map------------------------------------------------------------
|
|
||||||
JOIN LATERAL jsonb_array_elements(m.regex->'defn') WITH ORDINALITY e(v, rn) ON true
|
|
||||||
--------------------------join to main transaction table but only certain key/values are included--------------------------------------
|
|
||||||
INNER JOIN tps.trans t ON
|
|
||||||
t.srce = m.srce AND
|
|
||||||
t.rec @> w.v
|
|
||||||
--------------------------each regex references a path to the target value, extract the target from the reference and do regex---------
|
|
||||||
LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
|
|
||||||
m.regex->>'function' = 'extract'
|
|
||||||
--------------------------same as above but for a replacement type function------------------------------------------------------------
|
|
||||||
LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON
|
|
||||||
m.regex->>'function' = 'replace'
|
|
||||||
WHERE
|
|
||||||
--t.allj IS NULL
|
|
||||||
t.srce = _srce AND
|
|
||||||
e.v @> '{"map":"y"}'::jsonb
|
|
||||||
--rec @> '{"Transaction":"ACH Credits","Transaction":"ACH Debits"}'
|
|
||||||
--rec @> '{"Description":"CHECK 93013270 086129935"}'::jsonb
|
|
||||||
/*
|
|
||||||
ORDER BY
|
|
||||||
t.id DESC,
|
|
||||||
m.target,
|
|
||||||
e.rn,
|
|
||||||
COALESCE(mt.rn,rp.rn,1)
|
|
||||||
*/
|
|
||||||
)
|
|
||||||
|
|
||||||
--SELECT * FROM rx LIMIT 100
|
|
||||||
|
|
||||||
|
|
||||||
, agg_to_target_items AS (
|
|
||||||
SELECT
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,rec
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
,regex_function
|
|
||||||
,target_item_number
|
|
||||||
,result_key_name
|
|
||||||
,target_json_path
|
|
||||||
,CASE WHEN map_key IS NULL
|
|
||||||
THEN
|
|
||||||
NULL
|
|
||||||
ELSE
|
|
||||||
jsonb_build_object(
|
|
||||||
map_key,
|
|
||||||
CASE WHEN max(result_number) = 1
|
|
||||||
THEN
|
|
||||||
jsonb_agg(map_val ORDER BY result_number) -> 0
|
|
||||||
ELSE
|
|
||||||
jsonb_agg(map_val ORDER BY result_number)
|
|
||||||
END
|
|
||||||
)
|
|
||||||
END map_val
|
|
||||||
,CASE WHEN retain_key IS NULL
|
|
||||||
THEN
|
|
||||||
NULL
|
|
||||||
ELSE
|
|
||||||
jsonb_build_object(
|
|
||||||
retain_key,
|
|
||||||
CASE WHEN max(result_number) = 1
|
|
||||||
THEN
|
|
||||||
jsonb_agg(retain_val ORDER BY result_number) -> 0
|
|
||||||
ELSE
|
|
||||||
jsonb_agg(retain_val ORDER BY result_number)
|
|
||||||
END
|
|
||||||
)
|
|
||||||
END retain_val
|
|
||||||
FROM
|
|
||||||
rx
|
|
||||||
GROUP BY
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,rec
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
,regex_function
|
|
||||||
,target_item_number
|
|
||||||
,result_key_name
|
|
||||||
,target_json_path
|
|
||||||
,map_key
|
|
||||||
,retain_key
|
|
||||||
)
|
|
||||||
|
|
||||||
--SELECT * FROM agg_to_target_items LIMIT 100
|
|
||||||
|
|
||||||
|
|
||||||
, agg_to_target AS (
|
|
||||||
SELECT
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,rec
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
|
|
||||||
,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
|
|
||||||
FROM
|
|
||||||
agg_to_target_items
|
|
||||||
GROUP BY
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,rec
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
, agg_to_ret AS (
|
|
||||||
SELECT
|
|
||||||
srce
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
,map_val
|
|
||||||
,retain_val
|
|
||||||
,count(*) "count"
|
|
||||||
,jsonb_agg(rec) rec
|
|
||||||
FROM
|
|
||||||
agg_to_target
|
|
||||||
GROUP BY
|
|
||||||
srce
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
,map_val
|
|
||||||
,retain_val
|
|
||||||
)
|
|
||||||
|
|
||||||
, link_map AS (
|
|
||||||
SELECT
|
|
||||||
a.srce
|
|
||||||
,a.target
|
|
||||||
,a.seq
|
|
||||||
,a.map_intention
|
|
||||||
,a.map_val
|
|
||||||
,a."count"
|
|
||||||
,a.rec
|
|
||||||
,a.retain_val
|
|
||||||
,v.map mapped_val
|
|
||||||
FROM
|
|
||||||
agg_to_ret a
|
|
||||||
LEFT OUTER JOIN tps.map_rv v ON
|
|
||||||
v.srce = a.srce AND
|
|
||||||
v.target = a.target AND
|
|
||||||
v.retval = a.map_val
|
|
||||||
)
|
|
||||||
SELECT
|
|
||||||
l.srce
|
|
||||||
,l.target
|
|
||||||
,l.map_val
|
|
||||||
,l."count"
|
|
||||||
,l.rec
|
|
||||||
FROM
|
|
||||||
link_map l
|
|
||||||
WHERE
|
|
||||||
l.mapped_val IS NULL
|
|
||||||
ORDER BY
|
|
||||||
l.srce
|
|
||||||
,l.target
|
|
||||||
,l."count" desc;
|
|
||||||
END;
|
|
||||||
$f$
|
|
@ -1,257 +0,0 @@
|
|||||||
\timing
|
|
||||||
DROP FUNCTION tps.srce_import(_path text, _srce text);
|
|
||||||
CREATE OR REPLACE FUNCTION tps.srce_import(_path text, _srce text) RETURNS jsonb
|
|
||||||
|
|
||||||
/*--------------------------------------------------------
|
|
||||||
0. load target import to temp table
|
|
||||||
1. create pending list
|
|
||||||
2. get unqiue pending keys
|
|
||||||
3. see which keys not already in tps.trans
|
|
||||||
4. insert pending records associated with keys that are not already in trans
|
|
||||||
5. insert summary to log table
|
|
||||||
*/---------------------------------------------------------
|
|
||||||
|
|
||||||
--to-do
|
|
||||||
--return infomation to a client via json or composite type
|
|
||||||
|
|
||||||
|
|
||||||
AS $f$
|
|
||||||
DECLARE _t text;
|
|
||||||
DECLARE _c text;
|
|
||||||
DECLARE _log_info jsonb;
|
|
||||||
DECLARE _log_id text;
|
|
||||||
DECLARE _cnt numeric;
|
|
||||||
DECLARE _message jsonb;
|
|
||||||
_MESSAGE_TEXT text;
|
|
||||||
_PG_EXCEPTION_DETAIL text;
|
|
||||||
_PG_EXCEPTION_HINT text;
|
|
||||||
|
|
||||||
BEGIN
|
|
||||||
|
|
||||||
--_path := 'C:\users\fleet\downloads\discover-recentactivity-20171031.csv';
|
|
||||||
--_srce := 'DCARD';
|
|
||||||
|
|
||||||
----------------------------------------------------test if source exists----------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
SELECT
|
|
||||||
COUNT(*)
|
|
||||||
INTO
|
|
||||||
_cnt
|
|
||||||
FROM
|
|
||||||
tps.srce
|
|
||||||
WHERE
|
|
||||||
srce = _srce;
|
|
||||||
|
|
||||||
IF _cnt = 0 THEN
|
|
||||||
_message:=
|
|
||||||
format(
|
|
||||||
$$
|
|
||||||
{
|
|
||||||
"status":"fail",
|
|
||||||
"message":"source %L does not exists"
|
|
||||||
}
|
|
||||||
$$,
|
|
||||||
_srce
|
|
||||||
)::jsonb;
|
|
||||||
RETURN _message;
|
|
||||||
END IF;
|
|
||||||
----------------------------------------------------build the column list of the temp table----------------------------------------------------------------
|
|
||||||
|
|
||||||
SELECT
|
|
||||||
string_agg(quote_ident(prs.key)||' '||prs.type,','),
|
|
||||||
string_agg(quote_ident(prs.key),',')
|
|
||||||
INTO
|
|
||||||
_t,
|
|
||||||
_c
|
|
||||||
FROM
|
|
||||||
tps.srce
|
|
||||||
--unwrap the schema definition array
|
|
||||||
LEFT JOIN LATERAL jsonb_populate_recordset(null::tps.srce_defn_schema, defn->'schema') prs ON TRUE
|
|
||||||
WHERE
|
|
||||||
srce = _srce
|
|
||||||
GROUP BY
|
|
||||||
srce;
|
|
||||||
|
|
||||||
----------------------------------------------------add create table verbage in front of column list--------------------------------------------------------
|
|
||||||
|
|
||||||
_t := format('CREATE TEMP TABLE csv_i (%s, id SERIAL)', _t);
|
|
||||||
--RAISE NOTICE '%', _t;
|
|
||||||
--RAISE NOTICE '%', _c;
|
|
||||||
|
|
||||||
DROP TABLE IF EXISTS csv_i;
|
|
||||||
|
|
||||||
EXECUTE _t;
|
|
||||||
|
|
||||||
----------------------------------------------------do the insert-------------------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
--the column list needs to be dynamic forcing this whole line to be dynamic
|
|
||||||
_t := format('COPY csv_i (%s) FROM %L WITH (HEADER TRUE,DELIMITER '','', FORMAT CSV, ENCODING ''SQL_ASCII'',QUOTE ''"'');',_c,_path);
|
|
||||||
|
|
||||||
--RAISE NOTICE '%', _t;
|
|
||||||
|
|
||||||
EXECUTE _t;
|
|
||||||
|
|
||||||
WITH
|
|
||||||
|
|
||||||
-------------extract the limiter fields to one row per source----------------------------------
|
|
||||||
|
|
||||||
ext AS (
|
|
||||||
SELECT
|
|
||||||
srce
|
|
||||||
,defn->'unique_constraint'->>'fields'
|
|
||||||
,ARRAY(SELECT ae.e::text[] FROM jsonb_array_elements_text(defn->'unique_constraint'->'fields') ae(e)) text_array
|
|
||||||
FROM
|
|
||||||
tps.srce
|
|
||||||
WHERE
|
|
||||||
srce = _srce
|
|
||||||
--add where clause for targeted source
|
|
||||||
)
|
|
||||||
|
|
||||||
-------------for each imported row in the COPY table, genereate the json rec, and a column for the json key specified in the srce.defn-----------
|
|
||||||
|
|
||||||
,pending_list AS (
|
|
||||||
SELECT
|
|
||||||
tps.jsonb_extract(
|
|
||||||
row_to_json(i)::jsonb
|
|
||||||
,ext.text_array
|
|
||||||
) json_key,
|
|
||||||
row_to_json(i)::JSONB rec,
|
|
||||||
srce,
|
|
||||||
--ae.rn,
|
|
||||||
id
|
|
||||||
FROM
|
|
||||||
csv_i i
|
|
||||||
INNER JOIN ext ON
|
|
||||||
ext.srce = _srce
|
|
||||||
ORDER BY
|
|
||||||
id ASC
|
|
||||||
)
|
|
||||||
|
|
||||||
-----------create a unique list of keys from staged rows------------------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
, pending_keys AS (
|
|
||||||
SELECT DISTINCT
|
|
||||||
json_key
|
|
||||||
FROM
|
|
||||||
pending_list
|
|
||||||
)
|
|
||||||
|
|
||||||
-----------list of keys already loaded to tps-----------------------------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
, matched_keys AS (
|
|
||||||
SELECT DISTINCT
|
|
||||||
k.json_key
|
|
||||||
FROM
|
|
||||||
pending_keys k
|
|
||||||
INNER JOIN tps.trans t ON
|
|
||||||
t.ic = k.json_key
|
|
||||||
)
|
|
||||||
|
|
||||||
-----------return unique keys that are not already in tps.trans-----------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
, unmatched_keys AS (
|
|
||||||
SELECT
|
|
||||||
json_key
|
|
||||||
FROM
|
|
||||||
pending_keys
|
|
||||||
|
|
||||||
EXCEPT
|
|
||||||
|
|
||||||
SELECT
|
|
||||||
json_key
|
|
||||||
FROM
|
|
||||||
matched_keys
|
|
||||||
)
|
|
||||||
|
|
||||||
-----------insert pending rows that have key with no trans match-----------------------------------------------------------------------------------
|
|
||||||
--need to look into mapping the transactions prior to loading
|
|
||||||
|
|
||||||
, inserted AS (
|
|
||||||
INSERT INTO
|
|
||||||
tps.trans (srce, rec, ic)
|
|
||||||
SELECT
|
|
||||||
pl.srce
|
|
||||||
,pl.rec
|
|
||||||
,pl.json_key
|
|
||||||
FROM
|
|
||||||
pending_list pl
|
|
||||||
INNER JOIN unmatched_keys u ON
|
|
||||||
u.json_key = pl.json_key
|
|
||||||
ORDER BY
|
|
||||||
pl.id ASC
|
|
||||||
----this conflict is only if an exact duplicate rec json happens, which will be rejected
|
|
||||||
----therefore, records may not be inserted due to ay matches with certain json fields, or if the entire json is a duplicate, reason is not specified
|
|
||||||
RETURNING *
|
|
||||||
)
|
|
||||||
|
|
||||||
--------summarize records not inserted-------------------+------------------------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
, logged AS (
|
|
||||||
INSERT INTO
|
|
||||||
tps.trans_log (info)
|
|
||||||
SELECT
|
|
||||||
JSONB_BUILD_OBJECT('time_stamp',CURRENT_TIMESTAMP)
|
|
||||||
||JSONB_BUILD_OBJECT('srce',_srce)
|
|
||||||
||JSONB_BUILD_OBJECT('path',_path)
|
|
||||||
||JSONB_BUILD_OBJECT('not_inserted',
|
|
||||||
(
|
|
||||||
SELECT
|
|
||||||
jsonb_agg(json_key)
|
|
||||||
FROM
|
|
||||||
matched_keys
|
|
||||||
)
|
|
||||||
)
|
|
||||||
||JSONB_BUILD_OBJECT('inserted',
|
|
||||||
(
|
|
||||||
SELECT
|
|
||||||
jsonb_agg(json_key)
|
|
||||||
FROM
|
|
||||||
unmatched_keys
|
|
||||||
)
|
|
||||||
)
|
|
||||||
RETURNING *
|
|
||||||
)
|
|
||||||
|
|
||||||
SELECT
|
|
||||||
id
|
|
||||||
,info
|
|
||||||
INTO
|
|
||||||
_log_id
|
|
||||||
,_log_info
|
|
||||||
FROM
|
|
||||||
logged;
|
|
||||||
|
|
||||||
--RAISE NOTICE 'import logged under id# %, info: %', _log_id, _log_info;
|
|
||||||
|
|
||||||
_message:=
|
|
||||||
(
|
|
||||||
format(
|
|
||||||
$$
|
|
||||||
{
|
|
||||||
"status":"complete",
|
|
||||||
"message":"import of %L for source %L complete"
|
|
||||||
}
|
|
||||||
$$, _path, _srce)::jsonb
|
|
||||||
)||jsonb_build_object('details',_log_info);
|
|
||||||
|
|
||||||
RETURN _message;
|
|
||||||
|
|
||||||
EXCEPTION WHEN OTHERS THEN
|
|
||||||
GET STACKED DIAGNOSTICS
|
|
||||||
_MESSAGE_TEXT = MESSAGE_TEXT,
|
|
||||||
_PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
|
|
||||||
_PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
|
|
||||||
_message:=
|
|
||||||
($$
|
|
||||||
{
|
|
||||||
"status":"fail",
|
|
||||||
"message":"error importing data"
|
|
||||||
}
|
|
||||||
$$::jsonb)
|
|
||||||
||jsonb_build_object('message_text',_MESSAGE_TEXT)
|
|
||||||
||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);
|
|
||||||
return _message;
|
|
||||||
END;
|
|
||||||
$f$
|
|
||||||
LANGUAGE plpgsql
|
|
||||||
|
|
@ -1,243 +0,0 @@
|
|||||||
|
|
||||||
DO $F$
|
|
||||||
DECLARE _t text;
|
|
||||||
DECLARE _c text;
|
|
||||||
DECLARE _log_info jsonb;
|
|
||||||
DECLARE _log_id text;
|
|
||||||
DECLARE _cnt numeric;
|
|
||||||
DECLARE _message jsonb;
|
|
||||||
_MESSAGE_TEXT text;
|
|
||||||
_PG_EXCEPTION_DETAIL text;
|
|
||||||
_PG_EXCEPTION_HINT text;
|
|
||||||
_path text;
|
|
||||||
_srce text;
|
|
||||||
|
|
||||||
BEGIN
|
|
||||||
|
|
||||||
_path := 'C:\users\fleet\downloads\testj.csv';
|
|
||||||
_srce := 'DMAPI';
|
|
||||||
|
|
||||||
----------------------------------------------------test if source exists----------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
SELECT
|
|
||||||
COUNT(*)
|
|
||||||
INTO
|
|
||||||
_cnt
|
|
||||||
FROM
|
|
||||||
tps.srce
|
|
||||||
WHERE
|
|
||||||
srce = _srce;
|
|
||||||
|
|
||||||
IF _cnt = 0 THEN
|
|
||||||
_message:=
|
|
||||||
format(
|
|
||||||
$$
|
|
||||||
{
|
|
||||||
"status":"fail",
|
|
||||||
"message":"source %L does not exists"
|
|
||||||
}
|
|
||||||
$$,
|
|
||||||
_srce
|
|
||||||
)::jsonb;
|
|
||||||
END IF;
|
|
||||||
----------------------------------------------------build the column list of the temp table----------------------------------------------------------------
|
|
||||||
|
|
||||||
SELECT
|
|
||||||
string_agg(quote_ident(prs.key)||' '||prs.type,','),
|
|
||||||
string_agg(quote_ident(prs.key),',')
|
|
||||||
INTO
|
|
||||||
_t,
|
|
||||||
_c
|
|
||||||
FROM
|
|
||||||
tps.srce
|
|
||||||
--unwrap the schema definition array
|
|
||||||
LEFT JOIN LATERAL jsonb_populate_recordset(null::tps.srce_defn_schema, defn->'schema') prs ON TRUE
|
|
||||||
WHERE
|
|
||||||
srce = _srce
|
|
||||||
GROUP BY
|
|
||||||
srce;
|
|
||||||
|
|
||||||
----------------------------------------------------add create table verbage in front of column list--------------------------------------------------------
|
|
||||||
|
|
||||||
_t := format('CREATE TEMP TABLE csv_i (%s, id SERIAL)', _t);
|
|
||||||
--RAISE NOTICE '%', _t;
|
|
||||||
--RAISE NOTICE '%', _c;
|
|
||||||
|
|
||||||
DROP TABLE IF EXISTS csv_i;
|
|
||||||
|
|
||||||
EXECUTE _t;
|
|
||||||
|
|
||||||
----------------------------------------------------do the insert-------------------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
--the column list needs to be dynamic forcing this whole line to be dynamic
|
|
||||||
_t := format('COPY csv_i (%s) FROM %L WITH (HEADER TRUE,DELIMITER '','', FORMAT CSV, ENCODING ''SQL_ASCII'',QUOTE ''"'');',_c,_path);
|
|
||||||
|
|
||||||
--RAISE NOTICE '%', _t;
|
|
||||||
|
|
||||||
EXECUTE _t;
|
|
||||||
|
|
||||||
--drop table if exists tps.x;
|
|
||||||
--create table tps.x as
|
|
||||||
--(
|
|
||||||
WITH
|
|
||||||
|
|
||||||
-------------extract the limiter fields to one row per source----------------------------------
|
|
||||||
|
|
||||||
ext AS (
|
|
||||||
SELECT
|
|
||||||
srce
|
|
||||||
,defn->'unique_constraint'->>'fields'
|
|
||||||
,ARRAY(SELECT ae.e::text[] FROM jsonb_array_elements_text(defn->'unique_constraint'->'fields') ae(e)) text_array
|
|
||||||
FROM
|
|
||||||
tps.srce
|
|
||||||
WHERE
|
|
||||||
srce = _srce
|
|
||||||
--add where clause for targeted source
|
|
||||||
)
|
|
||||||
|
|
||||||
-------------for each imported row in the COPY table, genereate the json rec, and a column for the json key specified in the srce.defn-----------
|
|
||||||
|
|
||||||
,pending_list AS (
|
|
||||||
SELECT
|
|
||||||
tps.jsonb_extract(
|
|
||||||
row_to_json(i)::jsonb
|
|
||||||
,ext.text_array
|
|
||||||
) json_key,
|
|
||||||
row_to_json(i)::JSONB rec,
|
|
||||||
srce,
|
|
||||||
--ae.rn,
|
|
||||||
id
|
|
||||||
FROM
|
|
||||||
csv_i i
|
|
||||||
INNER JOIN ext ON
|
|
||||||
ext.srce = _srce
|
|
||||||
ORDER BY
|
|
||||||
id ASC
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
-----------create a unique list of keys from staged rows------------------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
, pending_keys AS (
|
|
||||||
SELECT DISTINCT
|
|
||||||
json_key
|
|
||||||
FROM
|
|
||||||
pending_list
|
|
||||||
)
|
|
||||||
|
|
||||||
-----------list of keys already loaded to tps-----------------------------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
, matched_keys AS (
|
|
||||||
SELECT DISTINCT
|
|
||||||
k.json_key
|
|
||||||
FROM
|
|
||||||
pending_keys k
|
|
||||||
INNER JOIN tps.trans t ON
|
|
||||||
t.ic = k.json_key
|
|
||||||
)
|
|
||||||
|
|
||||||
-----------return unique keys that are not already in tps.trans-----------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
, unmatched_keys AS (
|
|
||||||
SELECT
|
|
||||||
json_key
|
|
||||||
FROM
|
|
||||||
pending_keys
|
|
||||||
|
|
||||||
EXCEPT
|
|
||||||
|
|
||||||
SELECT
|
|
||||||
json_key
|
|
||||||
FROM
|
|
||||||
matched_keys
|
|
||||||
)
|
|
||||||
|
|
||||||
-----------insert pending rows that have key with no trans match-----------------------------------------------------------------------------------
|
|
||||||
--need to look into mapping the transactions prior to loading
|
|
||||||
|
|
||||||
, inserted AS (
|
|
||||||
INSERT INTO
|
|
||||||
tps.trans (srce, rec, ic)
|
|
||||||
SELECT
|
|
||||||
pl.srce
|
|
||||||
,pl.rec
|
|
||||||
,pl.json_key
|
|
||||||
FROM
|
|
||||||
pending_list pl
|
|
||||||
INNER JOIN unmatched_keys u ON
|
|
||||||
u.json_key = pl.json_key
|
|
||||||
ORDER BY
|
|
||||||
pl.id ASC
|
|
||||||
----this conflict is only if an exact duplicate rec json happens, which will be rejected
|
|
||||||
----therefore, records may not be inserted due to ay matches with certain json fields, or if the entire json is a duplicate, reason is not specified
|
|
||||||
RETURNING *
|
|
||||||
)
|
|
||||||
|
|
||||||
--------summarize records not inserted-------------------+------------------------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
, logged AS (
|
|
||||||
INSERT INTO
|
|
||||||
tps.trans_log (info)
|
|
||||||
SELECT
|
|
||||||
JSONB_BUILD_OBJECT('time_stamp',CURRENT_TIMESTAMP)
|
|
||||||
||JSONB_BUILD_OBJECT('srce',_srce)
|
|
||||||
||JSONB_BUILD_OBJECT('path',_path)
|
|
||||||
||JSONB_BUILD_OBJECT('not_inserted',
|
|
||||||
(
|
|
||||||
SELECT
|
|
||||||
jsonb_agg(json_key)
|
|
||||||
FROM
|
|
||||||
matched_keys
|
|
||||||
)
|
|
||||||
)
|
|
||||||
||JSONB_BUILD_OBJECT('inserted',
|
|
||||||
(
|
|
||||||
SELECT
|
|
||||||
jsonb_agg(json_key)
|
|
||||||
FROM
|
|
||||||
unmatched_keys
|
|
||||||
)
|
|
||||||
)
|
|
||||||
RETURNING *
|
|
||||||
)
|
|
||||||
|
|
||||||
SELECT
|
|
||||||
id
|
|
||||||
,info
|
|
||||||
INTO
|
|
||||||
_log_id
|
|
||||||
,_log_info
|
|
||||||
FROM
|
|
||||||
logged;
|
|
||||||
|
|
||||||
RAISE NOTICE 'import logged under id# %, info: %', _log_id, _log_info;
|
|
||||||
|
|
||||||
/*
|
|
||||||
_message:=
|
|
||||||
(
|
|
||||||
format(
|
|
||||||
$$
|
|
||||||
{
|
|
||||||
"status":"complete",
|
|
||||||
"message":"import of %L for source %L complete"
|
|
||||||
}
|
|
||||||
$$, _path, _srce)::jsonb
|
|
||||||
)||jsonb_build_object('details',_log_info);
|
|
||||||
|
|
||||||
|
|
||||||
RAISE NOTICE '%s',_message;
|
|
||||||
*/
|
|
||||||
--select * from pending_keys
|
|
||||||
--) with data;
|
|
||||||
end;
|
|
||||||
$F$;
|
|
||||||
/*
|
|
||||||
SELECT
|
|
||||||
JSONB_PRETTY(k.json_key) orig,
|
|
||||||
jsonb_pretty(jsonb_build_object('input_constraint',k.json_key)) uq,
|
|
||||||
T.REC
|
|
||||||
FROM
|
|
||||||
tps.x k
|
|
||||||
left outer JOIN tps.trans t ON
|
|
||||||
t.rec @> k.json_key;
|
|
||||||
*/
|
|
@ -1,51 +0,0 @@
|
|||||||
CREATE OR REPLACE FUNCTION tps.srce_map_def_set(_srce text, _map text, _defn jsonb, _seq int) RETURNS jsonb
|
|
||||||
AS
|
|
||||||
$f$
|
|
||||||
|
|
||||||
DECLARE
|
|
||||||
_message jsonb;
|
|
||||||
_MESSAGE_TEXT text;
|
|
||||||
_PG_EXCEPTION_DETAIL text;
|
|
||||||
_PG_EXCEPTION_HINT text;
|
|
||||||
|
|
||||||
BEGIN
|
|
||||||
|
|
||||||
BEGIN
|
|
||||||
|
|
||||||
INSERT INTO
|
|
||||||
tps.map_rm
|
|
||||||
SELECT
|
|
||||||
_srce
|
|
||||||
,_map
|
|
||||||
,_defn
|
|
||||||
,_seq
|
|
||||||
ON CONFLICT ON CONSTRAINT map_rm_pk DO UPDATE SET
|
|
||||||
srce = _srce
|
|
||||||
,target = _map
|
|
||||||
,regex = _defn
|
|
||||||
,seq = _seq;
|
|
||||||
|
|
||||||
EXCEPTION WHEN OTHERS THEN
|
|
||||||
|
|
||||||
GET STACKED DIAGNOSTICS
|
|
||||||
_MESSAGE_TEXT = MESSAGE_TEXT,
|
|
||||||
_PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
|
|
||||||
_PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
|
|
||||||
_message:=
|
|
||||||
($$
|
|
||||||
{
|
|
||||||
"status":"fail",
|
|
||||||
"message":"error setting definition"
|
|
||||||
}
|
|
||||||
$$::jsonb)
|
|
||||||
||jsonb_build_object('message_text',_MESSAGE_TEXT)
|
|
||||||
||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);
|
|
||||||
return _message;
|
|
||||||
END;
|
|
||||||
|
|
||||||
_message:= jsonb_build_object('status','complete','message','definition has been set');
|
|
||||||
return _message;
|
|
||||||
|
|
||||||
END;
|
|
||||||
$f$
|
|
||||||
language plpgsql
|
|
@ -1,255 +0,0 @@
|
|||||||
CREATE OR REPLACE FUNCTION tps.srce_map_overwrite(_srce text) RETURNS jsonb
|
|
||||||
AS
|
|
||||||
$f$
|
|
||||||
DECLARE
|
|
||||||
_message jsonb;
|
|
||||||
_MESSAGE_TEXT text;
|
|
||||||
_PG_EXCEPTION_DETAIL text;
|
|
||||||
_PG_EXCEPTION_HINT text;
|
|
||||||
|
|
||||||
BEGIN
|
|
||||||
WITH
|
|
||||||
--------------------apply regex operations to transactions-----------------------------------------------------------------------------------
|
|
||||||
|
|
||||||
rx AS (
|
|
||||||
SELECT
|
|
||||||
t.srce,
|
|
||||||
t.id,
|
|
||||||
t.rec,
|
|
||||||
m.target,
|
|
||||||
m.seq,
|
|
||||||
regex->>'function' regex_function,
|
|
||||||
e.v ->> 'field' result_key_name,
|
|
||||||
e.v ->> 'key' target_json_path,
|
|
||||||
e.v ->> 'flag' regex_options_flag,
|
|
||||||
e.v->>'map' map_intention,
|
|
||||||
e.v->>'retain' retain_result,
|
|
||||||
e.v->>'regex' regex_expression,
|
|
||||||
e.rn target_item_number,
|
|
||||||
COALESCE(mt.rn,rp.rn,1) result_number,
|
|
||||||
mt.mt rx_match,
|
|
||||||
rp.rp rx_replace,
|
|
||||||
CASE e.v->>'map'
|
|
||||||
WHEN 'y' THEN
|
|
||||||
e.v->>'field'
|
|
||||||
ELSE
|
|
||||||
null
|
|
||||||
END map_key,
|
|
||||||
CASE e.v->>'map'
|
|
||||||
WHEN 'y' THEN
|
|
||||||
CASE regex->>'function'
|
|
||||||
WHEN 'extract' THEN
|
|
||||||
CASE WHEN array_upper(mt.mt,1)=1
|
|
||||||
THEN to_json(mt.mt[1])
|
|
||||||
ELSE array_to_json(mt.mt)
|
|
||||||
END::jsonb
|
|
||||||
WHEN 'replace' THEN
|
|
||||||
to_jsonb(rp.rp)
|
|
||||||
ELSE
|
|
||||||
'{}'::jsonb
|
|
||||||
END
|
|
||||||
ELSE
|
|
||||||
NULL
|
|
||||||
END map_val,
|
|
||||||
CASE e.v->>'retain'
|
|
||||||
WHEN 'y' THEN
|
|
||||||
e.v->>'field'
|
|
||||||
ELSE
|
|
||||||
NULL
|
|
||||||
END retain_key,
|
|
||||||
CASE e.v->>'retain'
|
|
||||||
WHEN 'y' THEN
|
|
||||||
CASE regex->>'function'
|
|
||||||
WHEN 'extract' THEN
|
|
||||||
CASE WHEN array_upper(mt.mt,1)=1
|
|
||||||
THEN to_json(trim(mt.mt[1]))
|
|
||||||
ELSE array_to_json(mt.mt)
|
|
||||||
END::jsonb
|
|
||||||
WHEN 'replace' THEN
|
|
||||||
to_jsonb(rtrim(rp.rp))
|
|
||||||
ELSE
|
|
||||||
'{}'::jsonb
|
|
||||||
END
|
|
||||||
ELSE
|
|
||||||
NULL
|
|
||||||
END retain_val
|
|
||||||
FROM
|
|
||||||
tps.map_rm m
|
|
||||||
LEFT JOIN LATERAL jsonb_array_elements(m.regex->'where') w(v) ON TRUE
|
|
||||||
INNER JOIN tps.trans t ON
|
|
||||||
t.srce = m.srce AND
|
|
||||||
t.rec @> w.v
|
|
||||||
LEFT JOIN LATERAL jsonb_array_elements(m.regex->'defn') WITH ORDINALITY e(v, rn) ON true
|
|
||||||
LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
|
|
||||||
m.regex->>'function' = 'extract'
|
|
||||||
LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON
|
|
||||||
m.regex->>'function' = 'replace'
|
|
||||||
WHERE
|
|
||||||
--t.allj IS NULL
|
|
||||||
t.srce = _srce
|
|
||||||
--rec @> '{"Transaction":"ACH Credits","Transaction":"ACH Debits"}'
|
|
||||||
--rec @> '{"Description":"CHECK 93013270 086129935"}'::jsonb
|
|
||||||
ORDER BY
|
|
||||||
t.id DESC,
|
|
||||||
m.target,
|
|
||||||
e.rn,
|
|
||||||
COALESCE(mt.rn,rp.rn,1)
|
|
||||||
)
|
|
||||||
|
|
||||||
--SELECT count(*) FROM rx LIMIT 100
|
|
||||||
|
|
||||||
|
|
||||||
, agg_to_target_items AS (
|
|
||||||
SELECT
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
,regex_function
|
|
||||||
,target_item_number
|
|
||||||
,result_key_name
|
|
||||||
,target_json_path
|
|
||||||
,CASE WHEN map_key IS NULL
|
|
||||||
THEN
|
|
||||||
NULL
|
|
||||||
ELSE
|
|
||||||
jsonb_build_object(
|
|
||||||
map_key,
|
|
||||||
CASE WHEN max(result_number) = 1
|
|
||||||
THEN
|
|
||||||
jsonb_agg(map_val ORDER BY result_number) -> 0
|
|
||||||
ELSE
|
|
||||||
jsonb_agg(map_val ORDER BY result_number)
|
|
||||||
END
|
|
||||||
)
|
|
||||||
END map_val
|
|
||||||
,CASE WHEN retain_key IS NULL
|
|
||||||
THEN
|
|
||||||
NULL
|
|
||||||
ELSE
|
|
||||||
jsonb_build_object(
|
|
||||||
retain_key,
|
|
||||||
CASE WHEN max(result_number) = 1
|
|
||||||
THEN
|
|
||||||
jsonb_agg(retain_val ORDER BY result_number) -> 0
|
|
||||||
ELSE
|
|
||||||
jsonb_agg(retain_val ORDER BY result_number)
|
|
||||||
END
|
|
||||||
)
|
|
||||||
END retain_val
|
|
||||||
FROM
|
|
||||||
rx
|
|
||||||
GROUP BY
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
,regex_function
|
|
||||||
,target_item_number
|
|
||||||
,result_key_name
|
|
||||||
,target_json_path
|
|
||||||
,map_key
|
|
||||||
,retain_key
|
|
||||||
)
|
|
||||||
|
|
||||||
--SELECT * FROM agg_to_target_items LIMIT 100
|
|
||||||
|
|
||||||
|
|
||||||
, agg_to_target AS (
|
|
||||||
SELECT
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
|
|
||||||
,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
|
|
||||||
FROM
|
|
||||||
agg_to_target_items
|
|
||||||
GROUP BY
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,target
|
|
||||||
,seq
|
|
||||||
,map_intention
|
|
||||||
ORDER BY
|
|
||||||
id
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
--SELECT * FROM agg_to_target
|
|
||||||
|
|
||||||
|
|
||||||
, link_map AS (
|
|
||||||
SELECT
|
|
||||||
a.srce
|
|
||||||
,a.id
|
|
||||||
,a.target
|
|
||||||
,a.seq
|
|
||||||
,a.map_intention
|
|
||||||
,a.map_val
|
|
||||||
,a.retain_val retain_value
|
|
||||||
,v.map
|
|
||||||
FROM
|
|
||||||
agg_to_target a
|
|
||||||
LEFT OUTER JOIN tps.map_rv v ON
|
|
||||||
v.srce = a.srce AND
|
|
||||||
v.target = a.target AND
|
|
||||||
v.retval = a.map_val
|
|
||||||
)
|
|
||||||
|
|
||||||
--SELECT * FROM link_map
|
|
||||||
|
|
||||||
, agg_to_id AS (
|
|
||||||
SELECT
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
,tps.jsonb_concat_obj(COALESCE(retain_value,'{}'::jsonb) ORDER BY seq DESC) retain_val
|
|
||||||
,tps.jsonb_concat_obj(COALESCE(map,'{}'::jsonb)) map
|
|
||||||
FROM
|
|
||||||
link_map
|
|
||||||
GROUP BY
|
|
||||||
srce
|
|
||||||
,id
|
|
||||||
)
|
|
||||||
|
|
||||||
--SELECT agg_to_id.srce, agg_to_id.id, jsonb_pretty(agg_to_id.retain_val) , jsonb_pretty(agg_to_id.map) FROM agg_to_id ORDER BY id desc LIMIT 100
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
UPDATE
|
|
||||||
tps.trans t
|
|
||||||
SET
|
|
||||||
map = o.map,
|
|
||||||
parse = o.retain_val,
|
|
||||||
allj = t.rec||o.map||o.retain_val
|
|
||||||
FROM
|
|
||||||
agg_to_id o
|
|
||||||
WHERE
|
|
||||||
o.id = t.id;
|
|
||||||
|
|
||||||
_message:= jsonb_build_object('status','complete');
|
|
||||||
RETURN _message;
|
|
||||||
|
|
||||||
EXCEPTION WHEN OTHERS THEN
|
|
||||||
|
|
||||||
GET STACKED DIAGNOSTICS
|
|
||||||
_MESSAGE_TEXT = MESSAGE_TEXT,
|
|
||||||
_PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
|
|
||||||
_PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
|
|
||||||
_message:=
|
|
||||||
($$
|
|
||||||
{
|
|
||||||
"status":"fail",
|
|
||||||
"message":"error setting map value"
|
|
||||||
}
|
|
||||||
$$::jsonb)
|
|
||||||
||jsonb_build_object('message_text',_MESSAGE_TEXT)
|
|
||||||
||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);
|
|
||||||
|
|
||||||
RETURN _message;
|
|
||||||
END;
|
|
||||||
$f$
|
|
||||||
language plpgsql
|
|
@ -1,49 +0,0 @@
|
|||||||
CREATE OR REPLACE FUNCTION tps.srce_map_val_set(_srce text, _target text, _ret jsonb, _map jsonb) RETURNS jsonb
|
|
||||||
AS
|
|
||||||
$f$
|
|
||||||
|
|
||||||
DECLARE
|
|
||||||
_message jsonb;
|
|
||||||
_MESSAGE_TEXT text;
|
|
||||||
_PG_EXCEPTION_DETAIL text;
|
|
||||||
_PG_EXCEPTION_HINT text;
|
|
||||||
|
|
||||||
BEGIN
|
|
||||||
|
|
||||||
INSERT INTO
|
|
||||||
tps.map_rv
|
|
||||||
SELECT
|
|
||||||
_srce
|
|
||||||
,_target
|
|
||||||
,_ret
|
|
||||||
,_map
|
|
||||||
ON CONFLICT ON CONSTRAINT map_rv_pk DO UPDATE SET
|
|
||||||
srce = _srce
|
|
||||||
,target = _target
|
|
||||||
,retval = _ret
|
|
||||||
,map = _map;
|
|
||||||
|
|
||||||
_message:= jsonb_build_object('status','complete');
|
|
||||||
RETURN _message;
|
|
||||||
|
|
||||||
EXCEPTION WHEN OTHERS THEN
|
|
||||||
|
|
||||||
GET STACKED DIAGNOSTICS
|
|
||||||
_MESSAGE_TEXT = MESSAGE_TEXT,
|
|
||||||
_PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
|
|
||||||
_PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
|
|
||||||
_message:=
|
|
||||||
($$
|
|
||||||
{
|
|
||||||
"status":"fail",
|
|
||||||
"message":"error setting map value"
|
|
||||||
}
|
|
||||||
$$::jsonb)
|
|
||||||
||jsonb_build_object('message_text',_MESSAGE_TEXT)
|
|
||||||
||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);
|
|
||||||
|
|
||||||
RETURN _message;
|
|
||||||
|
|
||||||
END
|
|
||||||
$f$
|
|
||||||
language plpgsql
|
|
@ -1,60 +0,0 @@
|
|||||||
DROP FUNCTION tps.srce_map_val_set_multi;
|
|
||||||
CREATE OR REPLACE FUNCTION tps.srce_map_val_set_multi(_maps jsonb) RETURNS JSONB
|
|
||||||
LANGUAGE plpgsql
|
|
||||||
AS $f$
|
|
||||||
|
|
||||||
DECLARE
|
|
||||||
_message jsonb;
|
|
||||||
_MESSAGE_TEXT text;
|
|
||||||
_PG_EXCEPTION_DETAIL text;
|
|
||||||
_PG_EXCEPTION_HINT text;
|
|
||||||
|
|
||||||
BEGIN
|
|
||||||
|
|
||||||
|
|
||||||
WITH
|
|
||||||
-----------expand the json into a table------------------------------------------------------------------------------
|
|
||||||
t AS (
|
|
||||||
SELECT
|
|
||||||
jtr.*
|
|
||||||
FROM
|
|
||||||
jsonb_array_elements(_maps) ae(v)
|
|
||||||
JOIN LATERAL jsonb_to_record(ae.v) AS jtr(source text, map text, ret_val jsonb, mapped jsonb) ON TRUE
|
|
||||||
)
|
|
||||||
-----------do merge---------------------------------------------------------------------------------------------------
|
|
||||||
INSERT INTO
|
|
||||||
tps.map_rv
|
|
||||||
SELECT
|
|
||||||
t."source"
|
|
||||||
,t."map"
|
|
||||||
,t.ret_val
|
|
||||||
,t.mapped
|
|
||||||
FROM
|
|
||||||
t
|
|
||||||
ON CONFLICT ON CONSTRAINT map_rv_pk DO UPDATE SET
|
|
||||||
map = excluded.map;
|
|
||||||
|
|
||||||
-------return message--------------------------------------------------------------------------------------------------
|
|
||||||
_message:= jsonb_build_object('status','complete');
|
|
||||||
RETURN _message;
|
|
||||||
|
|
||||||
EXCEPTION WHEN OTHERS THEN
|
|
||||||
|
|
||||||
GET STACKED DIAGNOSTICS
|
|
||||||
_MESSAGE_TEXT = MESSAGE_TEXT,
|
|
||||||
_PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
|
|
||||||
_PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
|
|
||||||
_message:=
|
|
||||||
($$
|
|
||||||
{
|
|
||||||
"status":"fail",
|
|
||||||
"message":"error setting map value"
|
|
||||||
}
|
|
||||||
$$::jsonb)
|
|
||||||
||jsonb_build_object('message_text',_MESSAGE_TEXT)
|
|
||||||
||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);
|
|
||||||
|
|
||||||
RETURN _message;
|
|
||||||
|
|
||||||
END;
|
|
||||||
$f$
|
|
@ -1,107 +0,0 @@
|
|||||||
|
|
||||||
CREATE OR REPLACE FUNCTION tps.srce_set(_defn jsonb) RETURNS jsonb
|
|
||||||
AS $f$
|
|
||||||
|
|
||||||
DECLARE
|
|
||||||
_cnt int;
|
|
||||||
_conflict BOOLEAN;
|
|
||||||
_message jsonb;
|
|
||||||
_sql text;
|
|
||||||
_cur_sch jsonb;
|
|
||||||
|
|
||||||
BEGIN
|
|
||||||
|
|
||||||
/*
|
|
||||||
1. determine if insert or update
|
|
||||||
2. if update, determine if conflicts exists
|
|
||||||
3. do merge
|
|
||||||
*/
|
|
||||||
|
|
||||||
-------extract current source schema for compare--------------------------
|
|
||||||
SELECT
|
|
||||||
defn->'schema'
|
|
||||||
INTO
|
|
||||||
_cur_sch
|
|
||||||
FROM
|
|
||||||
tps.srce
|
|
||||||
WHERE
|
|
||||||
srce = _defn->>'name';
|
|
||||||
|
|
||||||
-------check for transctions already existing under this source-----------
|
|
||||||
SELECT
|
|
||||||
COUNT(*)
|
|
||||||
INTO
|
|
||||||
_cnt
|
|
||||||
FROM
|
|
||||||
tps.trans
|
|
||||||
WHERE
|
|
||||||
srce = _defn->>'name';
|
|
||||||
|
|
||||||
--if there are transaction already and the schema is different stop--------
|
|
||||||
IF _cnt > 0 THEN
|
|
||||||
IF _cur_sch <> _defn->'schema' THEN
|
|
||||||
_conflict = TRUE;
|
|
||||||
--get out of the function somehow
|
|
||||||
_message =
|
|
||||||
$$
|
|
||||||
{
|
|
||||||
"message":"transactions already exist under source profile and there is a pending schema change"
|
|
||||||
,"status":"error"
|
|
||||||
}
|
|
||||||
$$::jsonb;
|
|
||||||
return _message;
|
|
||||||
END IF;
|
|
||||||
END IF;
|
|
||||||
|
|
||||||
/*-------------------------------------------------------
|
|
||||||
do schema validation fo _defn object?
|
|
||||||
---------------------------------------------------------*/
|
|
||||||
|
|
||||||
-------------------insert definition----------------------------------------
|
|
||||||
INSERT INTO
|
|
||||||
tps.srce
|
|
||||||
SELECT
|
|
||||||
_defn->>'name', _defn
|
|
||||||
ON CONFLICT ON CONSTRAINT srce_pkey DO UPDATE
|
|
||||||
SET
|
|
||||||
defn = _defn;
|
|
||||||
|
|
||||||
------------------drop existing type-----------------------------------------
|
|
||||||
|
|
||||||
EXECUTE format('DROP TYPE IF EXISTS tps.%I',_defn->>'name');
|
|
||||||
|
|
||||||
------------------create new type--------------------------------------------
|
|
||||||
|
|
||||||
SELECT
|
|
||||||
string_agg(quote_ident(prs.key)||' '||prs.type,',')
|
|
||||||
INTO
|
|
||||||
_sql
|
|
||||||
FROM
|
|
||||||
tps.srce
|
|
||||||
--unwrap the schema definition array
|
|
||||||
LEFT JOIN LATERAL jsonb_populate_recordset(null::tps.srce_defn_schema, defn->'schema') prs ON TRUE
|
|
||||||
WHERE
|
|
||||||
srce = _defn->>'name'
|
|
||||||
GROUP BY
|
|
||||||
srce;
|
|
||||||
|
|
||||||
RAISE NOTICE 'CREATE TYPE tps.% AS (%)',_defn->>'name',_sql;
|
|
||||||
|
|
||||||
EXECUTE format('CREATE TYPE tps.%I AS (%s)',_defn->>'name',_sql);
|
|
||||||
|
|
||||||
EXECUTE format('COMMENT ON TYPE tps.%I IS %L',_defn->>'name',(_defn->>'description'));
|
|
||||||
|
|
||||||
----------------set message-----------------------------------------------------
|
|
||||||
|
|
||||||
_message =
|
|
||||||
$$
|
|
||||||
{
|
|
||||||
"message":"definition set"
|
|
||||||
,"status":"success"
|
|
||||||
}
|
|
||||||
$$::jsonb;
|
|
||||||
return _message;
|
|
||||||
|
|
||||||
END;
|
|
||||||
$f$
|
|
||||||
LANGUAGE plpgsql
|
|
Loading…
Reference in New Issue
Block a user