From 53b945ab88ebbc92fba0f9689f19e44329d38634 Mon Sep 17 00:00:00 2001 From: Paul Trowbridge Date: Mon, 26 Feb 2018 23:45:16 -0500 Subject: [PATCH] add main data import function --- deploy/ubm_schema.sql | 250 +++++++++++++++++++++++++++++++++++- functions/srce_import.pgsql | 70 +++++++++- 2 files changed, 312 insertions(+), 8 deletions(-) diff --git a/deploy/ubm_schema.sql b/deploy/ubm_schema.sql index f6ac09a..8cd8d33 100644 --- a/deploy/ubm_schema.sql +++ b/deploy/ubm_schema.sql @@ -2,8 +2,8 @@ -- PostgreSQL database dump -- --- Dumped from database version 10rc1 --- Dumped by pg_dump version 10rc1 +-- Dumped from database version 10.2 +-- Dumped by pg_dump version 10.2 SET statement_timeout = 0; SET lock_timeout = 0; @@ -104,6 +104,252 @@ END; $$; +-- +-- Name: srce_import(text, text); Type: FUNCTION; Schema: tps; Owner: - +-- + +CREATE FUNCTION srce_import(_path text, _srce text) RETURNS jsonb + LANGUAGE plpgsql + AS $_$ +DECLARE _t text; +DECLARE _c text; +DECLARE _log_info jsonb; +DECLARE _log_id text; +DECLARE _cnt numeric; +DECLARE _message jsonb; +_MESSAGE_TEXT text; +_PG_EXCEPTION_DETAIL text; +_PG_EXCEPTION_HINT text; + +BEGIN + + --_path := 'C:\users\fleet\downloads\discover-recentactivity-20171031.csv'; + --_srce := 'DCARD'; + +----------------------------------------------------test if source exists---------------------------------------------------------------------------------- + + SELECT + COUNT(*) + INTO + _cnt + FROM + tps.srce + WHERE + srce = _srce; + + IF _cnt = 0 THEN + _message:= + format( + $$ + { + "status":"fail", + "message":"source %L does not exists" + } + $$, + _srce + )::jsonb; + RETURN _message; + END IF; +----------------------------------------------------build the column list of the temp table---------------------------------------------------------------- + + SELECT + string_agg(quote_ident(prs.key)||' '||prs.type,','), + string_agg(quote_ident(prs.key),',') + INTO + _t, + _c + FROM + tps.srce + --unwrap the schema definition array + LEFT JOIN LATERAL jsonb_populate_recordset(null::tps.srce_defn_schema, defn->'schema') prs ON TRUE + WHERE + srce = _srce + GROUP BY + srce; + +----------------------------------------------------add create table verbage in front of column list-------------------------------------------------------- + + _t := format('CREATE TEMP TABLE csv_i (%s, id SERIAL)', _t); + --RAISE NOTICE '%', _t; + --RAISE NOTICE '%', _c; + + DROP TABLE IF EXISTS csv_i; + + EXECUTE _t; + +----------------------------------------------------do the insert------------------------------------------------------------------------------------------- + + --the column list needs to be dynamic forcing this whole line to be dynamic + _t := format('COPY csv_i (%s) FROM %L WITH (HEADER TRUE,DELIMITER '','', FORMAT CSV, ENCODING ''SQL_ASCII'',QUOTE ''"'');',_c,_path); + + --RAISE NOTICE '%', _t; + + BEGIN + EXECUTE _t; + EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS + _MESSAGE_TEXT = MESSAGE_TEXT, + _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL, + _PG_EXCEPTION_HINT = PG_EXCEPTION_HINT; + _message:= + ($$ + { + "status":"fail", + "message":"error importing data" + } + $$::jsonb) + ||jsonb_build_object('message_text',_MESSAGE_TEXT) + ||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL); + return _message; + END; + + WITH + + -------------extract the limiter fields to one row per source---------------------------------- + + ext AS ( + SELECT + srce + ,defn->'unique_constraint'->>'fields' + ,ARRAY(SELECT ae.e::text[] FROM jsonb_array_elements_text(defn->'unique_constraint'->'fields') ae(e)) text_array + FROM + tps.srce + WHERE + srce = _srce + --add where clause for targeted source + ) + + -------------for each imported row in the COPY table, genereate the json rec, and a column for the json key specified in the srce.defn----------- + + ,pending_list AS ( + SELECT + jsonb_extract( + row_to_json(i)::jsonb + ,ext.text_array + ) json_key, + row_to_json(i)::JSONB rec, + srce, + --ae.rn, + id + FROM + csv_i i + INNER JOIN ext ON + ext.srce = _srce + ORDER BY + id ASC + ) + + -----------create a unique list of keys from staged rows------------------------------------------------------------------------------------------ + + , pending_keys AS ( + SELECT DISTINCT + json_key + FROM + pending_list + ) + + -----------list of keys already loaded to tps----------------------------------------------------------------------------------------------------- + + , matched_keys AS ( + SELECT DISTINCT + k.json_key + FROM + pending_keys k + INNER JOIN tps.trans t ON + t.rec @> k.json_key + ) + + -----------return unique keys that are not already in tps.trans----------------------------------------------------------------------------------- + + , unmatched_keys AS ( + SELECT + json_key + FROM + pending_keys + + EXCEPT + + SELECT + json_key + FROM + matched_keys + ) + + -----------insert pending rows that have key with no trans match----------------------------------------------------------------------------------- + --need to look into mapping the transactions prior to loading + + , inserted AS ( + INSERT INTO + tps.trans (srce, rec) + SELECT + pl.srce + ,pl.rec + FROM + pending_list pl + INNER JOIN unmatched_keys u ON + u.json_key = pl.json_key + ORDER BY + pl.id ASC + ----this conflict is only if an exact duplicate rec json happens, which will be rejected + ----therefore, records may not be inserted due to ay matches with certain json fields, or if the entire json is a duplicate, reason is not specified + RETURNING * + ) + + --------summarize records not inserted-------------------+------------------------------------------------------------------------------------------------ + + , logged AS ( + INSERT INTO + tps.trans_log (info) + SELECT + JSONB_BUILD_OBJECT('time_stamp',CURRENT_TIMESTAMP) + ||JSONB_BUILD_OBJECT('srce',_srce) + ||JSONB_BUILD_OBJECT('path',_path) + ||JSONB_BUILD_OBJECT('not_inserted', + ( + SELECT + jsonb_agg(json_key) + FROM + matched_keys + ) + ) + ||JSONB_BUILD_OBJECT('inserted', + ( + SELECT + jsonb_agg(json_key) + FROM + unmatched_keys + ) + ) + RETURNING * + ) + + SELECT + id + ,info + INTO + _log_id + ,_log_info + FROM + logged; + + --RAISE NOTICE 'import logged under id# %, info: %', _log_id, _log_info; + + _message:= + ( + format( + $$ + { + "status":"complete", + "message":"import of %L for source %L complete" + } + $$, _path, _srce)::jsonb + )||josnb_build_object('details',_log_info); + + RETURN _message; +END +$_$; + + -- -- Name: srce_set(text, jsonb); Type: FUNCTION; Schema: tps; Owner: - -- diff --git a/functions/srce_import.pgsql b/functions/srce_import.pgsql index 517d77e..fb5253e 100644 --- a/functions/srce_import.pgsql +++ b/functions/srce_import.pgsql @@ -18,14 +18,43 @@ CREATE OR REPLACE FUNCTION tps.srce_import(_path text, _srce text) RETURNS jsonb AS $f$ DECLARE _t text; DECLARE _c text; -DECLARE _log_info text; +DECLARE _log_info jsonb; DECLARE _log_id text; +DECLARE _cnt numeric; +DECLARE _message jsonb; +_MESSAGE_TEXT text; +_PG_EXCEPTION_DETAIL text; +_PG_EXCEPTION_HINT text; BEGIN - _path := 'C:\users\fleet\downloads\discover-recentactivity-20171031.csv'; - _srce := 'DCARD'; - + --_path := 'C:\users\fleet\downloads\discover-recentactivity-20171031.csv'; + --_srce := 'DCARD'; + +----------------------------------------------------test if source exists---------------------------------------------------------------------------------- + + SELECT + COUNT(*) + INTO + _cnt + FROM + tps.srce + WHERE + srce = _srce; + + IF _cnt = 0 THEN + _message:= + format( + $$ + { + "status":"fail", + "message":"source %L does not exists" + } + $$, + _srce + )::jsonb; + RETURN _message; + END IF; ----------------------------------------------------build the column list of the temp table---------------------------------------------------------------- SELECT @@ -60,7 +89,24 @@ BEGIN --RAISE NOTICE '%', _t; - EXECUTE _t; + BEGIN + EXECUTE _t; + EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS + _MESSAGE_TEXT = MESSAGE_TEXT, + _PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL, + _PG_EXCEPTION_HINT = PG_EXCEPTION_HINT; + _message:= + ($$ + { + "status":"fail", + "message":"error importing data" + } + $$::jsonb) + ||jsonb_build_object('message_text',_MESSAGE_TEXT) + ||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL); + return _message; + END; WITH @@ -191,8 +237,20 @@ BEGIN FROM logged; - RAISE NOTICE 'import logged under id# %, info: %', _log_id, _log_info; + --RAISE NOTICE 'import logged under id# %, info: %', _log_id, _log_info; + _message:= + ( + format( + $$ + { + "status":"complete", + "message":"import of %L for source %L complete" + } + $$, _path, _srce)::jsonb + )||josnb_build_object('details',_log_info); + + RETURN _message; END $f$ LANGUAGE plpgsql