185 lines
5.0 KiB
PL/PgSQL
185 lines
5.0 KiB
PL/PgSQL
DO
|
|
$f$
|
|
DECLARE
|
|
_t text;
|
|
_c text;
|
|
_log_info jsonb;
|
|
_log_id text;
|
|
_cnt numeric;
|
|
_message jsonb;
|
|
_recs jsonb;
|
|
_srce text;
|
|
_defn jsonb;
|
|
_MESSAGE_TEXT text;
|
|
_PG_EXCEPTION_DETAIL text;
|
|
_PG_EXCEPTION_HINT text;
|
|
|
|
BEGIN
|
|
|
|
_srce := 'DMAPI';
|
|
_recs:= $${"id":1,"doc":{"rows":[{"elements":[{"status":"OK","distance":{"text":"225 mi","value":361940},"duration":{"text":"3 hours 50 mins","value":13812}}]}],"status":"OK","origin_addresses":["Washington, DC, USA"],"destination_addresses":["New York, NY, USA"]}}$$::jsonb;
|
|
|
|
----------------------------------------------------test if source exists----------------------------------------------------------------------------------
|
|
|
|
SELECT
|
|
defn
|
|
INTO
|
|
_defn
|
|
FROM
|
|
tps.srce
|
|
WHERE
|
|
srce = _srce;
|
|
|
|
IF _defn IS NULL THEN
|
|
_message:=
|
|
format(
|
|
$$
|
|
{
|
|
"status":"fail",
|
|
"message":"source %L does not exists"
|
|
}
|
|
$$,
|
|
_srce
|
|
)::jsonb;
|
|
RAISE NOTICE '%s', _message;
|
|
END IF;
|
|
|
|
-------------unwrap the json record and apply the path(s) of the constraint to build a constraint key per record-----------------------------------------------------------------------------------
|
|
|
|
WITH
|
|
pending_list AS (
|
|
SELECT
|
|
_srce srce
|
|
,j.rec
|
|
,j.id
|
|
--aggregate back to the record since multiple paths may be listed in the constraint
|
|
--it is unclear why the "->>0" is required to correctly extract the text array from the jsonb
|
|
,tps.jsonb_concat_obj(
|
|
jsonb_build_object(
|
|
--the new json key is the path itself
|
|
cons.path->>0
|
|
,j.rec#>((cons.path->>0)::text[])
|
|
)
|
|
) json_key
|
|
FROM
|
|
jsonb_array_elements(_recs) WITH ORDINALITY j(rec,id)
|
|
JOIN LATERAL jsonb_array_elements(_defn->'constraint') WITH ORDINALITY cons(path, seq) ON TRUE
|
|
GROUP BY
|
|
j.rec
|
|
,j.id
|
|
)
|
|
|
|
-----------create a unique list of keys from staged rows------------------------------------------------------------------------------------------
|
|
|
|
, pending_keys AS (
|
|
SELECT DISTINCT
|
|
json_key
|
|
FROM
|
|
pending_list
|
|
)
|
|
|
|
-----------list of keys already loaded to tps-----------------------------------------------------------------------------------------------------
|
|
|
|
, matched_keys AS (
|
|
SELECT DISTINCT
|
|
k.json_key
|
|
FROM
|
|
pending_keys k
|
|
INNER JOIN tps.trans t ON
|
|
t.ic = k.json_key
|
|
)
|
|
|
|
-----------return unique keys that are not already in tps.trans-----------------------------------------------------------------------------------
|
|
|
|
, unmatched_keys AS (
|
|
SELECT
|
|
json_key
|
|
FROM
|
|
pending_keys
|
|
|
|
EXCEPT
|
|
|
|
SELECT
|
|
json_key
|
|
FROM
|
|
matched_keys
|
|
)
|
|
|
|
--------build log record-------------------+------------------------------------------------------------------------------------------------
|
|
|
|
, logged AS (
|
|
INSERT INTO
|
|
tps.trans_log (info)
|
|
SELECT
|
|
JSONB_BUILD_OBJECT('time_stamp',CURRENT_TIMESTAMP)
|
|
||JSONB_BUILD_OBJECT('srce',_srce)
|
|
--||JSONB_BUILD_OBJECT('path',_path)
|
|
||JSONB_BUILD_OBJECT('not_inserted',
|
|
(
|
|
SELECT
|
|
jsonb_agg(json_key)
|
|
FROM
|
|
matched_keys
|
|
)
|
|
)
|
|
||JSONB_BUILD_OBJECT('inserted',
|
|
(
|
|
SELECT
|
|
jsonb_agg(json_key)
|
|
FROM
|
|
unmatched_keys
|
|
)
|
|
)
|
|
RETURNING *
|
|
)
|
|
|
|
-----------insert pending rows that have key with no trans match-----------------------------------------------------------------------------------
|
|
--need to look into mapping the transactions prior to loading
|
|
|
|
, inserted AS (
|
|
INSERT INTO
|
|
tps.trans (srce, rec, ic, logid)
|
|
SELECT
|
|
pl.srce
|
|
,pl.rec
|
|
,pl.json_key
|
|
,logged.id
|
|
FROM
|
|
pending_list pl
|
|
INNER JOIN unmatched_keys u ON
|
|
u.json_key = pl.json_key
|
|
CROSS JOIN logged
|
|
ORDER BY
|
|
pl.id ASC
|
|
----this conflict is only if an exact duplicate rec json happens, which will be rejected
|
|
----therefore, records may not be inserted due to ay matches with certain json fields, or if the entire json is a duplicate, reason is not specified
|
|
RETURNING *
|
|
)
|
|
|
|
SELECT
|
|
id
|
|
,info
|
|
INTO
|
|
_log_id
|
|
,_log_info
|
|
FROM
|
|
logged;
|
|
|
|
--RAISE NOTICE 'import logged under id# %, info: %', _log_id, _log_info;
|
|
|
|
_message:=
|
|
(
|
|
$$
|
|
{
|
|
"status":"complete"
|
|
}
|
|
$$::jsonb
|
|
)||jsonb_build_object('details',_log_info);
|
|
|
|
RAISE NOTICE '%s', _message;
|
|
|
|
END;
|
|
$f$
|
|
LANGUAGE plpgsql
|
|
|