diff --git a/srce.pgsql b/srce.pgsql index 8a51587..0f1b237 100644 --- a/srce.pgsql +++ b/srce.pgsql @@ -6,8 +6,7 @@ 2. get unqiue pending keys 3. see which keys not already in tps.trans 4. insert pending records associated with keys that are not already in trans -5. get list of recors not inserted -6. summarize records not inserted +5. insert summary to log table */--------------------------------------------------------- @@ -15,8 +14,13 @@ DO $$ DECLARE _t text; DECLARE _c text; +DECLARE _path text; +DECLARE _srce text; BEGIN + + _path := 'C:\users\fleet\downloads\dc1024.csv'; + _srce := 'DCARD'; ----------------------------------------------------build the column list of the temp table---------------------------------------------------------------- @@ -27,11 +31,11 @@ BEGIN _t, _c FROM - TPS.srce + tps.srce --unwrap the schema definition array LEFT JOIN LATERAL jsonb_populate_recordset(null::tps.srce_defn_schema, defn->'schema') prs ON TRUE WHERE - srce = 'HUNT' + srce = _srce GROUP BY srce; @@ -48,122 +52,129 @@ BEGIN ----------------------------------------------------do the insert------------------------------------------------------------------------------------------- --the column list needs to be dynamic forcing this whole line to be dynamic - _t := format('COPY csv_i (%s) FROM ''C:\Users\fleet\downloads\hunt.csv'' WITH (HEADER TRUE,DELIMITER '','', FORMAT CSV, ENCODING ''SQL_ASCII'',QUOTE ''"'');',_c); + _t := format('COPY csv_i (%s) FROM %L WITH (HEADER TRUE,DELIMITER '','', FORMAT CSV, ENCODING ''SQL_ASCII'',QUOTE ''"'');',_c,_path); --RAISE NOTICE '%', _t; EXECUTE _t; + WITH + + -------------extract the limiter fields to one row per source---------------------------------- + + ext AS ( + SELECT + srce + ,defn->'unique_constraint'->>'fields' + ,ARRAY(SELECT ae.e::text[] FROM jsonb_array_elements_text(defn->'unique_constraint'->'fields') ae(e)) text_array + FROM + tps.srce + WHERE + srce = _srce + --add where clause for targeted source + ) + + -------------for each imported row in the COPY table, genereate the json rec, and a column for the json key specified in the srce.defn----------- + + ,pending_list AS ( + SELECT + jsonb_extract( + row_to_json(i)::jsonb + ,ext.text_array + ) json_key, + row_to_json(i)::JSONB rec, + srce, + --ae.rn, + id + FROM + csv_i i + INNER JOIN ext ON + ext.srce = _srce + ORDER BY + id ASC + ) + + -----------create a unique list of keys from staged rows------------------------------------------------------------------------------------------ + + , pending_keys AS ( + SELECT DISTINCT + json_key + FROM + pending_list + ) + + -----------list of keys already loaded to tps----------------------------------------------------------------------------------------------------- + + , matched_keys AS ( + SELECT DISTINCT + k.json_key + FROM + pending_keys k + INNER JOIN tps.trans t ON + t.rec @> k.json_key + ) + + -----------return unique keys that are not already in tps.trans----------------------------------------------------------------------------------- + + , unmatched_keys AS ( + SELECT + json_key + FROM + pending_keys + + EXCEPT + + SELECT + json_key + FROM + matched_keys + ) + + -----------insert pending rows that have key with no trans match----------------------------------------------------------------------------------- + --need to look into mapping the transactions prior to loading + + , inserted AS ( + INSERT INTO + tps.trans (srce, rec) + SELECT + pl.srce + ,pl.rec + FROM + pending_list pl + INNER JOIN unmatched_keys u ON + u.json_key = pl.json_key + ORDER BY + pl.id ASC + ----this conflict is only if an exact duplicate rec json happens, which will be rejected + ----therefore, records may not be inserted due to ay matches with certain json fields, or if the entire json is a duplicate, reason is not specified + RETURNING * + ) + + --------summarize records not inserted-------------------+------------------------------------------------------------------------------------------------ + + INSERT INTO + tps.trans_log (info) + SELECT + JSONB_BUILD_OBJECT('time_stamp',CURRENT_TIMESTAMP) + ||JSONB_BUILD_OBJECT('srce',_srce) + ||JSONB_BUILD_OBJECT('path',_path) + ||JSONB_BUILD_OBJECT('not_inserted', + ( + SELECT + jsonb_agg(json_key) + FROM + matched_keys + ) + ) + ||JSONB_BUILD_OBJECT('inserted', + ( + SELECT + jsonb_agg(json_key) + FROM + unmatched_keys + ) + ); END $$; - -WITH - --------------for each imported row in the COPY table, genereate the json rec, and a column for the json key specified in the srce.defn----------- - -pending_list AS ( - SELECT - ---creates a key value pair and then aggregates rows of key value pairs - jsonb_object_agg( - (ae.e::text[])[1], --the key name - (row_to_json(i)::jsonb) #> ae.e::text[] --get the target value from the key from the csv row that has been converted to json - ) json_key, - row_to_json(i)::JSONB rec, - srce, - --ae.rn, - id - FROM - csv_i i - INNER JOIN tps.srce s ON - s.srce = 'HUNT' - LEFT JOIN LATERAL JSONB_ARRAY_ELEMENTS_TEXT(defn->'unique_constraint'->'fields') WITH ORDINALITY ae(e, rn) ON TRUE - GROUP BY - i.*, - srce, - id - ORDER BY - id ASC -) - ------------create a unique list of keys from staged rows------------------------------------------------------------------------------------------ - -, pending_keys AS ( - SELECT DISTINCT - json_key - FROM - pending_list -) - ------------return unique keys that are not already in tps.trans----------------------------------------------------------------------------------- - -, unmatched_keys AS ( -SELECT - json_key -FROM - pending_keys - -EXCEPT - -SELECT DISTINCT - k.json_key -FROM - pending_keys k - INNER JOIN tps.trans t ON - t.rec @> k.json_key -) - ------------insert pending rows that have key with no trans match----------------------------------------------------------------------------------- - -, inserted AS ( - INSERT INTO - tps.trans (srce, rec) - SELECT - pl.srce - ,pl.rec - FROM - pending_list pl - INNER JOIN unmatched_keys u ON - u.json_key = pl.json_key - ORDER BY - pl.id ASC - ----this conflict is only if an exact duplicate rec json happens, which will be rejected - ----therefore, records may not be inserted due to ay matches with certain json fields, or if the entire json is a duplicate, reason is not specified - RETURNING * -) - ------------list of records not inserted-------------------------------------------------------------------------------------------------------------- - -, not_inserted AS ( - SELECT - srce - ,rec - FROM - pending_list - - EXCEPT ALL - - SELECT - srce - ,rec - FROM - inserted -) - ---------summarize records not inserted-------------------+------------------------------------------------------------------------------------------------ - -SELECT - t.srce - ,(ae.e::text[])[1] unq_constr - ,MIN(rec #>> ae.e::text[]) min_text - ,MAX(rec #>> ae.e::text[]) max_text - ,JSONB_PRETTY(JSON_AGG(rec #> ae.e::text[] ORDER BY rec #>> ae.e::text[])::JSONB) -FROM - not_inserted t - INNER JOIN tps.srce s ON - s.srce = t.srce - LEFT JOIN LATERAL JSONB_ARRAY_ELEMENTS_TEXT(defn->'unique_constraint'->'fields') WITH ORDINALITY ae(e, rn) ON TRUE -GROUP BY - t.srce - ,(ae.e::text[])[1]; \ No newline at end of file diff --git a/srce_defn.pgsql b/srce_defn.pgsql index 79046fb..85dfb78 100644 --- a/srce_defn.pgsql +++ b/srce_defn.pgsql @@ -7,11 +7,13 @@ SELECT FROM tps.srce ) + + SELECT - srce - , - public.jsonb_extract(rec,txa) + t.srce + ,jsonb_pretty(t.rec) + ,jsonb_pretty(public.jsonb_extract(rec,txa)) FROM - tps.trans + tps.trans t INNER JOIN ext ON - trans.srce = ext.srce \ No newline at end of file + t.srce = ext.srce \ No newline at end of file