finish out logging the inserts and getting them into the main do block
This commit is contained in:
		
							parent
							
								
									e2d42467af
								
							
						
					
					
						commit
						ed1a903718
					
				
							
								
								
									
										153
									
								
								srce.pgsql
									
									
									
									
									
								
							
							
						
						
									
										153
									
								
								srce.pgsql
									
									
									
									
									
								
							@ -6,8 +6,7 @@
 | 
			
		||||
2. get unqiue pending keys
 | 
			
		||||
3. see which keys not already in tps.trans
 | 
			
		||||
4. insert pending records associated with keys that are not already in trans
 | 
			
		||||
5. get list of recors not inserted
 | 
			
		||||
6. summarize records not inserted
 | 
			
		||||
5. insert summary to log table
 | 
			
		||||
*/---------------------------------------------------------
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -15,9 +14,14 @@ DO $$
 | 
			
		||||
 | 
			
		||||
DECLARE _t text;
 | 
			
		||||
DECLARE _c text;
 | 
			
		||||
DECLARE _path text;
 | 
			
		||||
DECLARE _srce text;
 | 
			
		||||
 | 
			
		||||
BEGIN
 | 
			
		||||
 | 
			
		||||
    _path := 'C:\users\fleet\downloads\dc1024.csv';
 | 
			
		||||
    _srce := 'DCARD';
 | 
			
		||||
	
 | 
			
		||||
----------------------------------------------------build the column list of the temp table----------------------------------------------------------------
 | 
			
		||||
 | 
			
		||||
	SELECT
 | 
			
		||||
@ -27,11 +31,11 @@ BEGIN
 | 
			
		||||
    	_t, 
 | 
			
		||||
        _c
 | 
			
		||||
    FROM 
 | 
			
		||||
        TPS.srce
 | 
			
		||||
        tps.srce
 | 
			
		||||
        --unwrap the schema definition array
 | 
			
		||||
        LEFT JOIN LATERAL jsonb_populate_recordset(null::tps.srce_defn_schema, defn->'schema') prs ON TRUE
 | 
			
		||||
    WHERE   
 | 
			
		||||
        srce = 'HUNT'
 | 
			
		||||
        srce = _srce
 | 
			
		||||
    GROUP BY
 | 
			
		||||
        srce;
 | 
			
		||||
        
 | 
			
		||||
@ -48,27 +52,35 @@ BEGIN
 | 
			
		||||
----------------------------------------------------do the insert-------------------------------------------------------------------------------------------
 | 
			
		||||
 | 
			
		||||
    --the column list needs to be dynamic forcing this whole line to be dynamic
 | 
			
		||||
    _t := format('COPY csv_i (%s) FROM ''C:\Users\fleet\downloads\hunt.csv'' WITH (HEADER TRUE,DELIMITER '','', FORMAT CSV, ENCODING ''SQL_ASCII'',QUOTE ''"'');',_c);
 | 
			
		||||
    _t := format('COPY csv_i (%s) FROM %L WITH (HEADER TRUE,DELIMITER '','', FORMAT CSV, ENCODING ''SQL_ASCII'',QUOTE ''"'');',_c,_path);
 | 
			
		||||
 | 
			
		||||
    --RAISE NOTICE '%', _t;
 | 
			
		||||
 | 
			
		||||
    EXECUTE _t;
 | 
			
		||||
 | 
			
		||||
    WITH 
 | 
			
		||||
 | 
			
		||||
END
 | 
			
		||||
$$;
 | 
			
		||||
    -------------extract the limiter fields to one row per source----------------------------------
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
WITH 
 | 
			
		||||
 | 
			
		||||
-------------for each imported row in the COPY table, genereate the json rec, and a column for the json key specified in the srce.defn-----------
 | 
			
		||||
 | 
			
		||||
pending_list AS (
 | 
			
		||||
    ext AS (
 | 
			
		||||
    SELECT 
 | 
			
		||||
        ---creates a key value pair and then aggregates rows of key value pairs
 | 
			
		||||
        jsonb_object_agg(
 | 
			
		||||
                (ae.e::text[])[1],                                  --the key name
 | 
			
		||||
                (row_to_json(i)::jsonb) #> ae.e::text[]             --get the target value from the key from the csv row that has been converted to json
 | 
			
		||||
        srce
 | 
			
		||||
        ,defn->'unique_constraint'->>'fields'
 | 
			
		||||
        ,ARRAY(SELECT ae.e::text[] FROM jsonb_array_elements_text(defn->'unique_constraint'->'fields') ae(e)) text_array
 | 
			
		||||
    FROM
 | 
			
		||||
        tps.srce
 | 
			
		||||
    WHERE
 | 
			
		||||
        srce = _srce
 | 
			
		||||
        --add where clause for targeted source
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    -------------for each imported row in the COPY table, genereate the json rec, and a column for the json key specified in the srce.defn-----------
 | 
			
		||||
 | 
			
		||||
    ,pending_list AS (
 | 
			
		||||
        SELECT
 | 
			
		||||
            jsonb_extract(
 | 
			
		||||
                    row_to_json(i)::jsonb
 | 
			
		||||
                    ,ext.text_array
 | 
			
		||||
            ) json_key,
 | 
			
		||||
            row_to_json(i)::JSONB rec,
 | 
			
		||||
            srce,
 | 
			
		||||
@ -76,47 +88,52 @@ pending_list AS (
 | 
			
		||||
            id
 | 
			
		||||
        FROM
 | 
			
		||||
            csv_i i
 | 
			
		||||
        INNER JOIN tps.srce s ON
 | 
			
		||||
            s.srce = 'HUNT'
 | 
			
		||||
        LEFT JOIN LATERAL JSONB_ARRAY_ELEMENTS_TEXT(defn->'unique_constraint'->'fields') WITH ORDINALITY ae(e, rn) ON TRUE
 | 
			
		||||
    GROUP BY
 | 
			
		||||
        i.*,
 | 
			
		||||
        srce,
 | 
			
		||||
        id
 | 
			
		||||
            INNER JOIN ext ON
 | 
			
		||||
                ext.srce = _srce
 | 
			
		||||
        ORDER BY    
 | 
			
		||||
            id ASC
 | 
			
		||||
)
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
-----------create a unique list of keys from staged rows------------------------------------------------------------------------------------------
 | 
			
		||||
    -----------create a unique list of keys from staged rows------------------------------------------------------------------------------------------
 | 
			
		||||
 | 
			
		||||
, pending_keys AS (
 | 
			
		||||
    , pending_keys AS (
 | 
			
		||||
        SELECT DISTINCT
 | 
			
		||||
            json_key
 | 
			
		||||
        FROM 
 | 
			
		||||
            pending_list
 | 
			
		||||
)
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
-----------return unique keys that are not already in tps.trans-----------------------------------------------------------------------------------
 | 
			
		||||
    -----------list of keys already loaded to tps-----------------------------------------------------------------------------------------------------
 | 
			
		||||
 | 
			
		||||
, unmatched_keys AS (
 | 
			
		||||
SELECT
 | 
			
		||||
    json_key
 | 
			
		||||
FROM
 | 
			
		||||
    pending_keys
 | 
			
		||||
 | 
			
		||||
EXCEPT
 | 
			
		||||
 | 
			
		||||
SELECT DISTINCT
 | 
			
		||||
    , matched_keys AS (
 | 
			
		||||
        SELECT DISTINCT
 | 
			
		||||
            k.json_key
 | 
			
		||||
FROM
 | 
			
		||||
        FROM
 | 
			
		||||
            pending_keys k
 | 
			
		||||
            INNER JOIN tps.trans t ON
 | 
			
		||||
                t.rec @> k.json_key
 | 
			
		||||
)
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
-----------insert pending rows that have key with no trans match-----------------------------------------------------------------------------------
 | 
			
		||||
    -----------return unique keys that are not already in tps.trans-----------------------------------------------------------------------------------
 | 
			
		||||
 | 
			
		||||
, inserted AS (
 | 
			
		||||
    , unmatched_keys AS (
 | 
			
		||||
    SELECT
 | 
			
		||||
        json_key
 | 
			
		||||
    FROM
 | 
			
		||||
        pending_keys
 | 
			
		||||
 | 
			
		||||
    EXCEPT
 | 
			
		||||
 | 
			
		||||
    SELECT
 | 
			
		||||
        json_key
 | 
			
		||||
    FROM
 | 
			
		||||
        matched_keys
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    -----------insert pending rows that have key with no trans match-----------------------------------------------------------------------------------
 | 
			
		||||
    --need to look into mapping the transactions prior to loading
 | 
			
		||||
 | 
			
		||||
    , inserted AS (
 | 
			
		||||
        INSERT INTO
 | 
			
		||||
            tps.trans (srce, rec)
 | 
			
		||||
        SELECT
 | 
			
		||||
@ -131,39 +148,33 @@ FROM
 | 
			
		||||
        ----this conflict is only if an exact duplicate rec json happens, which will be rejected
 | 
			
		||||
        ----therefore, records may not be inserted due to ay matches with certain json fields, or if the entire json is a duplicate, reason is not specified
 | 
			
		||||
        RETURNING *
 | 
			
		||||
)
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
-----------list of records not inserted--------------------------------------------------------------------------------------------------------------
 | 
			
		||||
    --------summarize records not inserted-------------------+------------------------------------------------------------------------------------------------
 | 
			
		||||
 | 
			
		||||
, not_inserted AS (
 | 
			
		||||
    INSERT INTO
 | 
			
		||||
        tps.trans_log (info)
 | 
			
		||||
    SELECT
 | 
			
		||||
        srce
 | 
			
		||||
        ,rec
 | 
			
		||||
    FROM
 | 
			
		||||
        pending_list
 | 
			
		||||
 | 
			
		||||
    EXCEPT ALL
 | 
			
		||||
 | 
			
		||||
        JSONB_BUILD_OBJECT('time_stamp',CURRENT_TIMESTAMP)
 | 
			
		||||
        ||JSONB_BUILD_OBJECT('srce',_srce)
 | 
			
		||||
        ||JSONB_BUILD_OBJECT('path',_path)
 | 
			
		||||
        ||JSONB_BUILD_OBJECT('not_inserted',
 | 
			
		||||
            (
 | 
			
		||||
                SELECT 
 | 
			
		||||
        srce
 | 
			
		||||
        ,rec
 | 
			
		||||
                    jsonb_agg(json_key)
 | 
			
		||||
                FROM
 | 
			
		||||
        inserted
 | 
			
		||||
)
 | 
			
		||||
                    matched_keys
 | 
			
		||||
            )
 | 
			
		||||
        )
 | 
			
		||||
        ||JSONB_BUILD_OBJECT('inserted',
 | 
			
		||||
            (
 | 
			
		||||
                SELECT 
 | 
			
		||||
                    jsonb_agg(json_key)
 | 
			
		||||
                FROM
 | 
			
		||||
                    unmatched_keys
 | 
			
		||||
            )
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
--------summarize records not inserted-------------------+------------------------------------------------------------------------------------------------
 | 
			
		||||
END
 | 
			
		||||
$$;
 | 
			
		||||
 | 
			
		||||
SELECT
 | 
			
		||||
    t.srce
 | 
			
		||||
    ,(ae.e::text[])[1] unq_constr
 | 
			
		||||
    ,MIN(rec #>> ae.e::text[]) min_text
 | 
			
		||||
    ,MAX(rec #>> ae.e::text[]) max_text
 | 
			
		||||
    ,JSONB_PRETTY(JSON_AGG(rec #> ae.e::text[] ORDER BY rec #>> ae.e::text[])::JSONB)
 | 
			
		||||
FROM
 | 
			
		||||
    not_inserted t
 | 
			
		||||
    INNER JOIN tps.srce s ON
 | 
			
		||||
        s.srce = t.srce
 | 
			
		||||
    LEFT JOIN LATERAL JSONB_ARRAY_ELEMENTS_TEXT(defn->'unique_constraint'->'fields') WITH ORDINALITY ae(e, rn) ON TRUE
 | 
			
		||||
GROUP BY
 | 
			
		||||
    t.srce
 | 
			
		||||
    ,(ae.e::text[])[1];
 | 
			
		||||
@ -7,11 +7,13 @@ SELECT
 | 
			
		||||
FROM
 | 
			
		||||
    tps.srce
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
SELECT
 | 
			
		||||
    srce
 | 
			
		||||
    ,
 | 
			
		||||
    public.jsonb_extract(rec,txa)
 | 
			
		||||
    t.srce
 | 
			
		||||
    ,jsonb_pretty(t.rec)
 | 
			
		||||
    ,jsonb_pretty(public.jsonb_extract(rec,txa))
 | 
			
		||||
FROM
 | 
			
		||||
    tps.trans
 | 
			
		||||
    tps.trans t
 | 
			
		||||
    INNER JOIN ext ON
 | 
			
		||||
        trans.srce = ext.srce
 | 
			
		||||
        t.srce = ext.srce
 | 
			
		||||
		Loading…
	
		Reference in New Issue
	
	Block a user