137 lines
3.6 KiB
MySQL
137 lines
3.6 KiB
MySQL
|
DO $F$
|
||
|
DECLARE _t text;
|
||
|
DECLARE _c text;
|
||
|
DECLARE _log_info jsonb;
|
||
|
DECLARE _log_id text;
|
||
|
DECLARE _cnt numeric;
|
||
|
DECLARE _message jsonb;
|
||
|
_MESSAGE_TEXT text;
|
||
|
_PG_EXCEPTION_DETAIL text;
|
||
|
_PG_EXCEPTION_HINT text;
|
||
|
_path text;
|
||
|
_srce text;
|
||
|
|
||
|
BEGIN
|
||
|
|
||
|
_path := 'C:\users\fleet\downloads\testj.csv';
|
||
|
_srce := 'DMAPI';
|
||
|
|
||
|
----------------------------------------------------test if source exists----------------------------------------------------------------------------------
|
||
|
|
||
|
SELECT
|
||
|
COUNT(*)
|
||
|
INTO
|
||
|
_cnt
|
||
|
FROM
|
||
|
tps.srce
|
||
|
WHERE
|
||
|
srce = _srce;
|
||
|
|
||
|
IF _cnt = 0 THEN
|
||
|
_message:=
|
||
|
format(
|
||
|
$$
|
||
|
{
|
||
|
"status":"fail",
|
||
|
"message":"source %L does not exists"
|
||
|
}
|
||
|
$$,
|
||
|
_srce
|
||
|
)::jsonb;
|
||
|
END IF;
|
||
|
----------------------------------------------------build the column list of the temp table----------------------------------------------------------------
|
||
|
|
||
|
SELECT
|
||
|
string_agg(quote_ident(prs.key)||' '||prs.type,','),
|
||
|
string_agg(quote_ident(prs.key),',')
|
||
|
INTO
|
||
|
_t,
|
||
|
_c
|
||
|
FROM
|
||
|
tps.srce
|
||
|
--unwrap the schema definition array
|
||
|
LEFT JOIN LATERAL jsonb_populate_recordset(null::tps.srce_defn_schema, defn->'schema') prs ON TRUE
|
||
|
WHERE
|
||
|
srce = _srce
|
||
|
GROUP BY
|
||
|
srce;
|
||
|
|
||
|
----------------------------------------------------add create table verbage in front of column list--------------------------------------------------------
|
||
|
|
||
|
_t := format('CREATE TEMP TABLE csv_i (%s, id SERIAL)', _t);
|
||
|
--RAISE NOTICE '%', _t;
|
||
|
--RAISE NOTICE '%', _c;
|
||
|
|
||
|
DROP TABLE IF EXISTS csv_i;
|
||
|
|
||
|
EXECUTE _t;
|
||
|
|
||
|
----------------------------------------------------do the insert-------------------------------------------------------------------------------------------
|
||
|
|
||
|
--the column list needs to be dynamic forcing this whole line to be dynamic
|
||
|
_t := format('COPY csv_i (%s) FROM %L WITH (HEADER TRUE,DELIMITER '','', FORMAT CSV, ENCODING ''SQL_ASCII'',QUOTE ''"'');',_c,_path);
|
||
|
|
||
|
--RAISE NOTICE '%', _t;
|
||
|
|
||
|
EXECUTE _t;
|
||
|
|
||
|
drop table if exists tps.x;
|
||
|
create table tps.x as
|
||
|
(
|
||
|
WITH
|
||
|
|
||
|
-------------extract the limiter fields to one row per source----------------------------------
|
||
|
|
||
|
ext AS (
|
||
|
SELECT
|
||
|
srce
|
||
|
,defn->'unique_constraint'->>'fields'
|
||
|
,ARRAY(SELECT ae.e::text[] FROM jsonb_array_elements_text(defn->'unique_constraint'->'fields') ae(e)) text_array
|
||
|
FROM
|
||
|
tps.srce
|
||
|
WHERE
|
||
|
srce = _srce
|
||
|
--add where clause for targeted source
|
||
|
)
|
||
|
|
||
|
-------------for each imported row in the COPY table, genereate the json rec, and a column for the json key specified in the srce.defn-----------
|
||
|
|
||
|
,pending_list AS (
|
||
|
SELECT
|
||
|
tps.jsonb_extract(
|
||
|
row_to_json(i)::jsonb
|
||
|
,ext.text_array
|
||
|
) json_key,
|
||
|
row_to_json(i)::JSONB rec,
|
||
|
srce,
|
||
|
--ae.rn,
|
||
|
id
|
||
|
FROM
|
||
|
csv_i i
|
||
|
INNER JOIN ext ON
|
||
|
ext.srce = _srce
|
||
|
ORDER BY
|
||
|
id ASC
|
||
|
)
|
||
|
|
||
|
|
||
|
-----------create a unique list of keys from staged rows------------------------------------------------------------------------------------------
|
||
|
|
||
|
, pending_keys AS (
|
||
|
SELECT DISTINCT
|
||
|
json_key
|
||
|
FROM
|
||
|
pending_list
|
||
|
)
|
||
|
|
||
|
|
||
|
select * from pending_keys
|
||
|
) with data;
|
||
|
end;
|
||
|
$F$;
|
||
|
SELECT
|
||
|
k.json_key, T.REC
|
||
|
FROM
|
||
|
tps.x k
|
||
|
left outer JOIN tps.trans t ON
|
||
|
t.rec @> k.json_key;
|