Merge branch 'j_constr' into key_log
This commit is contained in:
commit
7a516d7a24
@ -125,7 +125,7 @@ BEGIN
|
||||
FOREACH t SLICE 1 IN ARRAY key_list LOOP
|
||||
--RAISE NOTICE '%', t;
|
||||
--RAISE NOTICE '%', t[1];
|
||||
j := j || jsonb_build_object(t[1],rec#>t);
|
||||
j := j || jsonb_build_object(t::text,rec#>t);
|
||||
END LOOP;
|
||||
RETURN j;
|
||||
END;
|
||||
@ -519,7 +519,7 @@ BEGIN
|
||||
FROM
|
||||
pending_keys k
|
||||
INNER JOIN tps.trans t ON
|
||||
t.rec @> k.json_key
|
||||
t.ic = k.json_key
|
||||
)
|
||||
|
||||
-----------return unique keys that are not already in tps.trans-----------------------------------------------------------------------------------
|
||||
@ -543,10 +543,11 @@ BEGIN
|
||||
|
||||
, inserted AS (
|
||||
INSERT INTO
|
||||
tps.trans (srce, rec)
|
||||
tps.trans (srce, rec, ic)
|
||||
SELECT
|
||||
pl.srce
|
||||
,pl.rec
|
||||
,pl.json_key
|
||||
FROM
|
||||
pending_list pl
|
||||
INNER JOIN unmatched_keys u ON
|
||||
@ -1488,7 +1489,8 @@ CREATE TABLE tps.trans (
|
||||
rec jsonb,
|
||||
parse jsonb,
|
||||
map jsonb,
|
||||
allj jsonb
|
||||
allj jsonb,
|
||||
ic jsonb
|
||||
);
|
||||
|
||||
|
||||
@ -1499,6 +1501,13 @@ CREATE TABLE tps.trans (
|
||||
COMMENT ON TABLE tps.trans IS 'source records';
|
||||
|
||||
|
||||
--
|
||||
-- Name: COLUMN trans.ic; Type: COMMENT; Schema: tps; Owner: -
|
||||
--
|
||||
|
||||
COMMENT ON COLUMN tps.trans.ic IS 'input constraint value';
|
||||
|
||||
|
||||
--
|
||||
-- Name: trans_id_seq; Type: SEQUENCE; Schema: tps; Owner: -
|
||||
--
|
||||
|
263
functions/report_unmapped_recs.sql
Normal file
263
functions/report_unmapped_recs.sql
Normal file
@ -0,0 +1,263 @@
|
||||
DROP FUNCTION IF EXISTS tps.report_unmapped_recs;
|
||||
CREATE FUNCTION tps.report_unmapped_recs(_srce text) RETURNS TABLE
|
||||
(
|
||||
source text,
|
||||
map text,
|
||||
ret_val jsonb,
|
||||
"count" bigint,
|
||||
recs jsonb
|
||||
|
||||
)
|
||||
LANGUAGE plpgsql
|
||||
AS
|
||||
$f$
|
||||
BEGIN
|
||||
|
||||
/*
|
||||
first get distinct target json values
|
||||
then apply regex
|
||||
*/
|
||||
|
||||
RETURN QUERY
|
||||
WITH
|
||||
|
||||
--------------------apply regex operations to transactions---------------------------------------------------------------------------------
|
||||
|
||||
rx AS (
|
||||
SELECT
|
||||
t.srce,
|
||||
t.id,
|
||||
t.rec,
|
||||
m.target,
|
||||
m.seq,
|
||||
regex->>'function' regex_function,
|
||||
e.v ->> 'field' result_key_name,
|
||||
e.v ->> 'key' target_json_path,
|
||||
e.v ->> 'flag' regex_options_flag,
|
||||
e.v->>'map' map_intention,
|
||||
e.v->>'retain' retain_result,
|
||||
e.v->>'regex' regex_expression,
|
||||
e.rn target_item_number,
|
||||
COALESCE(mt.rn,rp.rn,1) result_number,
|
||||
mt.mt rx_match,
|
||||
rp.rp rx_replace,
|
||||
--------------------------json key name assigned to return value-----------------------------------------------------------------------
|
||||
CASE e.v->>'map'
|
||||
WHEN 'y' THEN
|
||||
e.v->>'field'
|
||||
ELSE
|
||||
null
|
||||
END map_key,
|
||||
--------------------------json value resulting from regular expression-----------------------------------------------------------------
|
||||
CASE e.v->>'map'
|
||||
WHEN 'y' THEN
|
||||
CASE regex->>'function'
|
||||
WHEN 'extract' THEN
|
||||
CASE WHEN array_upper(mt.mt,1)=1
|
||||
THEN to_json(mt.mt[1])
|
||||
ELSE array_to_json(mt.mt)
|
||||
END::jsonb
|
||||
WHEN 'replace' THEN
|
||||
to_jsonb(rp.rp)
|
||||
ELSE
|
||||
'{}'::jsonb
|
||||
END
|
||||
ELSE
|
||||
NULL
|
||||
END map_val,
|
||||
--------------------------flag for if retruned regex result is stored as a new part of the final json output---------------------------
|
||||
CASE e.v->>'retain'
|
||||
WHEN 'y' THEN
|
||||
e.v->>'field'
|
||||
ELSE
|
||||
NULL
|
||||
END retain_key,
|
||||
--------------------------push regex result into json object---------------------------------------------------------------------------
|
||||
CASE e.v->>'retain'
|
||||
WHEN 'y' THEN
|
||||
CASE regex->>'function'
|
||||
WHEN 'extract' THEN
|
||||
CASE WHEN array_upper(mt.mt,1)=1
|
||||
THEN to_json(trim(mt.mt[1]))
|
||||
ELSE array_to_json(mt.mt)
|
||||
END::jsonb
|
||||
WHEN 'replace' THEN
|
||||
to_jsonb(rtrim(rp.rp))
|
||||
ELSE
|
||||
'{}'::jsonb
|
||||
END
|
||||
ELSE
|
||||
NULL
|
||||
END retain_val
|
||||
FROM
|
||||
--------------------------start with all regex maps------------------------------------------------------------------------------------
|
||||
tps.map_rm m
|
||||
--------------------------isolate matching basis to limit map to only look at certain json---------------------------------------------
|
||||
JOIN LATERAL jsonb_array_elements(m.regex->'where') w(v) ON TRUE
|
||||
--------------------------break out array of regluar expressions in the map------------------------------------------------------------
|
||||
JOIN LATERAL jsonb_array_elements(m.regex->'defn') WITH ORDINALITY e(v, rn) ON true
|
||||
--------------------------join to main transaction table but only certain key/values are included--------------------------------------
|
||||
INNER JOIN tps.trans t ON
|
||||
t.srce = m.srce AND
|
||||
t.rec @> w.v
|
||||
--------------------------each regex references a path to the target value, extract the target from the reference and do regex---------
|
||||
LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
|
||||
m.regex->>'function' = 'extract'
|
||||
--------------------------same as above but for a replacement type function------------------------------------------------------------
|
||||
LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON
|
||||
m.regex->>'function' = 'replace'
|
||||
WHERE
|
||||
--t.allj IS NULL
|
||||
t.srce = _srce AND
|
||||
e.v @> '{"map":"y"}'::jsonb
|
||||
--rec @> '{"Transaction":"ACH Credits","Transaction":"ACH Debits"}'
|
||||
--rec @> '{"Description":"CHECK 93013270 086129935"}'::jsonb
|
||||
/*
|
||||
ORDER BY
|
||||
t.id DESC,
|
||||
m.target,
|
||||
e.rn,
|
||||
COALESCE(mt.rn,rp.rn,1)
|
||||
*/
|
||||
)
|
||||
|
||||
--SELECT * FROM rx LIMIT 100
|
||||
|
||||
|
||||
, agg_to_target_items AS (
|
||||
SELECT
|
||||
srce
|
||||
,id
|
||||
,rec
|
||||
,target
|
||||
,seq
|
||||
,map_intention
|
||||
,regex_function
|
||||
,target_item_number
|
||||
,result_key_name
|
||||
,target_json_path
|
||||
,CASE WHEN map_key IS NULL
|
||||
THEN
|
||||
NULL
|
||||
ELSE
|
||||
jsonb_build_object(
|
||||
map_key,
|
||||
CASE WHEN max(result_number) = 1
|
||||
THEN
|
||||
jsonb_agg(map_val ORDER BY result_number) -> 0
|
||||
ELSE
|
||||
jsonb_agg(map_val ORDER BY result_number)
|
||||
END
|
||||
)
|
||||
END map_val
|
||||
,CASE WHEN retain_key IS NULL
|
||||
THEN
|
||||
NULL
|
||||
ELSE
|
||||
jsonb_build_object(
|
||||
retain_key,
|
||||
CASE WHEN max(result_number) = 1
|
||||
THEN
|
||||
jsonb_agg(retain_val ORDER BY result_number) -> 0
|
||||
ELSE
|
||||
jsonb_agg(retain_val ORDER BY result_number)
|
||||
END
|
||||
)
|
||||
END retain_val
|
||||
FROM
|
||||
rx
|
||||
GROUP BY
|
||||
srce
|
||||
,id
|
||||
,rec
|
||||
,target
|
||||
,seq
|
||||
,map_intention
|
||||
,regex_function
|
||||
,target_item_number
|
||||
,result_key_name
|
||||
,target_json_path
|
||||
,map_key
|
||||
,retain_key
|
||||
)
|
||||
|
||||
--SELECT * FROM agg_to_target_items LIMIT 100
|
||||
|
||||
|
||||
, agg_to_target AS (
|
||||
SELECT
|
||||
srce
|
||||
,id
|
||||
,rec
|
||||
,target
|
||||
,seq
|
||||
,map_intention
|
||||
,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
|
||||
,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
|
||||
FROM
|
||||
agg_to_target_items
|
||||
GROUP BY
|
||||
srce
|
||||
,id
|
||||
,rec
|
||||
,target
|
||||
,seq
|
||||
,map_intention
|
||||
)
|
||||
|
||||
|
||||
, agg_to_ret AS (
|
||||
SELECT
|
||||
srce
|
||||
,target
|
||||
,seq
|
||||
,map_intention
|
||||
,map_val
|
||||
,retain_val
|
||||
,count(*) "count"
|
||||
,jsonb_agg(rec) rec
|
||||
FROM
|
||||
agg_to_target
|
||||
GROUP BY
|
||||
srce
|
||||
,target
|
||||
,seq
|
||||
,map_intention
|
||||
,map_val
|
||||
,retain_val
|
||||
)
|
||||
|
||||
, link_map AS (
|
||||
SELECT
|
||||
a.srce
|
||||
,a.target
|
||||
,a.seq
|
||||
,a.map_intention
|
||||
,a.map_val
|
||||
,a."count"
|
||||
,a.rec
|
||||
,a.retain_val
|
||||
,v.map mapped_val
|
||||
FROM
|
||||
agg_to_ret a
|
||||
LEFT OUTER JOIN tps.map_rv v ON
|
||||
v.srce = a.srce AND
|
||||
v.target = a.target AND
|
||||
v.retval = a.map_val
|
||||
)
|
||||
SELECT
|
||||
l.srce
|
||||
,l.target
|
||||
,l.map_val
|
||||
,l."count"
|
||||
,l.rec
|
||||
FROM
|
||||
link_map l
|
||||
WHERE
|
||||
l.mapped_val IS NULL
|
||||
ORDER BY
|
||||
l.srce
|
||||
,l.target
|
||||
,l."count" desc;
|
||||
END;
|
||||
$f$
|
@ -144,7 +144,7 @@ BEGIN
|
||||
FROM
|
||||
pending_keys k
|
||||
INNER JOIN tps.trans t ON
|
||||
t.rec @> k.json_key
|
||||
t.ic = k.json_key
|
||||
)
|
||||
|
||||
-----------return unique keys that are not already in tps.trans-----------------------------------------------------------------------------------
|
||||
@ -168,10 +168,11 @@ BEGIN
|
||||
|
||||
, inserted AS (
|
||||
INSERT INTO
|
||||
tps.trans (srce, rec)
|
||||
tps.trans (srce, rec, ic)
|
||||
SELECT
|
||||
pl.srce
|
||||
,pl.rec
|
||||
,pl.json_key
|
||||
FROM
|
||||
pending_list pl
|
||||
INNER JOIN unmatched_keys u ON
|
||||
|
243
functions/srce_import_dev.sql
Normal file
243
functions/srce_import_dev.sql
Normal file
@ -0,0 +1,243 @@
|
||||
|
||||
DO $F$
|
||||
DECLARE _t text;
|
||||
DECLARE _c text;
|
||||
DECLARE _log_info jsonb;
|
||||
DECLARE _log_id text;
|
||||
DECLARE _cnt numeric;
|
||||
DECLARE _message jsonb;
|
||||
_MESSAGE_TEXT text;
|
||||
_PG_EXCEPTION_DETAIL text;
|
||||
_PG_EXCEPTION_HINT text;
|
||||
_path text;
|
||||
_srce text;
|
||||
|
||||
BEGIN
|
||||
|
||||
_path := 'C:\users\fleet\downloads\testj.csv';
|
||||
_srce := 'DMAPI';
|
||||
|
||||
----------------------------------------------------test if source exists----------------------------------------------------------------------------------
|
||||
|
||||
SELECT
|
||||
COUNT(*)
|
||||
INTO
|
||||
_cnt
|
||||
FROM
|
||||
tps.srce
|
||||
WHERE
|
||||
srce = _srce;
|
||||
|
||||
IF _cnt = 0 THEN
|
||||
_message:=
|
||||
format(
|
||||
$$
|
||||
{
|
||||
"status":"fail",
|
||||
"message":"source %L does not exists"
|
||||
}
|
||||
$$,
|
||||
_srce
|
||||
)::jsonb;
|
||||
END IF;
|
||||
----------------------------------------------------build the column list of the temp table----------------------------------------------------------------
|
||||
|
||||
SELECT
|
||||
string_agg(quote_ident(prs.key)||' '||prs.type,','),
|
||||
string_agg(quote_ident(prs.key),',')
|
||||
INTO
|
||||
_t,
|
||||
_c
|
||||
FROM
|
||||
tps.srce
|
||||
--unwrap the schema definition array
|
||||
LEFT JOIN LATERAL jsonb_populate_recordset(null::tps.srce_defn_schema, defn->'schema') prs ON TRUE
|
||||
WHERE
|
||||
srce = _srce
|
||||
GROUP BY
|
||||
srce;
|
||||
|
||||
----------------------------------------------------add create table verbage in front of column list--------------------------------------------------------
|
||||
|
||||
_t := format('CREATE TEMP TABLE csv_i (%s, id SERIAL)', _t);
|
||||
--RAISE NOTICE '%', _t;
|
||||
--RAISE NOTICE '%', _c;
|
||||
|
||||
DROP TABLE IF EXISTS csv_i;
|
||||
|
||||
EXECUTE _t;
|
||||
|
||||
----------------------------------------------------do the insert-------------------------------------------------------------------------------------------
|
||||
|
||||
--the column list needs to be dynamic forcing this whole line to be dynamic
|
||||
_t := format('COPY csv_i (%s) FROM %L WITH (HEADER TRUE,DELIMITER '','', FORMAT CSV, ENCODING ''SQL_ASCII'',QUOTE ''"'');',_c,_path);
|
||||
|
||||
--RAISE NOTICE '%', _t;
|
||||
|
||||
EXECUTE _t;
|
||||
|
||||
--drop table if exists tps.x;
|
||||
--create table tps.x as
|
||||
--(
|
||||
WITH
|
||||
|
||||
-------------extract the limiter fields to one row per source----------------------------------
|
||||
|
||||
ext AS (
|
||||
SELECT
|
||||
srce
|
||||
,defn->'unique_constraint'->>'fields'
|
||||
,ARRAY(SELECT ae.e::text[] FROM jsonb_array_elements_text(defn->'unique_constraint'->'fields') ae(e)) text_array
|
||||
FROM
|
||||
tps.srce
|
||||
WHERE
|
||||
srce = _srce
|
||||
--add where clause for targeted source
|
||||
)
|
||||
|
||||
-------------for each imported row in the COPY table, genereate the json rec, and a column for the json key specified in the srce.defn-----------
|
||||
|
||||
,pending_list AS (
|
||||
SELECT
|
||||
tps.jsonb_extract(
|
||||
row_to_json(i)::jsonb
|
||||
,ext.text_array
|
||||
) json_key,
|
||||
row_to_json(i)::JSONB rec,
|
||||
srce,
|
||||
--ae.rn,
|
||||
id
|
||||
FROM
|
||||
csv_i i
|
||||
INNER JOIN ext ON
|
||||
ext.srce = _srce
|
||||
ORDER BY
|
||||
id ASC
|
||||
)
|
||||
|
||||
|
||||
-----------create a unique list of keys from staged rows------------------------------------------------------------------------------------------
|
||||
|
||||
, pending_keys AS (
|
||||
SELECT DISTINCT
|
||||
json_key
|
||||
FROM
|
||||
pending_list
|
||||
)
|
||||
|
||||
-----------list of keys already loaded to tps-----------------------------------------------------------------------------------------------------
|
||||
|
||||
, matched_keys AS (
|
||||
SELECT DISTINCT
|
||||
k.json_key
|
||||
FROM
|
||||
pending_keys k
|
||||
INNER JOIN tps.trans t ON
|
||||
t.ic = k.json_key
|
||||
)
|
||||
|
||||
-----------return unique keys that are not already in tps.trans-----------------------------------------------------------------------------------
|
||||
|
||||
, unmatched_keys AS (
|
||||
SELECT
|
||||
json_key
|
||||
FROM
|
||||
pending_keys
|
||||
|
||||
EXCEPT
|
||||
|
||||
SELECT
|
||||
json_key
|
||||
FROM
|
||||
matched_keys
|
||||
)
|
||||
|
||||
-----------insert pending rows that have key with no trans match-----------------------------------------------------------------------------------
|
||||
--need to look into mapping the transactions prior to loading
|
||||
|
||||
, inserted AS (
|
||||
INSERT INTO
|
||||
tps.trans (srce, rec, ic)
|
||||
SELECT
|
||||
pl.srce
|
||||
,pl.rec
|
||||
,pl.json_key
|
||||
FROM
|
||||
pending_list pl
|
||||
INNER JOIN unmatched_keys u ON
|
||||
u.json_key = pl.json_key
|
||||
ORDER BY
|
||||
pl.id ASC
|
||||
----this conflict is only if an exact duplicate rec json happens, which will be rejected
|
||||
----therefore, records may not be inserted due to ay matches with certain json fields, or if the entire json is a duplicate, reason is not specified
|
||||
RETURNING *
|
||||
)
|
||||
|
||||
--------summarize records not inserted-------------------+------------------------------------------------------------------------------------------------
|
||||
|
||||
, logged AS (
|
||||
INSERT INTO
|
||||
tps.trans_log (info)
|
||||
SELECT
|
||||
JSONB_BUILD_OBJECT('time_stamp',CURRENT_TIMESTAMP)
|
||||
||JSONB_BUILD_OBJECT('srce',_srce)
|
||||
||JSONB_BUILD_OBJECT('path',_path)
|
||||
||JSONB_BUILD_OBJECT('not_inserted',
|
||||
(
|
||||
SELECT
|
||||
jsonb_agg(json_key)
|
||||
FROM
|
||||
matched_keys
|
||||
)
|
||||
)
|
||||
||JSONB_BUILD_OBJECT('inserted',
|
||||
(
|
||||
SELECT
|
||||
jsonb_agg(json_key)
|
||||
FROM
|
||||
unmatched_keys
|
||||
)
|
||||
)
|
||||
RETURNING *
|
||||
)
|
||||
|
||||
SELECT
|
||||
id
|
||||
,info
|
||||
INTO
|
||||
_log_id
|
||||
,_log_info
|
||||
FROM
|
||||
logged;
|
||||
|
||||
RAISE NOTICE 'import logged under id# %, info: %', _log_id, _log_info;
|
||||
|
||||
/*
|
||||
_message:=
|
||||
(
|
||||
format(
|
||||
$$
|
||||
{
|
||||
"status":"complete",
|
||||
"message":"import of %L for source %L complete"
|
||||
}
|
||||
$$, _path, _srce)::jsonb
|
||||
)||jsonb_build_object('details',_log_info);
|
||||
|
||||
|
||||
RAISE NOTICE '%s',_message;
|
||||
*/
|
||||
--select * from pending_keys
|
||||
--) with data;
|
||||
end;
|
||||
$F$;
|
||||
/*
|
||||
SELECT
|
||||
JSONB_PRETTY(k.json_key) orig,
|
||||
jsonb_pretty(jsonb_build_object('input_constraint',k.json_key)) uq,
|
||||
T.REC
|
||||
FROM
|
||||
tps.x k
|
||||
left outer JOIN tps.trans t ON
|
||||
t.rec @> k.json_key;
|
||||
*/
|
@ -98,8 +98,7 @@ FROM
|
||||
],
|
||||
"name": "First 20",
|
||||
"where": [
|
||||
{"Category":"Restaurantes"},
|
||||
{"Category":"Services"}
|
||||
{}
|
||||
],
|
||||
"function": "extract",
|
||||
"description": "pull first 20 characters from description for mapping"
|
||||
|
10
templates/insert_constraint.json
Normal file
10
templates/insert_constraint.json
Normal file
@ -0,0 +1,10 @@
|
||||
{
|
||||
"unique_constraint": {
|
||||
"{doc,origin_addresses}": [
|
||||
"Washington, DC, USA"
|
||||
],
|
||||
"{doc,destination_addresses}": [
|
||||
"New York, NY, USA"
|
||||
]
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user