tps/srce.pgsql

181 lines
5.2 KiB
Plaintext
Raw Normal View History

\timing
/*--------------------------------------------------------
0. load target import to temp table
1. create pending list
2. get unqiue pending keys
3. see which keys not already in tps.trans
4. insert pending records associated with keys that are not already in trans
2017-10-25 09:07:51 -04:00
5. insert summary to log table
*/---------------------------------------------------------
DO $$
2017-10-15 13:05:20 -04:00
DECLARE _t text;
DECLARE _c text;
DECLARE _path text;
DECLARE _srce text;
2017-10-15 13:05:20 -04:00
BEGIN
2017-10-25 09:07:51 -04:00
_path := 'C:\users\fleet\downloads\dc1024.csv';
_srce := 'DCARD';
2017-10-11 00:29:38 -04:00
----------------------------------------------------build the column list of the temp table----------------------------------------------------------------
SELECT
string_agg(quote_ident(prs.key)||' '||prs.type,','),
string_agg(quote_ident(prs.key),',')
INTO
_t,
_c
FROM
2017-10-25 09:07:51 -04:00
tps.srce
--unwrap the schema definition array
LEFT JOIN LATERAL jsonb_populate_recordset(null::tps.srce_defn_schema, defn->'schema') prs ON TRUE
WHERE
srce = _srce
GROUP BY
srce;
2017-10-11 00:29:38 -04:00
----------------------------------------------------add create table verbage in front of column list--------------------------------------------------------
_t := format('CREATE TEMP TABLE csv_i (%s, id SERIAL)', _t);
--RAISE NOTICE '%', _t;
--RAISE NOTICE '%', _c;
2017-10-11 00:29:38 -04:00
DROP TABLE IF EXISTS csv_i;
EXECUTE _t;
----------------------------------------------------do the insert-------------------------------------------------------------------------------------------
--the column list needs to be dynamic forcing this whole line to be dynamic
_t := format('COPY csv_i (%s) FROM %L WITH (HEADER TRUE,DELIMITER '','', FORMAT CSV, ENCODING ''SQL_ASCII'',QUOTE ''"'');',_c,_path);
--RAISE NOTICE '%', _t;
EXECUTE _t;
2017-10-25 09:07:51 -04:00
WITH
2017-10-24 17:37:20 -04:00
2017-10-25 09:07:51 -04:00
-------------extract the limiter fields to one row per source----------------------------------
2017-10-24 17:37:20 -04:00
2017-10-25 09:07:51 -04:00
ext AS (
SELECT
srce
,defn->'unique_constraint'->>'fields'
,ARRAY(SELECT ae.e::text[] FROM jsonb_array_elements_text(defn->'unique_constraint'->'fields') ae(e)) text_array
FROM
tps.srce
WHERE
srce = _srce
--add where clause for targeted source
)
-------------for each imported row in the COPY table, genereate the json rec, and a column for the json key specified in the srce.defn-----------
2017-10-25 09:07:51 -04:00
,pending_list AS (
SELECT
2017-10-25 09:07:51 -04:00
jsonb_extract(
row_to_json(i)::jsonb
,ext.text_array
) json_key,
row_to_json(i)::JSONB rec,
srce,
--ae.rn,
id
FROM
csv_i i
2017-10-25 09:07:51 -04:00
INNER JOIN ext ON
ext.srce = _srce
ORDER BY
id ASC
)
-----------create a unique list of keys from staged rows------------------------------------------------------------------------------------------
, pending_keys AS (
SELECT DISTINCT
json_key
FROM
pending_list
)
2017-10-25 09:07:51 -04:00
-----------list of keys already loaded to tps-----------------------------------------------------------------------------------------------------
, matched_keys AS (
SELECT DISTINCT
k.json_key
FROM
pending_keys k
INNER JOIN tps.trans t ON
t.rec @> k.json_key
)
-----------return unique keys that are not already in tps.trans-----------------------------------------------------------------------------------
, unmatched_keys AS (
SELECT
json_key
FROM
pending_keys
EXCEPT
2017-10-25 09:07:51 -04:00
SELECT
json_key
FROM
2017-10-25 09:07:51 -04:00
matched_keys
)
-----------insert pending rows that have key with no trans match-----------------------------------------------------------------------------------
--need to look into mapping the transactions prior to loading
, inserted AS (
INSERT INTO
tps.trans (srce, rec)
SELECT
pl.srce
,pl.rec
FROM
pending_list pl
INNER JOIN unmatched_keys u ON
u.json_key = pl.json_key
ORDER BY
pl.id ASC
----this conflict is only if an exact duplicate rec json happens, which will be rejected
----therefore, records may not be inserted due to ay matches with certain json fields, or if the entire json is a duplicate, reason is not specified
RETURNING *
)
--------summarize records not inserted-------------------+------------------------------------------------------------------------------------------------
2017-10-25 09:07:51 -04:00
INSERT INTO
tps.trans_log (info)
SELECT
2017-10-25 09:07:51 -04:00
JSONB_BUILD_OBJECT('time_stamp',CURRENT_TIMESTAMP)
||JSONB_BUILD_OBJECT('srce',_srce)
||JSONB_BUILD_OBJECT('path',_path)
||JSONB_BUILD_OBJECT('not_inserted',
(
SELECT
jsonb_agg(json_key)
FROM
matched_keys
)
)
||JSONB_BUILD_OBJECT('inserted',
(
SELECT
jsonb_agg(json_key)
FROM
unmatched_keys
)
);
END
$$;