Merge branch 'dev_setup' of https://github.com/fleetside72/tps_etl into dev_setup
This commit is contained in:
commit
46a1c94643
@ -69,7 +69,8 @@ COMMENT ON TABLE tps.map_rv IS 'return value lookup table';
|
||||
|
||||
CREATE TABLE tps.srce (
|
||||
srce text NOT NULL,
|
||||
defn jsonb
|
||||
defn jsonb,
|
||||
hist jsonb
|
||||
);
|
||||
COMMENT ON TABLE tps.srce IS 'source master listing and definition';
|
||||
|
||||
@ -147,3 +148,72 @@ ALTER TABLE ONLY tps.map_rv
|
||||
|
||||
ALTER TABLE ONLY tps.trans
|
||||
ADD CONSTRAINT trans_srce_fkey FOREIGN KEY (srce) REFERENCES tps.srce(srce);
|
||||
|
||||
-------------create functions------------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
CREATE FUNCTION tps.srce_set(_defn jsonb) RETURNS jsonb
|
||||
AS
|
||||
$f$
|
||||
DECLARE
|
||||
_message jsonb;
|
||||
_MESSAGE_TEXT text;
|
||||
_PG_EXCEPTION_DETAIL text;
|
||||
_PG_EXCEPTION_HINT text;
|
||||
BEGIN
|
||||
INSERT INTO
|
||||
tps.srce (srce, defn, hist)
|
||||
SELECT
|
||||
--extract name from defintion
|
||||
_defn->>'name'
|
||||
--add current timestamp to defintions
|
||||
,_defn
|
||||
--add definition
|
||||
,jsonb_build_object(
|
||||
'hist_defn',_defn
|
||||
,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
|
||||
) || '[]'::jsonb
|
||||
ON CONFLICT ON CONSTRAINT srce_pkey DO UPDATE
|
||||
SET
|
||||
defn = _defn
|
||||
,hist =
|
||||
--the new definition going to position -0-
|
||||
jsonb_build_object(
|
||||
'hist_defn',_defn
|
||||
,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
|
||||
)
|
||||
--the previous definition, set upper bound of effective range which was previously null
|
||||
|| jsonb_set(
|
||||
srce.hist
|
||||
,'{0,effective,1}'::text[]
|
||||
,to_jsonb(CURRENT_TIMESTAMP)
|
||||
);
|
||||
|
||||
_message:=
|
||||
(
|
||||
$$
|
||||
{
|
||||
"status":"complete",
|
||||
"message":"source set"
|
||||
}
|
||||
$$::jsonb
|
||||
);
|
||||
RETURN _message;
|
||||
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
GET STACKED DIAGNOSTICS
|
||||
_MESSAGE_TEXT = MESSAGE_TEXT,
|
||||
_PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
|
||||
_PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
|
||||
_message:=
|
||||
($$
|
||||
{
|
||||
"status":"fail",
|
||||
"message":"error importing data"
|
||||
}
|
||||
$$::jsonb)
|
||||
||jsonb_build_object('message_text',_MESSAGE_TEXT)
|
||||
||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);
|
||||
RETURN _message;
|
||||
END;
|
||||
$f$
|
||||
LANGUAGE plpgsql;
|
@ -1,107 +1,67 @@
|
||||
|
||||
CREATE OR REPLACE FUNCTION tps.srce_set(_defn jsonb) RETURNS jsonb
|
||||
AS $f$
|
||||
|
||||
DROP FUNCTION IF EXISTS tps.srce_set(jsonb);
|
||||
CREATE FUNCTION tps.srce_set(_defn jsonb) RETURNS jsonb
|
||||
AS
|
||||
$f$
|
||||
DECLARE
|
||||
_cnt int;
|
||||
_conflict BOOLEAN;
|
||||
_message jsonb;
|
||||
_sql text;
|
||||
_cur_sch jsonb;
|
||||
|
||||
_MESSAGE_TEXT text;
|
||||
_PG_EXCEPTION_DETAIL text;
|
||||
_PG_EXCEPTION_HINT text;
|
||||
BEGIN
|
||||
|
||||
/*
|
||||
1. determine if insert or update
|
||||
2. if update, determine if conflicts exists
|
||||
3. do merge
|
||||
*/
|
||||
|
||||
-------extract current source schema for compare--------------------------
|
||||
SELECT
|
||||
defn->'schema'
|
||||
INTO
|
||||
_cur_sch
|
||||
FROM
|
||||
tps.srce
|
||||
WHERE
|
||||
srce = _defn->>'name';
|
||||
|
||||
-------check for transctions already existing under this source-----------
|
||||
SELECT
|
||||
COUNT(*)
|
||||
INTO
|
||||
_cnt
|
||||
FROM
|
||||
tps.trans
|
||||
WHERE
|
||||
srce = _defn->>'name';
|
||||
|
||||
--if there are transaction already and the schema is different stop--------
|
||||
IF _cnt > 0 THEN
|
||||
IF _cur_sch <> _defn->'schema' THEN
|
||||
_conflict = TRUE;
|
||||
--get out of the function somehow
|
||||
_message =
|
||||
$$
|
||||
{
|
||||
"message":"transactions already exist under source profile and there is a pending schema change"
|
||||
,"status":"error"
|
||||
}
|
||||
$$::jsonb;
|
||||
return _message;
|
||||
END IF;
|
||||
END IF;
|
||||
|
||||
/*-------------------------------------------------------
|
||||
do schema validation fo _defn object?
|
||||
---------------------------------------------------------*/
|
||||
|
||||
-------------------insert definition----------------------------------------
|
||||
INSERT INTO
|
||||
tps.srce
|
||||
tps.srce (srce, defn, hist)
|
||||
SELECT
|
||||
_defn->>'name', _defn
|
||||
--extract name from defintion
|
||||
_defn->>'name'
|
||||
--add current timestamp to defintions
|
||||
,_defn
|
||||
--add definition
|
||||
,jsonb_build_object(
|
||||
'hist_defn',_defn
|
||||
,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
|
||||
) || '[]'::jsonb
|
||||
ON CONFLICT ON CONSTRAINT srce_pkey DO UPDATE
|
||||
SET
|
||||
defn = _defn;
|
||||
defn = _defn
|
||||
,hist =
|
||||
--the new definition going to position -0-
|
||||
jsonb_build_object(
|
||||
'hist_defn',_defn
|
||||
,'effective',jsonb_build_array(CURRENT_TIMESTAMP,null::timestamptz)
|
||||
)
|
||||
--the previous definition, set upper bound of effective range which was previously null
|
||||
|| jsonb_set(
|
||||
srce.hist
|
||||
,'{0,effective,1}'::text[]
|
||||
,to_jsonb(CURRENT_TIMESTAMP)
|
||||
);
|
||||
|
||||
------------------drop existing type-----------------------------------------
|
||||
|
||||
EXECUTE format('DROP TYPE IF EXISTS tps.%I',_defn->>'name');
|
||||
|
||||
------------------create new type--------------------------------------------
|
||||
|
||||
SELECT
|
||||
string_agg(quote_ident(prs.key)||' '||prs.type,',')
|
||||
INTO
|
||||
_sql
|
||||
FROM
|
||||
tps.srce
|
||||
--unwrap the schema definition array
|
||||
LEFT JOIN LATERAL jsonb_populate_recordset(null::tps.srce_defn_schema, defn->'schema') prs ON TRUE
|
||||
WHERE
|
||||
srce = _defn->>'name'
|
||||
GROUP BY
|
||||
srce;
|
||||
|
||||
RAISE NOTICE 'CREATE TYPE tps.% AS (%)',_defn->>'name',_sql;
|
||||
|
||||
EXECUTE format('CREATE TYPE tps.%I AS (%s)',_defn->>'name',_sql);
|
||||
|
||||
EXECUTE format('COMMENT ON TYPE tps.%I IS %L',_defn->>'name',(_defn->>'description'));
|
||||
|
||||
----------------set message-----------------------------------------------------
|
||||
|
||||
_message =
|
||||
_message:=
|
||||
(
|
||||
$$
|
||||
{
|
||||
"message":"definition set"
|
||||
,"status":"success"
|
||||
"status":"complete",
|
||||
"message":"source set"
|
||||
}
|
||||
$$::jsonb;
|
||||
return _message;
|
||||
$$::jsonb
|
||||
);
|
||||
RETURN _message;
|
||||
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
GET STACKED DIAGNOSTICS
|
||||
_MESSAGE_TEXT = MESSAGE_TEXT,
|
||||
_PG_EXCEPTION_DETAIL = PG_EXCEPTION_DETAIL,
|
||||
_PG_EXCEPTION_HINT = PG_EXCEPTION_HINT;
|
||||
_message:=
|
||||
($$
|
||||
{
|
||||
"status":"fail",
|
||||
"message":"error importing data"
|
||||
}
|
||||
$$::jsonb)
|
||||
||jsonb_build_object('message_text',_MESSAGE_TEXT)
|
||||
||jsonb_build_object('pg_exception_detail',_PG_EXCEPTION_DETAIL);
|
||||
RETURN _message;
|
||||
END;
|
||||
$f$
|
||||
LANGUAGE plpgsql
|
@ -1,71 +0,0 @@
|
||||
--need to build history (trigger)?
|
||||
|
||||
DO $f$
|
||||
|
||||
DECLARE
|
||||
_defn jsonb;
|
||||
_cnt int;
|
||||
_conflict BOOLEAN;
|
||||
_message jsonb;
|
||||
_sql text;
|
||||
_cur_sch jsonb;
|
||||
|
||||
BEGIN
|
||||
|
||||
SELECT
|
||||
$$
|
||||
{
|
||||
"name":"dcard",
|
||||
"source":"client_file",
|
||||
"loading_function":"csv",
|
||||
"constraint":[
|
||||
"{Trans. Date}",
|
||||
"{Post Date}"
|
||||
],
|
||||
"schemas":{
|
||||
"default":[
|
||||
{
|
||||
"path":"{Trans. Date}",
|
||||
"type":"date",
|
||||
"column_name":"Trans. Date"
|
||||
},
|
||||
{
|
||||
"path":"{Post Date}",
|
||||
"type":"date",
|
||||
"column_name":"Post Date"
|
||||
},
|
||||
{
|
||||
"path":"{Description}",
|
||||
"type":"text",
|
||||
"column_name":"Description"
|
||||
},
|
||||
{
|
||||
"path":"{Amount}",
|
||||
"type":"numeric",
|
||||
"column_name":"Amount"
|
||||
},
|
||||
{
|
||||
"path":"{Category}",
|
||||
"type":"text",
|
||||
"column_name":"Category"
|
||||
}
|
||||
],
|
||||
"version2":[]
|
||||
}
|
||||
}
|
||||
$$
|
||||
INTO
|
||||
_defn;
|
||||
|
||||
-------------------insert definition----------------------------------------
|
||||
INSERT INTO
|
||||
tps.srce (srce, defn)
|
||||
SELECT
|
||||
_defn->>'name', _defn
|
||||
ON CONFLICT ON CONSTRAINT srce_pkey DO UPDATE
|
||||
SET
|
||||
defn = _defn;
|
||||
|
||||
END;
|
||||
$f$
|
||||
LANGUAGE plpgsql
|
39
sample_discovercard/srce_set_test.sql
Normal file
39
sample_discovercard/srce_set_test.sql
Normal file
@ -0,0 +1,39 @@
|
||||
SELECT * FROM TPS.SRCE_SET($${
|
||||
"name":"dcard",
|
||||
"source":"client_file",
|
||||
"loading_function":"csv",
|
||||
"constraint":[
|
||||
"{Trans. Date}",
|
||||
"{Post Date}"
|
||||
],
|
||||
"schemas":{
|
||||
"default":[
|
||||
{
|
||||
"path":"{Trans. Date}",
|
||||
"type":"date",
|
||||
"column_name":"Trans. Date"
|
||||
},
|
||||
{
|
||||
"path":"{Post Date}",
|
||||
"type":"date",
|
||||
"column_name":"Post Date"
|
||||
},
|
||||
{
|
||||
"path":"{Description}",
|
||||
"type":"text",
|
||||
"column_name":"Description"
|
||||
},
|
||||
{
|
||||
"path":"{Amount}",
|
||||
"type":"numeric",
|
||||
"column_name":"Amount"
|
||||
},
|
||||
{
|
||||
"path":"{Category}",
|
||||
"type":"text",
|
||||
"column_name":"Category"
|
||||
}
|
||||
],
|
||||
"version2":[]
|
||||
}
|
||||
}$$::JSONB)
|
Loading…
Reference in New Issue
Block a user