Merge branch 'pt'

This commit is contained in:
Paul Trowbridge 2017-10-26 20:01:45 -04:00
commit a3b2928619
21 changed files with 4252 additions and 2201 deletions

Binary file not shown.

7
coa.md
View File

@ -1,3 +1,5 @@
Balance Sheet
-----------------------
* Cash * Cash
- On-hand - On-hand
- Hunt Checking - Hunt Checking
@ -20,5 +22,10 @@
- Discover - Discover
- Kohls - Kohls
- Lowes - Lowes
- BestBuy
- Target
- TheHomeDepot
* Mortgage * Mortgage
- Principle - Principle

22
col_balance.pgsql Normal file
View File

@ -0,0 +1,22 @@
-- Running collateral balance per schedule for the PNCL source.
-- Each tps.trans row is expanded into typed columns via jsonb_populate_record(tps.pncl).
-- NOTE(review): the two literals added to the running sum look like opening balances
-- for the raw-material and finished-goods schedules -- confirm.
-- NOTE(review): rec->>'id' is text, so the tie-breaker sorts lexically ('10' before '9').
SELECT
    t.id
    ,t.rec->>'id'
    ,r.*
    ,CASE
        WHEN r."Schedule#" = '02IN Raw Material' THEN 13097563.42
        WHEN r."Schedule#" = '03IN Finished Goods' THEN 35790696.52
        ELSE 0
    END
    + SUM(r."Sales" + r."Credits & Adjustments" - r."Gross Collections") OVER schedule_running running_bal
    --collateral capped at "MaxEligible" before the advance rate is applied
    ,CAST(
        LEAST(r."CollateralBalance" - r."Ineligible Amount", r."MaxEligible") * (r."AdvanceRate" / 100)
        AS NUMERIC(20,2)
    ) qualified_collateral
    --same calculation without the "MaxEligible" cap
    ,CAST(
        (r."CollateralBalance" - r."Ineligible Amount") * (r."AdvanceRate" / 100)
        AS NUMERIC(20,2)
    ) qualified_collateral_nl
FROM
    tps.trans t
    LEFT JOIN LATERAL jsonb_populate_record(null::tps.pncl, t.rec) r ON TRUE
WHERE
    t.srce = 'PNCL'
    --AND rec @> '{"Schedule#":"03IN Finished Goods"}'
WINDOW
    --the partition key is constant inside each partition, so it is not repeated in the sort
    schedule_running AS (
        PARTITION BY r."Schedule#"
        ORDER BY r."PostDate" ASC, t.rec->>'id' ASC
    )
ORDER BY
    r."Schedule#" ASC
    ,r."PostDate" ASC
    ,t.rec->>'id' ASC

13
dcard_bal.pgsql Normal file
View File

@ -0,0 +1,13 @@
\timing
-- Running balance for the DCARD source.
-- Each tps.trans row is expanded into the tps.dcard columns via jsonb_populate_record.
-- The explicit ROWS frame makes every row carry its own cumulative total; the default
-- RANGE frame would give rows that tie on the sort key the same (group) total.
-- The final ORDER BY repeats the window's sort key so rows are listed in the same order
-- in which they were accumulated.
-- NOTE(review): 1061.1 + 22.40 look like opening-balance adjustments -- confirm.
-- NOTE(review): rec->>'id' is text, so it sorts lexically ('10' before '9').
SELECT
    r.*,
    t.rec->'id' AS rec_id,
    SUM(r."Amount") OVER (
        PARTITION BY t.srce
        ORDER BY r."Post Date" ASC, t.rec->>'id' ASC, r."Description" ASC
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) + 1061.1 + 22.40 AS running_bal
FROM
    tps.trans t
    LEFT JOIN LATERAL jsonb_populate_record(null::tps.dcard, t.rec) r ON TRUE
WHERE
    t.srce = 'DCARD'
ORDER BY
    r."Post Date" ASC
    ,t.rec->>'id' ASC
    ,r."Description" ASC;

137
do_map.pgsql Normal file
View File

@ -0,0 +1,137 @@
-- Apply the regex instructions stored in tps.map_rm to DCARD transactions that have no
-- map yet, look the results up in tps.map_rv, and write the outcome back to
-- tps.trans.map / tps.trans.parse.
WITH
--------------------apply regex operations to transactions-----------------------------------------------------------------------------------
rx AS (
    SELECT
        m.srce,
        m.target,
        t.id,
        t.rec,
        --the value each regex was applied to, keyed by its json path
        jsonb_build_object(
            e.v ->> 'key',
            (t.rec #> ((e.v ->> 'key')::text[]))
        ) AS rkey,
        --value offered to the tps.map_rv lookup; only built when the instruction has "map":"yes"
        CASE m.regex->>'map'
            WHEN 'yes' THEN
                jsonb_build_object(
                    e.v->>'field',
                    CASE m.regex->>'function'
                        WHEN 'extract' THEN
                            --a single capture group is stored as a scalar, several as an array
                            CASE WHEN array_upper(mt.mt,1)=1
                                THEN to_json(mt.mt[1])
                                ELSE array_to_json(mt.mt)
                            END::jsonb
                        WHEN 'replace' THEN
                            to_jsonb(rp.rp)
                        ELSE
                            '{}'::jsonb
                    END
                )
            ELSE
                '{}'::jsonb
        END retval,
        --value kept on the transaction itself; only built when the element has "retain":"y"
        CASE e.v->>'retain'
            WHEN 'y' THEN
                jsonb_build_object(
                    e.v->>'field',
                    CASE m.regex->>'function'
                        WHEN 'extract' THEN
                            CASE WHEN array_upper(mt.mt,1)=1
                                THEN to_json(mt.mt[1])
                                ELSE array_to_json(mt.mt)
                            END::jsonb
                        WHEN 'replace' THEN
                            to_jsonb(rp.rp)
                        ELSE
                            '{}'::jsonb
                    END
                )
            ELSE
                '{}'::jsonb
        END retain,
        m.seq
    FROM
        tps.map_rm m
        --one row per filter object in the instruction's "where" array
        LEFT JOIN LATERAL jsonb_array_elements(m.regex->'where') w(v) ON TRUE
        INNER JOIN tps.trans t ON
            t.srce = m.srce AND
            t.rec @> w.v
        --one row per regex definition in the instruction's "defn" array
        LEFT JOIN LATERAL jsonb_array_elements(m.regex->'defn') WITH ORDINALITY e(v, rn) ON true
        LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text) WITH ORDINALITY mt(mt, rn) ON
            m.regex->>'function' = 'extract'
        --regexp_replace returns NULL when any argument is NULL, so a definition that has
        --no "flag" key must fall back to an empty flag string
        LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text, COALESCE(e.v ->> 'flag', '')) WITH ORDINALITY rp(rp, rn) ON
            m.regex->>'function' = 'replace'
    WHERE
        t.map IS NULL
        AND t.srce = 'DCARD'
    --NOTE(review): SQL does not guarantee that this ordering carries into the aggregates below
    ORDER BY
        m.srce,
        m.seq,
        m.target,
        t.id,
        t.rec,
        e.rn
),
----------aggregate regex back to the target level (may be several targets per row)---------------------------------------------------------------
agg_rx AS (
    SELECT
        rx.srce,
        rx.target,
        rx.id,
        rx.rec,
        tps.jsonb_concat_obj(rx.rkey) rkey,
        tps.jsonb_concat_obj(rx.retval) AS retval,
        tps.jsonb_concat_obj(rx.retain) AS retain,
        rx.seq
    FROM
        --unwrap json instruction and apply regex using a count per original line for re-aggregation
        --need to look at integrating regex options like 'g' that would then need to be aggregated back as an array, or adding the ordinality number to the title
        rx
    GROUP BY
        rx.srce,
        rx.target,
        rx.id,
        rx.rec,
        rx.seq
)
-------------aggregate all targets back to row level (id)------------------------------------------------------------------------------------------------
,agg_orig AS (
    SELECT
        u.srce,
        u.id,
        u.rec,
        string_agg(u.target,',') target,
        tps.jsonb_concat_obj(u.retval) retval,
        tps.jsonb_concat_obj(u.retain) retain,
        --later sequence numbers overwrite earlier ones on duplicate keys
        tps.jsonb_concat_obj(coalesce(v.map,'{}'::jsonb) ORDER BY u.seq ) map
    FROM
        --re-aggregate return values; the inner join drops any target whose return value
        --has no tps.map_rv row contained in it
        agg_rx u
        INNER JOIN tps.map_rv v ON
            v.target = u.target AND
            v.srce = u.srce AND
            v.retval <@ u.retval
    GROUP BY
        u.srce,
        u.id,
        u.rec
)
UPDATE
    tps.trans t
SET
    map = o.map,
    parse = o.retain
FROM
    agg_orig o
WHERE
    o.id = t.id;

214
do_map_g_option.pgsql Normal file
View File

@ -0,0 +1,214 @@
\timing
-- Dry run of the regex mapping with support for the regex 'g' (global) flag.
-- Every match of every regex definition becomes one row in rx; the following CTEs fold
-- those rows back up: match -> definition item -> target -> transaction id.
-- The final statement only prints the result; the UPDATE that would persist it is kept
-- commented out at the bottom.
WITH
--------------------apply regex operations to transactions-----------------------------------------------------------------------------------
rx AS (
    SELECT
        t.srce,
        t.id,
        t.rec,
        m.target,
        m.regex->>'map' map_intention,
        m.regex->>'function' regex_function,
        e.v ->> 'field' result_key_name,
        e.v ->> 'key' target_json_path,
        e.v ->> 'flag' regex_options_flag,
        e.v->>'retain' retain_result,
        e.v->>'regex' regex_expression,
        e.rn target_item_number,
        --ordinal of the match within one definition; 1 when nothing matched
        COALESCE(mt.rn,rp.rn,1) result_number,
        mt.mt rx_match,
        rp.rp rx_replace,
        CASE m.regex->>'map'
            WHEN 'yes' THEN
                e.v->>'field'
            ELSE
                null
        END map_key,
        CASE m.regex->>'map'
            WHEN 'yes' THEN
                CASE m.regex->>'function'
                    WHEN 'extract' THEN
                        --a single capture group is stored as a scalar, several as an array
                        CASE WHEN array_upper(mt.mt,1)=1
                            THEN to_json(mt.mt[1])
                            ELSE array_to_json(mt.mt)
                        END::jsonb
                    WHEN 'replace' THEN
                        to_jsonb(rp.rp)
                    ELSE
                        '{}'::jsonb
                END
            ELSE
                NULL
        END map_val,
        CASE e.v->>'retain'
            WHEN 'y' THEN
                e.v->>'field'
            ELSE
                NULL
        END retain_key,
        CASE e.v->>'retain'
            WHEN 'y' THEN
                CASE m.regex->>'function'
                    WHEN 'extract' THEN
                        CASE WHEN array_upper(mt.mt,1)=1
                            THEN to_json(trim(mt.mt[1]))
                            ELSE array_to_json(mt.mt)
                        END::jsonb
                    WHEN 'replace' THEN
                        to_jsonb(rtrim(rp.rp))
                    ELSE
                        '{}'::jsonb
                END
            ELSE
                NULL
        END retain_val
    FROM
        tps.map_rm m
        --one row per filter object in the instruction's "where" array
        LEFT JOIN LATERAL jsonb_array_elements(m.regex->'where') w(v) ON TRUE
        INNER JOIN tps.trans t ON
            t.srce = m.srce AND
            t.rec @> w.v
        --one row per regex definition in the instruction's "defn" array
        LEFT JOIN LATERAL jsonb_array_elements(m.regex->'defn') WITH ORDINALITY e(v, rn) ON true
        LEFT JOIN LATERAL regexp_matches(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY mt(mt, rn) ON
            m.regex->>'function' = 'extract'
        --regexp_replace returns NULL when any argument is NULL, so a missing "flag" key
        --falls back to an empty flag string, the same way regexp_matches is called above
        LEFT JOIN LATERAL regexp_replace(t.rec #>> ((e.v ->> 'key')::text[]), e.v ->> 'regex'::text, e.v ->> 'replace'::text,COALESCE(e.v ->> 'flag','')) WITH ORDINALITY rp(rp, rn) ON
            m.regex->>'function' = 'replace'
    WHERE
        --t.srce = 'PNCC'
        --two separate containment tests: a single jsonb literal cannot hold the key
        --"Transaction" twice (only the last value survives the cast to jsonb)
        (
            t.rec @> '{"Transaction":"ACH Credits"}'::jsonb
            OR t.rec @> '{"Transaction":"ACH Debits"}'::jsonb
        )
        --rec @> '{"Description":"CHECK 93013270 086129935"}'::jsonb
    ORDER BY
        t.id DESC,
        m.target,
        e.rn,
        COALESCE(mt.rn,rp.rn,1)
)
--SELECT * FROM rx
-------------fold the individual matches back to one row per regex definition item----------------------------------------------------------
, agg_to_target_items AS (
    SELECT
        srce
        ,id
        ,target
        ,map_intention
        ,regex_function
        ,target_item_number
        ,result_key_name
        ,target_json_path
        --a single result is stored as a scalar, several results as an array
        ,CASE WHEN map_key IS NULL
            THEN
                NULL
            ELSE
                jsonb_build_object(
                    map_key,
                    CASE WHEN max(result_number) = 1
                        THEN
                            jsonb_agg(map_val ORDER BY result_number) -> 0
                        ELSE
                            jsonb_agg(map_val ORDER BY result_number)
                    END
                )
        END map_val
        ,CASE WHEN retain_key IS NULL
            THEN
                NULL
            ELSE
                jsonb_build_object(
                    retain_key,
                    CASE WHEN max(result_number) = 1
                        THEN
                            jsonb_agg(retain_val ORDER BY result_number) -> 0
                        ELSE
                            jsonb_agg(retain_val ORDER BY result_number)
                    END
                )
        END retain_val
    FROM
        rx
    GROUP BY
        srce
        ,id
        ,target
        ,map_intention
        ,regex_function
        ,target_item_number
        ,result_key_name
        ,target_json_path
        ,map_key
        ,retain_key
)
--SELECT * FROM agg_to_target_items
-------------merge the definition items into one object per target----------------------------------------------------------------------------
, agg_to_target AS (
    SELECT
        srce
        ,id
        ,target
        ,map_intention
        ,tps.jsonb_concat_obj(COALESCE(map_val,'{}'::JSONB)) map_val
        ,jsonb_strip_nulls(tps.jsonb_concat_obj(COALESCE(retain_val,'{}'::JSONB))) retain_val
    FROM
        agg_to_target_items
    GROUP BY
        srce
        ,id
        ,target
        ,map_intention
    ORDER BY
        id
)
--SELECT * FROM agg_to_target
-------------look up the mapped value for each target's return value------------------------------------------------------------------------
, link_map AS (
    SELECT
        a.srce
        ,a.id
        ,a.target
        ,a.map_intention
        ,a.map_val
        ,a.retain_val retain_value
        ,v.map
    FROM
        agg_to_target a
        LEFT OUTER JOIN tps.map_rv v ON
            v.srce = a.srce AND
            v.target = a.target AND
            v.retval = a.map_val
)
--SELECT * FROM link_map
-------------merge all targets back to one row per transaction id---------------------------------------------------------------------------
, agg_to_id AS (
    SELECT
        srce
        ,id
        ,tps.jsonb_concat_obj(COALESCE(retain_value,'{}'::jsonb)) retain_val
        ,tps.jsonb_concat_obj(COALESCE(map,'{}'::jsonb)) map
    FROM
        link_map
    GROUP BY
        srce
        ,id
)
SELECT srce, id, jsonb_pretty(retain_val), jsonb_pretty(map) FROM agg_to_id
/*
UPDATE
    tps.trans t
SET
    map = o.map,
    parse = o.retain_val,
    allj = t.rec||o.map||o.retain_val
FROM
    agg_to_id o
WHERE
    o.id = t.id;
*/

14
loan_bal.pgsql Normal file
View File

@ -0,0 +1,14 @@
\timing
-- Running balance (advances + adjustments - payments) for a single loan number.
-- Each tps.trans row is expanded into typed columns via jsonb_populate_record(tps.pnco).
-- NOTE(review): rec->>'id' is text, so the tie-breaker sorts lexically ('10' before '9').
SELECT
    r.*,
    SUM(r."Advances" + r."Adjustments" - r."Payments") OVER loan_running
FROM
    tps.trans t
    LEFT JOIN LATERAL jsonb_populate_record(null::tps.pnco, t.rec) r ON TRUE
WHERE
    t.rec @> '{"Loan#":"606780191"}'
WINDOW
    loan_running AS (
        PARTITION BY r."Loan#"
        ORDER BY r."Post Date" ASC, t.rec->>'id' ASC, r."Reference #" ASC
    )
ORDER BY
    r."Loan#"
    ,r."Post Date" ASC
    ,t.rec->>'id' ASC
    ,r."Reference #" ASC

View File

@ -1,40 +1,43 @@
SELECT DELETE FROM tps.map_rm;
jsonb_pretty( INSERT INTO
$$ tps.map_rm
{ SELECT *
"defn": [ FROM
{ (VALUES
"key": "{Description}", ('DCARD', 'First 20',
"field": "ini", $j$
"regex": "([\\w].*?)(?=$| -|\\s[0-9].*?|\\s[\\w/]+?:)" {
}, "defn": [
{ {
"key": "{Description}", "key": "{Description}",
"field": "compn", "field": "f20",
"regex": "Comp Name:(.+?)(?=$| Comp|\\w+?:)" "regex": ".{1,20}"
}, ,"retain":"y"
{ }
"key": "{Description}", ],
"field": "adp_comp", "where": [
"regex": "Cust ID:.*?(B3X|UDV|U7E|U7C|U7H|U7J).*?(?=$|\\w+?:)" {
}, }
{ ]
"key": "{Description}", }
"field": "desc", $j$::jsonb
"regex": "Desc:(.+?) Comp" , 2)
}, ,('HUNT', 'First 20',
{ $j$
"key": "{Description}", {
"field": "discr", "defn": [
"regex": "Discr:(.+?)(?=$| SEC:|\\w+?:)" {
} "key": "{Description}",
], "field": "f20",
"type": "extract", "regex": ".{1,20}"
"where": [ ,"retain":"y"
{ }
"Transaction": "ACH Debits" ],
} "where": [
] {
} }
$$::jsonb ]
) }
$j$::jsonb
, 1)
) x

251
map_rm_template.pgsql Normal file
View File

@ -0,0 +1,251 @@
/*
DELETE FROM tps.map_rm where target = 'Strip Amount Commas';
INSERT INTO
tps.map_rm
SELECT *
FROM
(VALUES
('PNCC', 'Strip Amount Commas',
$j$
{
"name":"Strip Amount Commas",
"description":"the Amount field come from PNC with commas embeded so it cannot be cast to numeric",
"defn": [
{
"key": "{Amount}",
"field": "amount",
"regex": ",",
"replace":"",
"flag":"g",
"retain":"y"
}
],
"function":"replace",
"map":"no",
"where": [
{
}
]
}
$j$::jsonb
, 1)
) x;
DELETE FROM tps.map_rm where target = 'Parse Descr';
INSERT INTO
tps.map_rm
SELECT *
FROM
(VALUES
('PNCC', 'Parse Descr',
$j$
{
"name":"Parse Descr",
"description":"parse the description based on at least three capital letters followed by a comma until another set of at lesat 3 capital letters and a comma is encountered",
"defn": [
{
"key": "{Description}",
"field": "dparse",
"regex": "([A-Z]{3,}?:)(.*)(?=[A-Z]{3,}?:|$)",
"flag":"g",
"retain":"y"
}
],
"function":"extract",
"map":"no",
"where": [
{
}
]
}
$j$::jsonb
, 2)
) x;
DELETE FROM tps.map_rm where target = 'Extract OBI';
INSERT INTO
tps.map_rm
SELECT *
FROM
(VALUES
('PNCC', 'Extract OBI',
$j$
{
"name":"Extract OBI",
"description":"pull out whatever follows OBI in the description until atleast 3 capital letters followed by a colon are encountered",
"defn": [
{
"key": "{Description}",
"field": "obi",
"regex": "OBI:(.*?)(?=[A-Z]{3,}?:|$)",
"flag":"",
"retain":"y"
}
],
"function":"extract",
"map":"no",
"where": [
{
"Transaction":"Money Transfer DB - Wire"
},
{
"Transaction":"Money Transfer CR-Other"
},
{
"Transaction":"Intl Money Transfer Debits"
},
{
"Transaction":"Money Transfer DB - Other"
},
{
"Transaction":"Money Transfer CR-Wire"
}
]
}
$j$::jsonb
, 2)
) x;
DELETE FROM tps.map_rm where target = 'Extract RFB';
INSERT INTO
tps.map_rm
SELECT *
FROM
(VALUES
('PNCC', 'Extract RFB',
$j$
{
"name":"Extract RFB",
"description":"pull out whatever follows RFB in the description until atleast 3 capital letters followed by a colon are encountered",
"defn": [
{
"key": "{Description}",
"field": "rfb",
"regex": "RFB:(.*?)(?=[A-Z]{3,}?:|$)",
"flag":"",
"retain":"y"
}
],
"function":"extract",
"map":"no",
"where": [
{
"Transaction":"Money Transfer DB - Wire"
},
{
"Transaction":"Money Transfer CR-Other"
},
{
"Transaction":"Intl Money Transfer Debits"
},
{
"Transaction":"Money Transfer DB - Other"
},
{
"Transaction":"Money Transfer CR-Wire"
}
]
}
$j$::jsonb
, 2)
) x;
*/
-- (Re)load the 'Parse ACH' regex instruction for source PNCC.
-- Written as an upsert on the table's key (srce, target) instead of DELETE + INSERT:
--   * a DELETE filtered on target alone would also remove a same-named target that
--     belongs to another source
--   * a DELETE is rejected while tps.map_rv rows still reference the (srce, target) pair
-- The explicit column list keeps the statement valid if columns are added or reordered.
INSERT INTO
    tps.map_rm (srce, target, regex, seq)
VALUES
    ('PNCC', 'Parse ACH',
$j$
{
"name":"Parse ACH",
"description":"parse select components of the description for ACH Credits Receieved",
"defn": [
{
"key": "{Description}",
"field":"Comp Name",
"regex": "Comp Name:(.+?)(?=SEC:|Cust ID:|Desc:|Comp Name:|Comp ID:|Batch Discr:|Cust Name:|Addenda:|SETT:|Date:|Time:|$)",
"flag":"",
"retain":"y"
},
{
"key": "{Description}",
"field":"Cust ID",
"regex": "Cust ID:(.+?)(?=SEC:|Cust ID:|Desc:|Comp Name:|Comp ID:|Batch Discr:|Cust Name:|Addenda:|SETT:|Date:|Time:|$)",
"flag":"",
"retain":"y"
},
{
"key": "{Description}",
"field":"Desc",
"regex": "Desc:(.+?)(?=SEC:|Cust ID:|Desc:|Comp Name:|Comp ID:|Batch Discr:|Cust Name:|Addenda:|SETT:|Date:|Time:|$)",
"flag":"",
"retain":"y"
},
{
"key": "{Description}",
"field":"Cust Name",
"regex": "Cust Name:(.+?)(?=SEC:|Cust ID:|Desc:|Comp Name:|Comp ID:|Batch Discr:|Cust Name:|Addenda:|SETT:|Date:|Time:|$)",
"flag":"",
"retain":"y"
},
{
"key": "{Description}",
"field":"Batch Discr",
"regex": "Batch Discr:(.+?)(?=SEC:|Cust ID:|Desc:|Comp Name:|Comp ID:|Batch Discr:|Cust Name:|Addenda:|SETT:|Date:|Time:|$)",
"flag":"",
"retain":"y"
},
{
"key": "{Description}",
"field":"Comp ID",
"regex": "Comp ID:(.+?)(?=SEC:|Cust ID:|Desc:|Comp Name:|Comp ID:|Batch Discr:|Cust Name:|Addenda:|SETT:|Date:|Time:|$)",
"flag":"",
"retain":"y"
},
{
"key": "{Description}",
"field":"Addenda",
"regex": "Addenda:(.+?)(?=SEC:|Cust ID:|Desc:|Comp Name:|Comp ID:|Batch Discr:|Cust Name:|Addenda:|SETT:|Date:|Time:|$)",
"flag":"",
"retain":"y"
},
{
"key": "{Description}",
"field":"SETT",
"regex": "SETT:(.+?)(?=SEC:|Cust ID:|Desc:|Comp Name:|Comp ID:|Batch Discr:|Cust Name:|Addenda:|SETT:|Date:|Time:|$)",
"flag":"",
"retain":"y"
},
{
"key": "{Description}",
"field":"Date",
"regex": "Date:(.+?)(?=SEC:|Cust ID:|Desc:|Comp Name:|Comp ID:|Batch Discr:|Cust Name:|Addenda:|SETT:|Date:|Time:|$)",
"flag":"",
"retain":"y"
},
{
"key": "{Description}",
"field":"Time",
"regex": "Time:(.+?)(?=SEC:|Cust ID:|Desc:|Comp Name:|Comp ID:|Batch Discr:|Cust Name:|Addenda:|SETT:|Date:|Time:|$)",
"flag":"",
"retain":"y"
}
],
"function":"extract",
"map":"no",
"where": [
{
"Transaction":"ACH Credits"
},
{
"Transaction":"ACH Debits"
}
]
}
$j$::jsonb
    , 2)
ON CONFLICT (srce, target) DO UPDATE
SET
    regex = EXCLUDED.regex,
    seq = EXCLUDED.seq;

File diff suppressed because one or more lines are too long

View File

@ -4,6 +4,8 @@ Concepts
pull various static files into postgres and do basic transformation without losing the original document pull various static files into postgres and do basic transformation without losing the original document
or getting into custom code for each scenario or getting into custom code for each scenario
this is an in-between for a foreign data wrapper and custom programming
## Storage ## Storage
all records are jsonb all records are jsonb
applied mappings are in associated jsonb documents applied mappings are in associated jsonb documents
@ -15,6 +17,9 @@ applied mappings are in associated jsonb documents
1. regular expressions are used to extract pieces of the json objects 1. regular expressions are used to extract pieces of the json objects
2. the results of the regular expressions are bumped up against a list of basic mappings and written to an associated jsonb document 2. the results of the regular expressions are bumped up against a list of basic mappings and written to an associated jsonb document
a target represents a whole scenario that needs to be matched. it can contain several regular expressions. if one fails, then no match is attempted because it could result in a false positive based on the @> operator used at join time
`this probably isn't correctly implemented`
## Transformation tools ## Transformation tools
* `COPY` * `COPY`
* `regexp_matches()` * `regexp_matches()`

8
sqitch.conf Normal file
View File

@ -0,0 +1,8 @@
[core]
engine = pg
# plan_file = sqitch.plan
# top_dir = .
# [engine "pg"]
# target = db:pg:
# registry = sqitch
# client = psql

4
sqitch.plan Normal file
View File

@ -0,0 +1,4 @@
%syntax-version=1.0.0
%project=tps_etl
%uri=https://github.com/fleetside72/tps_etl

View File

@ -1,9 +1,36 @@
\timing
/*--------------------------------------------------------
0. load target import to temp table
1. create pending list
2. get unqiue pending keys
3. see which keys not already in tps.trans
4. insert pending records associated with keys that are not already in trans
5. insert summary to log table
*/---------------------------------------------------------
DO $$ DO $$
DECLARE _t text; DECLARE _t text;
DECLARE _c text; DECLARE _c text;
DECLARE _path text;
DECLARE _srce text;
<<<<<<< HEAD
BEGIN BEGIN
_path := 'C:\users\fleet\downloads\d1026.csv';
_srce := 'DCARD';
=======
DECLARE _log_info text;
DECLARE _log_id text;
BEGIN
_path := 'C:\users\ptrowbridge\downloads\llcol.csv';
_srce := 'PNCL';
>>>>>>> wk
----------------------------------------------------build the column list of the temp table---------------------------------------------------------------- ----------------------------------------------------build the column list of the temp table----------------------------------------------------------------
@ -14,11 +41,11 @@ BEGIN
_t, _t,
_c _c
FROM FROM
TPS.srce tps.srce
--unwrap the schema definition array --unwrap the schema definition array
LEFT JOIN LATERAL jsonb_populate_recordset(null::tps.srce_defn_schema, defn->'schema') prs ON TRUE LEFT JOIN LATERAL jsonb_populate_recordset(null::tps.srce_defn_schema, defn->'schema') prs ON TRUE
WHERE WHERE
srce = 'DCARD' srce = _srce
GROUP BY GROUP BY
srce; srce;
@ -35,72 +62,143 @@ BEGIN
----------------------------------------------------do the insert------------------------------------------------------------------------------------------- ----------------------------------------------------do the insert-------------------------------------------------------------------------------------------
--the column list needs to be dynamic forcing this whole line to be dynamic --the column list needs to be dynamic forcing this whole line to be dynamic
_t := format('COPY csv_i (%s) FROM ''C:\Users\fleet\downloads\dfs.csv'' WITH (HEADER TRUE,DELIMITER '','', FORMAT CSV, ENCODING ''SQL_ASCII'',QUOTE ''"'');',_c); _t := format('COPY csv_i (%s) FROM %L WITH (HEADER TRUE,DELIMITER '','', FORMAT CSV, ENCODING ''SQL_ASCII'',QUOTE ''"'');',_c,_path);
--RAISE NOTICE '%', _t; --RAISE NOTICE '%', _t;
EXECUTE _t; EXECUTE _t;
WITH
-------------extract the limiter fields to one row per source----------------------------------
ext AS (
SELECT
srce
,defn->'unique_constraint'->>'fields'
,ARRAY(SELECT ae.e::text[] FROM jsonb_array_elements_text(defn->'unique_constraint'->'fields') ae(e)) text_array
FROM
tps.srce
WHERE
srce = _srce
--add where clause for targeted source
)
-------------for each imported row in the COPY table, genereate the json rec, and a column for the json key specified in the srce.defn-----------
,pending_list AS (
SELECT
jsonb_extract(
row_to_json(i)::jsonb
,ext.text_array
) json_key,
row_to_json(i)::JSONB rec,
srce,
--ae.rn,
id
FROM
csv_i i
INNER JOIN ext ON
ext.srce = _srce
ORDER BY
id ASC
)
-----------create a unique list of keys from staged rows------------------------------------------------------------------------------------------
, pending_keys AS (
SELECT DISTINCT
json_key
FROM
pending_list
)
-----------list of keys already loaded to tps-----------------------------------------------------------------------------------------------------
, matched_keys AS (
SELECT DISTINCT
k.json_key
FROM
pending_keys k
INNER JOIN tps.trans t ON
t.rec @> k.json_key
)
-----------return unique keys that are not already in tps.trans-----------------------------------------------------------------------------------
, unmatched_keys AS (
SELECT
json_key
FROM
pending_keys
EXCEPT
SELECT
json_key
FROM
matched_keys
)
-----------insert pending rows that have key with no trans match-----------------------------------------------------------------------------------
--need to look into mapping the transactions prior to loading
, inserted AS (
INSERT INTO
tps.trans (srce, rec)
SELECT
pl.srce
,pl.rec
FROM
pending_list pl
INNER JOIN unmatched_keys u ON
u.json_key = pl.json_key
ORDER BY
pl.id ASC
----this conflict is only if an exact duplicate rec json happens, which will be rejected
----therefore, records may not be inserted due to ay matches with certain json fields, or if the entire json is a duplicate, reason is not specified
RETURNING *
)
--------summarize records not inserted-------------------+------------------------------------------------------------------------------------------------
, logged AS (
INSERT INTO
tps.trans_log (info)
SELECT
JSONB_BUILD_OBJECT('time_stamp',CURRENT_TIMESTAMP)
||JSONB_BUILD_OBJECT('srce',_srce)
||JSONB_BUILD_OBJECT('path',_path)
||JSONB_BUILD_OBJECT('not_inserted',
(
SELECT
jsonb_agg(json_key)
FROM
matched_keys
)
)
||JSONB_BUILD_OBJECT('inserted',
(
SELECT
jsonb_agg(json_key)
FROM
unmatched_keys
)
)
RETURNING *
)
SELECT
id
,info
INTO
_log_id
,_log_info
FROM
logged;
RAISE NOTICE 'import logged under id# %, info: %', _log_id, _log_info;
END END
$$; $$;
--*******************************************
--this needs to aggregate on id sequence
--*******************************************
WITH pending_list AS (
SELECT
---creates a key value pair and then aggregates rows of key value pairs
jsonb_object_agg(
(ae.e::text[])[1], --the key name
(row_to_json(i)::jsonb) #> ae.e::text[] --get the target value from the key from the csv row that has been converted to json
) json_key,
row_to_json(i)::JSONB - 'id' rec,
srce,
--ae.rn,
id
FROM
csv_i i
INNER JOIN tps.srce s ON
s.srce = 'DCARD'
LEFT JOIN LATERAL JSONB_ARRAY_ELEMENTS_TEXT(defn->'unique_constraint'->'fields') WITH ORDINALITY ae(e, rn) ON TRUE
GROUP BY
i.*,
srce,
id
ORDER BY
id
)
------results of an insert operation--------------
, inserted AS (
INSERT INTO
tps.trans (srce, rec)
SELECT
pl.srce
,pl.rec
FROM
pending_list pl
LEFT JOIN tps.trans t ON
t.srce = pl.srce
AND t.rec @> pl.json_key
WHERE
t IS NULL
----this conflict is only if an exact duplicate rec json happens, which will be rejected
----therefore, records may not be inserted due to ay matches with certain json fields, or if the entire json is a duplicate, reason is not specified
RETURNING *
)
----records not inserted------
SELECT
srce
,rec
FROM
pending_list
EXCEPT ALL
SELECT
srce
,rec
FROM
inserted;

24
srce_defn.pgsql Normal file
View File

@ -0,0 +1,24 @@
\timing
/*
WITH
ext AS (
SELECT
srce
,defn->'unique_constraint'->>'fields'
,ARRAY(SELECT ae.e::text[] FROM jsonb_array_elements_text(defn->'unique_constraint'->'fields') ae(e)) text_array
FROM
tps.srce
--add where clause for targeted source
)
*/
-- Count the distinct (source, unique-key object) combinations present in tps.trans.
-- The key object is built per transaction from the json paths listed in the source
-- definition under unique_constraint.fields.
WITH distinct_keys AS (
    SELECT DISTINCT
        t.srce
        ,(
            SELECT
                jsonb_object_agg(ae.e, t.rec #> ae.e::text[])
            FROM
                jsonb_array_elements_text(s.defn->'unique_constraint'->'fields') ae(e)
        ) ja
    FROM
        tps.trans t
        INNER JOIN tps.srce s ON
            s.srce = t.srce
)
SELECT
    COUNT(*)
FROM
    distinct_keys

39
srce_template.pgsql Normal file
View File

@ -0,0 +1,39 @@
-- Template for registering a source definition in tps.srce; this instance registers
-- the CAMZ source (described in the document as "Chase Amazon Credit Card").
-- "schema" lists the keys of the incoming csv and their types; "unique_constraint"
-- lists the json paths that make up a record's key.
-- NOTE(review): the key is only the two dates -- confirm this is specific enough to
-- tell apart different transactions that share both dates.
INSERT INTO tps.srce
VALUES (
'CAMZ',
$defn$
{
"name": "CAMZ",
"description":"Chase Amazon Credit Card",
"type": "csv",
"schema": [
{
"key": "Type",
"type": "text"
},
{
"key": "Trans Date",
"type": "date"
},
{
"key": "Post Date",
"type": "date"
},
{
"key": "Description",
"type": "text"
},
{
"key": "Amount",
"type": "numeric"
}
],
"unique_constraint": {
"type": "key",
"fields": [
"{Trans Date}"
,"{Post Date}"
]
}
}
$defn$::JSONB
)

19
srce_unq.pgsql Normal file
View File

@ -0,0 +1,19 @@
-- For every transaction, show the full record next to the key object extracted from it.
-- The key paths come from the source definition (unique_constraint.fields); each path
-- is cast to text[] and the paths are collected into one array for public.jsonb_extract.
SELECT
    t.srce
    ,jsonb_pretty(t.rec)
    ,jsonb_pretty(public.jsonb_extract(t.rec, k.txa))
FROM
    tps.trans t
    INNER JOIN tps.srce s ON
        s.srce = t.srce
    CROSS JOIN LATERAL (
        SELECT
            ARRAY(
                SELECT
                    ae.e::text[]
                FROM
                    jsonb_array_elements_text(s.defn->'unique_constraint'->'fields') ae(e)
            ) txa
    ) k

16
trans_log_template.pgsql Normal file
View File

@ -0,0 +1,16 @@
-- Sample import-log document, pretty-printed.
-- It carries the imported file path, the source code, a timestamp and the inserted keys.
SELECT
    jsonb_pretty(
        CAST(
$$
{
"path":"C:\\users\\ptrowbridge\\downloads\\transsearchcsv.csv"
,"srce":"PNCC"
,"stamp":"2017-10-24 08:32:06.599067-04"
,"inserted":{
"keys":[
1,2,3,4,5,6,7
]
,"summary":""
}
}
$$
        AS jsonb)
    )

View File

@ -11,4 +11,7 @@ FROM
LEFT JOIN LATERAL JSONB_ARRAY_ELEMENTS_TEXT(defn->'unique_constraint'->'fields') WITH ORDINALITY ae(e, rn) ON TRUE LEFT JOIN LATERAL JSONB_ARRAY_ELEMENTS_TEXT(defn->'unique_constraint'->'fields') WITH ORDINALITY ae(e, rn) ON TRUE
GROUP BY GROUP BY
t.srce t.srce
,(ae.e::text[])[1] ,(ae.e::text[])[1]
ORDER BY
t.srce
,(ae.e::text[])[1]

File diff suppressed because it is too large Load Diff

View File

@ -14,6 +14,13 @@ SET check_function_bodies = false;
SET client_min_messages = warning; SET client_min_messages = warning;
SET row_security = off; SET row_security = off;
--
-- Name: bank; Type: SCHEMA; Schema: -; Owner: -
--
CREATE SCHEMA bank;
-- --
-- Name: evt; Type: SCHEMA; Schema: -; Owner: - -- Name: evt; Type: SCHEMA; Schema: -; Owner: -
-- --
@ -56,8 +63,56 @@ CREATE EXTENSION IF NOT EXISTS plpgsql WITH SCHEMA pg_catalog;
COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language'; COMMENT ON EXTENSION plpgsql IS 'PL/pgSQL procedural language';
SET search_path = bank, pg_catalog;
--
-- Name: pncc; Type: TYPE; Schema: bank; Owner: -
--
CREATE TYPE pncc AS (
"AsOfDate" date,
"BankId" text,
"AccountNumber" text,
"AccountName" text,
"BaiControl" text,
"Currency" text,
"Transaction" text,
"Reference" text,
"Amount" numeric,
"Description" text,
"AdditionalRemittance" text
);
SET search_path = tps, pg_catalog; SET search_path = tps, pg_catalog;
--
-- Name: dcard; Type: TYPE; Schema: tps; Owner: -
--
CREATE TYPE dcard AS (
"Trans. Date" date,
"Post Date" date,
"Description" text,
"Amount" numeric,
"Category" text
);
--
-- Name: hunt; Type: TYPE; Schema: tps; Owner: -
--
CREATE TYPE hunt AS (
"Date" date,
"Reference Number" numeric,
"Payee Name" text,
"Memo" text,
"Amount" text,
"Category Name" text
);
-- --
-- Name: srce_defn_schema; Type: TYPE; Schema: tps; Owner: - -- Name: srce_defn_schema; Type: TYPE; Schema: tps; Owner: -
-- --
@ -68,6 +123,58 @@ CREATE TYPE srce_defn_schema AS (
); );
SET search_path = public, pg_catalog;
--
-- Name: jsonb_extract(jsonb, text[]); Type: FUNCTION; Schema: public; Owner: -
--
-- Build a jsonb object from selected paths of rec.
--   rec      : the document to read from
--   key_list : array of json paths, one path (text[]) per slice of the first dimension
-- Returns one member per path, named after the path's first element and holding the
-- value found at that path (json null when the path does not exist).
-- A NULL or empty key_list returns '{}'.
CREATE FUNCTION jsonb_extract(rec jsonb, key_list text[]) RETURNS jsonb
    LANGUAGE plpgsql
    AS $$
DECLARE
    t text[];
    j jsonb := '{}'::jsonb;
BEGIN
    --FOREACH raises an error for a NULL array, and SLICE 1 is out of range for an
    --empty (zero-dimensional) array, so there is nothing to extract in either case
    IF key_list IS NULL OR cardinality(key_list) = 0 THEN
        RETURN j;
    END IF;
    FOREACH t SLICE 1 IN ARRAY key_list LOOP
        --RAISE NOTICE '%', t;
        --RAISE NOTICE '%', t[1];
        j := j || jsonb_build_object(t[1],rec#>t);
    END LOOP;
    RETURN j;
END;
$$;
SET search_path = tps, pg_catalog;
--
-- Name: jsonb_concat(jsonb, jsonb); Type: FUNCTION; Schema: tps; Owner: -
--
-- Concatenate two jsonb values with the || operator (state first, concat second).
CREATE FUNCTION jsonb_concat(state jsonb, concat jsonb) RETURNS jsonb
    LANGUAGE plpgsql
    AS $$
DECLARE
    merged jsonb;
BEGIN
    merged := state || concat;
    RETURN merged;
END;
$$;
--
-- Name: jsonb_concat_obj(jsonb); Type: AGGREGATE; Schema: tps; Owner: -
--
-- Aggregate over jsonb values: starts from the empty object '{}' and folds every
-- input into the running state with jsonb_concat.
CREATE AGGREGATE jsonb_concat_obj(jsonb) (
SFUNC = jsonb_concat,
STYPE = jsonb,
INITCOND = '{}'
);
SET search_path = evt, pg_catalog; SET search_path = evt, pg_catalog;
SET default_tablespace = ''; SET default_tablespace = '';
@ -100,6 +207,30 @@ ALTER TABLE log ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
SET search_path = tps, pg_catalog; SET search_path = tps, pg_catalog;
--
-- Name: map_rm; Type: TABLE; Schema: tps; Owner: -
--
CREATE TABLE map_rm (
srce text NOT NULL,
target text NOT NULL,
regex jsonb,
seq integer NOT NULL
);
--
-- Name: map_rv; Type: TABLE; Schema: tps; Owner: -
--
CREATE TABLE map_rv (
srce text NOT NULL,
target text NOT NULL,
retval jsonb NOT NULL,
map jsonb
);
-- --
-- Name: srce; Type: TABLE; Schema: tps; Owner: - -- Name: srce; Type: TABLE; Schema: tps; Owner: -
-- --
@ -118,7 +249,9 @@ CREATE TABLE trans (
id integer NOT NULL, id integer NOT NULL,
srce text, srce text,
rec jsonb, rec jsonb,
map jsonb parse jsonb,
map jsonb,
allj jsonb
); );
@ -136,6 +269,30 @@ ALTER TABLE trans ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
); );
--
-- Name: trans_log; Type: TABLE; Schema: tps; Owner: -
--
CREATE TABLE trans_log (
id integer NOT NULL,
info jsonb
);
--
-- Name: trans_log_id_seq; Type: SEQUENCE; Schema: tps; Owner: -
--
ALTER TABLE trans_log ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
SEQUENCE NAME trans_log_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1
);
SET search_path = evt, pg_catalog; SET search_path = evt, pg_catalog;
-- --
@ -148,6 +305,22 @@ ALTER TABLE ONLY log
SET search_path = tps, pg_catalog; SET search_path = tps, pg_catalog;
--
-- Name: map_rm map_rm_pk; Type: CONSTRAINT; Schema: tps; Owner: -
--
ALTER TABLE ONLY map_rm
ADD CONSTRAINT map_rm_pk PRIMARY KEY (srce, target);
--
-- Name: map_rv map_rv_pk; Type: CONSTRAINT; Schema: tps; Owner: -
--
ALTER TABLE ONLY map_rv
ADD CONSTRAINT map_rv_pk PRIMARY KEY (srce, target, retval);
-- --
-- Name: srce srce_pkey; Type: CONSTRAINT; Schema: tps; Owner: - -- Name: srce srce_pkey; Type: CONSTRAINT; Schema: tps; Owner: -
-- --
@ -156,6 +329,14 @@ ALTER TABLE ONLY srce
ADD CONSTRAINT srce_pkey PRIMARY KEY (srce); ADD CONSTRAINT srce_pkey PRIMARY KEY (srce);
--
-- Name: trans_log trans_log_pkey; Type: CONSTRAINT; Schema: tps; Owner: -
--
ALTER TABLE ONLY trans_log
ADD CONSTRAINT trans_log_pkey PRIMARY KEY (id);
-- --
-- Name: trans trans_pkey; Type: CONSTRAINT; Schema: tps; Owner: - -- Name: trans trans_pkey; Type: CONSTRAINT; Schema: tps; Owner: -
-- --
@ -164,6 +345,36 @@ ALTER TABLE ONLY trans
ADD CONSTRAINT trans_pkey PRIMARY KEY (id); ADD CONSTRAINT trans_pkey PRIMARY KEY (id);
--
-- Name: trans_allj; Type: INDEX; Schema: tps; Owner: -
--
CREATE INDEX trans_allj ON trans USING gin (allj);
--
-- Name: trans_rec; Type: INDEX; Schema: tps; Owner: -
--
CREATE INDEX trans_rec ON trans USING gin (rec);
--
-- Name: map_rm map_rm_fk_srce; Type: FK CONSTRAINT; Schema: tps; Owner: -
--
ALTER TABLE ONLY map_rm
ADD CONSTRAINT map_rm_fk_srce FOREIGN KEY (srce) REFERENCES srce(srce);
--
-- Name: map_rv map_rv_fk_rm; Type: FK CONSTRAINT; Schema: tps; Owner: -
--
ALTER TABLE ONLY map_rv
ADD CONSTRAINT map_rv_fk_rm FOREIGN KEY (srce, target) REFERENCES map_rm(srce, target);
-- --
-- Name: trans trans_srce_fkey; Type: FK CONSTRAINT; Schema: tps; Owner: - -- Name: trans trans_srce_fkey; Type: FK CONSTRAINT; Schema: tps; Owner: -
-- --