diff --git a/deploy/ubm_schema.sql b/deploy/ubm_schema.sql index 27f3e5b..39c0d46 100644 --- a/deploy/ubm_schema.sql +++ b/deploy/ubm_schema.sql @@ -125,7 +125,7 @@ BEGIN FOREACH t SLICE 1 IN ARRAY key_list LOOP --RAISE NOTICE '%', t; --RAISE NOTICE '%', t[1]; - j := j || jsonb_build_object(t[1],rec#>t); + j := j || jsonb_build_object(t::text,rec#>t); END LOOP; RETURN j; END; @@ -519,7 +519,7 @@ BEGIN FROM pending_keys k INNER JOIN tps.trans t ON - t.rec @> k.json_key + t.ic = k.json_key ) -----------return unique keys that are not already in tps.trans----------------------------------------------------------------------------------- @@ -543,10 +543,11 @@ BEGIN , inserted AS ( INSERT INTO - tps.trans (srce, rec) + tps.trans (srce, rec, ic) SELECT pl.srce ,pl.rec + ,pl.json_key FROM pending_list pl INNER JOIN unmatched_keys u ON @@ -1488,7 +1489,8 @@ CREATE TABLE tps.trans ( rec jsonb, parse jsonb, map jsonb, - allj jsonb + allj jsonb, + ic jsonb ); @@ -1499,6 +1501,13 @@ CREATE TABLE tps.trans ( COMMENT ON TABLE tps.trans IS 'source records'; +-- +-- Name: COLUMN trans.ic; Type: COMMENT; Schema: tps; Owner: - +-- + +COMMENT ON COLUMN tps.trans.ic IS 'input constraint value'; + + -- -- Name: trans_id_seq; Type: SEQUENCE; Schema: tps; Owner: - -- diff --git a/functions/report_unmapped_recs.sql b/functions/report_unmapped_recs.sql new file mode 100644 index 0000000..cceee91 --- /dev/null +++ b/functions/report_unmapped_recs.sql @@ -0,0 +1,263 @@ +DROP FUNCTION IF EXISTS tps.report_unmapped_recs; +CREATE FUNCTION tps.report_unmapped_recs(_srce text) RETURNS TABLE +( + source text, + map text, + ret_val jsonb, + "count" bigint, + recs jsonb + +) +LANGUAGE plpgsql +AS +$f$ +BEGIN + +/* +first get distinct target json values +then apply regex +*/ + +RETURN QUERY +WITH + +--------------------apply regex operations to transactions--------------------------------------------------------------------------------- + +rx AS ( +SELECT + t.srce, + t.id, + t.rec, + m.target, + m.seq, + regex->>'function' regex_function, + e.v ->> 'field' result_key_name, + e.v ->> 'key' target_json_path, + e.v ->> 'flag' regex_options_flag, + e.v->>'map' map_intention, + e.v->>'retain' retain_result, + e.v->>'regex' regex_expression, + e.rn target_item_number, + COALESCE(mt.rn,rp.rn,1) result_number, + mt.mt rx_match, + rp.rp rx_replace, + --------------------------json key name assigned to return value----------------------------------------------------------------------- + CASE e.v->>'map' + WHEN 'y' THEN + e.v->>'field' + ELSE + null + END map_key, + --------------------------json value resulting from regular expression----------------------------------------------------------------- + CASE e.v->>'map' + WHEN 'y' THEN + CASE regex->>'function' + WHEN 'extract' THEN + CASE WHEN array_upper(mt.mt,1)=1 + THEN to_json(mt.mt[1]) + ELSE array_to_json(mt.mt) + END::jsonb + WHEN 'replace' THEN + to_jsonb(rp.rp) + ELSE + '{}'::jsonb + END + ELSE + NULL + END map_val, + --------------------------flag for if retruned regex result is stored as a new part of the final json output--------------------------- + CASE e.v->>'retain' + WHEN 'y' THEN + e.v->>'field' + ELSE + NULL + END retain_key, + --------------------------push regex result into json object--------------------------------------------------------------------------- + CASE e.v->>'retain' + WHEN 'y' THEN + CASE regex->>'function' + WHEN 'extract' THEN + CASE WHEN array_upper(mt.mt,1)=1 + THEN to_json(trim(mt.mt[1])) + ELSE array_to_json(mt.mt) + END::jsonb + WHEN 'replace' THEN + to_jsonb(rtrim(rp.rp)) + ELSE + '{}'::jsonb + END + ELSE + NULL + END retain_val +FROM + --------------------------start with all regex maps------------------------------------------------------------------------------------ + tps.map_rm m + --------------------------isolate matching basis to limit map to only look at certain json--------------------------------------------- + JOIN LATERAL jsonb_array_elements(m.regex->'where') w(v) ON TRUE + --------------------------break out array of regluar expressions in the map------------------------------------------------------------ + JOIN LATERAL jsonb_array_elements(m.regex->'defn') WITH ORDINALITY e(v, rn) ON true + --------------------------join to main transaction table but only certain key/values are included-------------------------------------- + INNER JOIN tps.trans t ON + t.srce = m.srce AND + t.rec @> w.v + --------------------------each regex references a path to the target value, extract the target from the reference and do regex--------- + LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON + m.regex->>'function' = 'extract' + --------------------------same as above but for a replacement type function------------------------------------------------------------ + LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,e.v ->> 'flag') WITH ORDINALITY rp(rp, rn) ON + m.regex->>'function' = 'replace' +WHERE + --t.allj IS NULL + t.srce = _srce AND + e.v @> '{"map":"y"}'::jsonb + --rec @> '{"Transaction":"ACH Credits","Transaction":"ACH Debits"}' + --rec @> '{"Description":"CHECK 93013270 086129935"}'::jsonb +/* +ORDER BY + t.id DESC, + m.target, + e.rn, + COALESCE(mt.rn,rp.rn,1) +*/ +) + +--SELECT * FROM rx LIMIT 100 + + +, agg_to_target_items AS ( +SELECT + srce + ,id + ,rec + ,target + ,seq + ,map_intention + ,regex_function + ,target_item_number + ,result_key_name + ,target_json_path + ,CASE WHEN map_key IS NULL + THEN + NULL + ELSE + jsonb_build_object( + map_key, + CASE WHEN max(result_number) = 1 + THEN + jsonb_agg(map_val ORDER BY result_number) -> 0 + ELSE + jsonb_agg(map_val ORDER BY result_number) + END + ) + END map_val + ,CASE WHEN retain_key IS NULL + THEN + NULL + ELSE + jsonb_build_object( + retain_key, + CASE WHEN max(result_number) = 1 + THEN + jsonb_agg(retain_val ORDER BY result_number) -> 0 + ELSE + jsonb_agg(retain_val ORDER BY result_number) + END + ) + END retain_val +FROM + rx +GROUP BY + srce + ,id + ,rec + ,target + ,seq + ,map_intention + ,regex_function + ,target_item_number + ,result_key_name + ,target_json_path + ,map_key + ,retain_key +) + +--SELECT * FROM agg_to_target_items LIMIT 100 + + +, agg_to_target AS ( +SELECT + srce + ,id + ,rec + ,target + ,seq + ,map_intention + ,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val + ,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val +FROM + agg_to_target_items +GROUP BY + srce + ,id + ,rec + ,target + ,seq + ,map_intention +) + + +, agg_to_ret AS ( +SELECT + srce + ,target + ,seq + ,map_intention + ,map_val + ,retain_val + ,count(*) "count" + ,jsonb_agg(rec) rec +FROM + agg_to_target +GROUP BY + srce + ,target + ,seq + ,map_intention + ,map_val + ,retain_val +) + +, link_map AS ( +SELECT + a.srce + ,a.target + ,a.seq + ,a.map_intention + ,a.map_val + ,a."count" + ,a.rec + ,a.retain_val + ,v.map mapped_val +FROM + agg_to_ret a + LEFT OUTER JOIN tps.map_rv v ON + v.srce = a.srce AND + v.target = a.target AND + v.retval = a.map_val +) +SELECT + l.srce + ,l.target + ,l.map_val + ,l."count" + ,l.rec +FROM + link_map l +WHERE + l.mapped_val IS NULL +ORDER BY + l.srce + ,l.target + ,l."count" desc; +END; +$f$ \ No newline at end of file diff --git a/functions/srce_import.sql b/functions/srce_import.sql index 9cb2a85..32026d9 100644 --- a/functions/srce_import.sql +++ b/functions/srce_import.sql @@ -144,7 +144,7 @@ BEGIN FROM pending_keys k INNER JOIN tps.trans t ON - t.rec @> k.json_key + t.ic = k.json_key ) -----------return unique keys that are not already in tps.trans----------------------------------------------------------------------------------- @@ -168,10 +168,11 @@ BEGIN , inserted AS ( INSERT INTO - tps.trans (srce, rec) + tps.trans (srce, rec, ic) SELECT pl.srce ,pl.rec + ,pl.json_key FROM pending_list pl INNER JOIN unmatched_keys u ON diff --git a/functions/srce_import_dev.sql b/functions/srce_import_dev.sql new file mode 100644 index 0000000..d08172f --- /dev/null +++ b/functions/srce_import_dev.sql @@ -0,0 +1,243 @@ + +DO $F$ +DECLARE _t text; +DECLARE _c text; +DECLARE _log_info jsonb; +DECLARE _log_id text; +DECLARE _cnt numeric; +DECLARE _message jsonb; +_MESSAGE_TEXT text; +_PG_EXCEPTION_DETAIL text; +_PG_EXCEPTION_HINT text; +_path text; +_srce text; + +BEGIN + + _path := 'C:\users\fleet\downloads\testj.csv'; + _srce := 'DMAPI'; + +----------------------------------------------------test if source exists---------------------------------------------------------------------------------- + + SELECT + COUNT(*) + INTO + _cnt + FROM + tps.srce + WHERE + srce = _srce; + + IF _cnt = 0 THEN + _message:= + format( + $$ + { + "status":"fail", + "message":"source %L does not exists" + } + $$, + _srce + )::jsonb; + END IF; +----------------------------------------------------build the column list of the temp table---------------------------------------------------------------- + + SELECT + string_agg(quote_ident(prs.key)||' '||prs.type,','), + string_agg(quote_ident(prs.key),',') + INTO + _t, + _c + FROM + tps.srce + --unwrap the schema definition array + LEFT JOIN LATERAL jsonb_populate_recordset(null::tps.srce_defn_schema, defn->'schema') prs ON TRUE + WHERE + srce = _srce + GROUP BY + srce; + +----------------------------------------------------add create table verbage in front of column list-------------------------------------------------------- + + _t := format('CREATE TEMP TABLE csv_i (%s, id SERIAL)', _t); + --RAISE NOTICE '%', _t; + --RAISE NOTICE '%', _c; + + DROP TABLE IF EXISTS csv_i; + + EXECUTE _t; + +----------------------------------------------------do the insert------------------------------------------------------------------------------------------- + + --the column list needs to be dynamic forcing this whole line to be dynamic + _t := format('COPY csv_i (%s) FROM %L WITH (HEADER TRUE,DELIMITER '','', FORMAT CSV, ENCODING ''SQL_ASCII'',QUOTE ''"'');',_c,_path); + + --RAISE NOTICE '%', _t; + + EXECUTE _t; + + --drop table if exists tps.x; + --create table tps.x as + --( + WITH + + -------------extract the limiter fields to one row per source---------------------------------- + + ext AS ( + SELECT + srce + ,defn->'unique_constraint'->>'fields' + ,ARRAY(SELECT ae.e::text[] FROM jsonb_array_elements_text(defn->'unique_constraint'->'fields') ae(e)) text_array + FROM + tps.srce + WHERE + srce = _srce + --add where clause for targeted source + ) + + -------------for each imported row in the COPY table, genereate the json rec, and a column for the json key specified in the srce.defn----------- + + ,pending_list AS ( + SELECT + tps.jsonb_extract( + row_to_json(i)::jsonb + ,ext.text_array + ) json_key, + row_to_json(i)::JSONB rec, + srce, + --ae.rn, + id + FROM + csv_i i + INNER JOIN ext ON + ext.srce = _srce + ORDER BY + id ASC + ) + + + -----------create a unique list of keys from staged rows------------------------------------------------------------------------------------------ + + , pending_keys AS ( + SELECT DISTINCT + json_key + FROM + pending_list + ) + + -----------list of keys already loaded to tps----------------------------------------------------------------------------------------------------- + + , matched_keys AS ( + SELECT DISTINCT + k.json_key + FROM + pending_keys k + INNER JOIN tps.trans t ON + t.ic = k.json_key + ) + + -----------return unique keys that are not already in tps.trans----------------------------------------------------------------------------------- + + , unmatched_keys AS ( + SELECT + json_key + FROM + pending_keys + + EXCEPT + + SELECT + json_key + FROM + matched_keys + ) + + -----------insert pending rows that have key with no trans match----------------------------------------------------------------------------------- + --need to look into mapping the transactions prior to loading + + , inserted AS ( + INSERT INTO + tps.trans (srce, rec, ic) + SELECT + pl.srce + ,pl.rec + ,pl.json_key + FROM + pending_list pl + INNER JOIN unmatched_keys u ON + u.json_key = pl.json_key + ORDER BY + pl.id ASC + ----this conflict is only if an exact duplicate rec json happens, which will be rejected + ----therefore, records may not be inserted due to ay matches with certain json fields, or if the entire json is a duplicate, reason is not specified + RETURNING * + ) + + --------summarize records not inserted-------------------+------------------------------------------------------------------------------------------------ + + , logged AS ( + INSERT INTO + tps.trans_log (info) + SELECT + JSONB_BUILD_OBJECT('time_stamp',CURRENT_TIMESTAMP) + ||JSONB_BUILD_OBJECT('srce',_srce) + ||JSONB_BUILD_OBJECT('path',_path) + ||JSONB_BUILD_OBJECT('not_inserted', + ( + SELECT + jsonb_agg(json_key) + FROM + matched_keys + ) + ) + ||JSONB_BUILD_OBJECT('inserted', + ( + SELECT + jsonb_agg(json_key) + FROM + unmatched_keys + ) + ) + RETURNING * + ) + + SELECT + id + ,info + INTO + _log_id + ,_log_info + FROM + logged; + + RAISE NOTICE 'import logged under id# %, info: %', _log_id, _log_info; + + /* + _message:= + ( + format( + $$ + { + "status":"complete", + "message":"import of %L for source %L complete" + } + $$, _path, _srce)::jsonb + )||jsonb_build_object('details',_log_info); + + + RAISE NOTICE '%s',_message; + */ + --select * from pending_keys + --) with data; +end; +$F$; +/* +SELECT + JSONB_PRETTY(k.json_key) orig, + jsonb_pretty(jsonb_build_object('input_constraint',k.json_key)) uq, + T.REC +FROM + tps.x k + left outer JOIN tps.trans t ON + t.rec @> k.json_key; +*/ \ No newline at end of file diff --git a/sample_discovercard/mapping.md b/sample_discovercard/mapping.md index c48a22f..9ad012b 100644 --- a/sample_discovercard/mapping.md +++ b/sample_discovercard/mapping.md @@ -98,8 +98,7 @@ FROM ], "name": "First 20", "where": [ - {"Category":"Restaurantes"}, - {"Category":"Services"} + {} ], "function": "extract", "description": "pull first 20 characters from description for mapping" diff --git a/templates/insert_constraint.json b/templates/insert_constraint.json new file mode 100644 index 0000000..4fbbe99 --- /dev/null +++ b/templates/insert_constraint.json @@ -0,0 +1,10 @@ +{ + "unique_constraint": { + "{doc,origin_addresses}": [ + "Washington, DC, USA" + ], + "{doc,destination_addresses}": [ + "New York, NY, USA" + ] + } +} \ No newline at end of file