-- Changelog (this revision):
--   - Add import_id column to records (links each record to its import batch)
--   - import_records() now stores readable dedup field values (not hashes) in
--     info.inserted_keys / info.excluded_keys, and stamps import_id on insert
--   - delete_import() simplified to delete log row; ON DELETE CASCADE removes records
--   - Add get_import_log() and get_all_import_logs() DB functions
--
-- Dataflow Functions
--
-- Simple, clear functions for import and transformation
--

-- Everything below is created in the dataflow schema; public stays on the
-- path so extension types/functions still resolve.
SET search_path TO dataflow, public;
------------------------------------------------------
-- Function: import_records
-- Import data with automatic deduplication
------------------------------------------------------
-- Imports a JSONB array of records for a source, skipping any record whose
-- dedup key already exists for that source OR repeats within the same batch.
-- Writes one import_log row per call (with human-readable dedup field values
-- in info.inserted_keys / info.excluded_keys) and stamps import_id on every
-- inserted record. Returns {success, imported, duplicates, log_id}.
CREATE OR REPLACE FUNCTION import_records(
    p_source_name TEXT,
    p_data JSONB  -- Array of records
) RETURNS JSON AS $$
DECLARE
    v_dedup_fields TEXT[];
    v_inserted INTEGER;
    v_duplicates INTEGER;
    v_log_id INTEGER;
BEGIN
    -- Get dedup fields for this source
    SELECT dedup_fields INTO v_dedup_fields
    FROM dataflow.sources
    WHERE name = p_source_name;

    -- Distinguish "source missing" from "source has no dedup_fields configured"
    -- (previously both produced the misleading 'Source not found' error).
    IF NOT FOUND THEN
        RETURN json_build_object(
            'success', false,
            'error', 'Source not found: ' || p_source_name
        );
    END IF;

    IF v_dedup_fields IS NULL THEN
        RETURN json_build_object(
            'success', false,
            'error', 'No dedup fields configured for source: ' || p_source_name
        );
    END IF;

    WITH
    -- All incoming records with their dedup keys and readable field values
    pending AS (
        SELECT
            rec.value AS data,
            rec.ordinality AS seq,
            dataflow.generate_dedup_key(rec.value, v_dedup_fields) AS dedup_key,
            (SELECT jsonb_object_agg(f, rec.value->>f)
               FROM unnest(v_dedup_fields) AS f) AS dedup_values
        FROM jsonb_array_elements(p_data) WITH ORDINALITY AS rec
    ),
    -- First occurrence of each dedup key WITHIN the batch. Guards against
    -- intra-batch duplicates, which previously multiplied on the
    -- pending-to-new_keys join and were inserted more than once.
    batch_first AS (
        SELECT DISTINCT ON (dedup_key) data, seq, dedup_key, dedup_values
        FROM pending
        ORDER BY dedup_key, seq
    ),
    -- Keys already in the database (excluded) with their readable values
    existing AS (
        SELECT DISTINCT ON (r.dedup_key) r.dedup_key,
               (SELECT jsonb_object_agg(f, r.data->>f)
                  FROM unnest(v_dedup_fields) AS f) AS dedup_values
        FROM dataflow.records r
        INNER JOIN batch_first b ON b.dedup_key = r.dedup_key
        WHERE r.source_name = p_source_name
    ),
    -- Batch rows whose key is new to this source
    new_rows AS (
        SELECT b.data, b.seq, b.dedup_key, b.dedup_values
        FROM batch_first b
        WHERE NOT EXISTS (SELECT 1 FROM existing e WHERE e.dedup_key = b.dedup_key)
    ),
    -- Write the log entry with readable field values instead of hashes.
    -- records_duplicate counts both already-in-DB keys and intra-batch repeats,
    -- so imported + duplicates = info.total.
    log_entry AS (
        INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate, info)
        VALUES (
            p_source_name,
            (SELECT count(*) FROM new_rows),
            (SELECT count(*) FROM pending) - (SELECT count(*) FROM new_rows),
            jsonb_build_object(
                'total', jsonb_array_length(p_data),
                'inserted_keys', (SELECT jsonb_agg(dedup_values) FROM new_rows),
                'excluded_keys', (SELECT jsonb_agg(dedup_values) FROM existing)
            )
        )
        RETURNING id, records_imported, records_duplicate
    ),
    -- Insert only new records, stamped with the import batch id
    inserted AS (
        INSERT INTO dataflow.records (source_name, data, dedup_key, import_id)
        SELECT p_source_name, n.data, n.dedup_key, (SELECT id FROM log_entry)
        FROM new_rows n
        ORDER BY n.seq
        RETURNING id
    )
    SELECT le.id, le.records_imported, le.records_duplicate
      INTO v_log_id, v_inserted, v_duplicates
      FROM log_entry le;

    RETURN json_build_object(
        'success', true,
        'imported', v_inserted,
        'duplicates', v_duplicates,
        'log_id', v_log_id
    );
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION import_records IS 'Import records with automatic deduplication';
------------------------------------------------------
-- Function: get_import_log
-- Return import history for a source
------------------------------------------------------
-- Read-only lookup, newest first. Marked STABLE (it only reads table data
-- within one statement) so the planner may cache/inline it; the previous
-- default of VOLATILE blocked those optimizations.
CREATE OR REPLACE FUNCTION get_import_log(p_source_name TEXT)
RETURNS TABLE (
    id INTEGER,
    source_name TEXT,
    records_imported INTEGER,
    records_duplicate INTEGER,
    imported_at TIMESTAMPTZ,
    info JSONB
) AS $$
    -- Alias-qualify every column so references cannot collide with the
    -- identically-named OUT parameters of RETURNS TABLE.
    SELECT il.id, il.source_name, il.records_imported, il.records_duplicate,
           il.imported_at, il.info
    FROM dataflow.import_log AS il
    WHERE il.source_name = p_source_name
    ORDER BY il.imported_at DESC;
$$ LANGUAGE sql STABLE;

COMMENT ON FUNCTION get_import_log IS 'Return import history for a source, newest first, including inserted/excluded key lists';
------------------------------------------------------
-- Function: get_all_import_logs
-- Return import history across all sources
------------------------------------------------------
-- Read-only lookup, newest first. Marked STABLE (pure table read) instead of
-- the VOLATILE default, allowing the planner to optimize calls.
CREATE OR REPLACE FUNCTION get_all_import_logs()
RETURNS TABLE (
    id INTEGER,
    source_name TEXT,
    records_imported INTEGER,
    records_duplicate INTEGER,
    imported_at TIMESTAMPTZ,
    info JSONB
) AS $$
    -- Alias-qualify columns to avoid shadowing by the OUT parameters.
    SELECT il.id, il.source_name, il.records_imported, il.records_duplicate,
           il.imported_at, il.info
    FROM dataflow.import_log AS il
    ORDER BY il.imported_at DESC;
$$ LANGUAGE sql STABLE;

COMMENT ON FUNCTION get_all_import_logs IS 'Return import history across all sources, newest first';
------------------------------------------------------
-- Function: delete_import
-- Delete all records from a specific import and remove the log entry
------------------------------------------------------
-- Returns {success, records_deleted, log_id} or {success:false, error} when
-- the log id does not exist.
CREATE OR REPLACE FUNCTION delete_import(p_log_id INTEGER)
RETURNS JSON AS $$
DECLARE
    v_deleted INTEGER;
BEGIN
    IF NOT EXISTS (SELECT 1 FROM dataflow.import_log WHERE id = p_log_id) THEN
        RETURN json_build_object('success', false, 'error', 'Import log entry not found');
    END IF;

    -- Delete the batch's records explicitly and take the count from the
    -- DELETE itself. The previous SELECT count(*) followed by a cascading
    -- delete could report a stale number if rows changed in between.
    DELETE FROM dataflow.records WHERE import_id = p_log_id;
    GET DIAGNOSTICS v_deleted = ROW_COUNT;

    -- Remove the log row; FK ON DELETE CASCADE has nothing left to remove.
    DELETE FROM dataflow.import_log WHERE id = p_log_id;

    RETURN json_build_object(
        'success', true,
        'records_deleted', v_deleted,
        'log_id', p_log_id
    );
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION delete_import IS 'Delete all records belonging to an import batch and remove the log entry';
|
------------------------------------------------------
-- Aggregate: jsonb_concat_obj
-- Merge JSONB objects across rows (later rows win on key conflicts)
-- Usage: jsonb_concat_obj(col ORDER BY sequence)
------------------------------------------------------
-- Transition function: NULL-safe JSONB object concatenation. Either side may
-- be NULL; NULLs are treated as the empty object so no row is lost.
CREATE OR REPLACE FUNCTION dataflow.jsonb_merge(a JSONB, b JSONB)
RETURNS JSONB AS $$
    SELECT COALESCE(a, '{}'::jsonb) || COALESCE(b, '{}'::jsonb)
$$ LANGUAGE sql IMMUTABLE;

-- Recreate the aggregate from scratch (aggregates cannot be replaced in place
-- on older PostgreSQL versions).
DROP AGGREGATE IF EXISTS dataflow.jsonb_concat_obj(JSONB);
CREATE AGGREGATE dataflow.jsonb_concat_obj(JSONB) (
    sfunc    = dataflow.jsonb_merge,
    stype    = JSONB,
    initcond = '{}'
);
------------------------------------------------------
-- Function: apply_transformations
-- Apply all transformation rules to records (set-based)
------------------------------------------------------
-- Applies every enabled rule for a source to the selected records in one
-- set-based statement: regex extract/replace per rule, optional value mapping,
-- then merges all rule outputs into records.transformed (rule sequence order,
-- later rules win on key conflicts). Records with no matching rule still get
-- transformed = data, so they count as processed.
-- Returns JSON {success, transformed: <row count>}.
DROP FUNCTION IF EXISTS apply_transformations(TEXT, INTEGER[]);
CREATE OR REPLACE FUNCTION apply_transformations(
    p_source_name TEXT,
    p_record_ids INTEGER[] DEFAULT NULL, -- NULL = all eligible records
    p_overwrite BOOLEAN DEFAULT FALSE    -- FALSE = skip already-transformed, TRUE = overwrite all
) RETURNS JSON AS $$
WITH
-- All records to process
qualifying AS (
    SELECT id, data
    FROM dataflow.records
    WHERE source_name = p_source_name
      AND (p_overwrite OR transformed IS NULL)
      AND (p_record_ids IS NULL OR id = ANY(p_record_ids))
),
-- Mirror TPS rx: fan out one row per regex match, drive from rules → records.
-- Exactly one of the two LATERAL joins fires per rule (extract vs replace);
-- the other side's columns stay NULL.
rx AS (
    SELECT
        q.id,
        r.name AS rule_name,
        r.sequence,
        r.output_field,
        r.retain,
        r.function_type,
        COALESCE(mt.rn, rp.rn, 1) AS result_number,
        -- extract: a single capture group is unwrapped to a scalar, multiple
        -- captures stay an array (mirrors TPS)
        CASE WHEN array_length(mt.mt, 1) = 1 THEN to_jsonb(mt.mt[1]) ELSE to_jsonb(mt.mt) END AS match_val,
        to_jsonb(rp.rp) AS replace_val
    FROM dataflow.rules r
    -- only pair a rule with records that actually carry its input field
    INNER JOIN qualifying q ON q.data ? r.field
    LEFT JOIN LATERAL regexp_matches(q.data ->> r.field, r.pattern, r.flags)
        WITH ORDINALITY AS mt(mt, rn) ON r.function_type = 'extract'
    LEFT JOIN LATERAL regexp_replace(q.data ->> r.field, r.pattern, r.replace_value, r.flags)
        WITH ORDINALITY AS rp(rp, rn) ON r.function_type = 'replace'
    WHERE r.source_name = p_source_name
      AND r.enabled = true
),
-- Aggregate match rows back into one value per (record, rule) — mirrors TPS agg_to_target_items.
-- replace always yields one row, so take element 0; extract yields a scalar
-- when the pattern matched once and an array when it matched repeatedly
-- (e.g. with the 'g' flag).
agg_matches AS (
    SELECT
        id,
        rule_name,
        sequence,
        output_field,
        retain,
        function_type,
        CASE function_type
            WHEN 'replace' THEN jsonb_agg(replace_val) -> 0
            ELSE
                CASE WHEN max(result_number) = 1
                    THEN jsonb_agg(match_val ORDER BY result_number) -> 0
                    ELSE jsonb_agg(match_val ORDER BY result_number)
                END
        END AS extracted
    FROM rx
    GROUP BY id, rule_name, sequence, output_field, retain, function_type
),
-- Join with mappings to find mapped output — mirrors TPS link_map.
-- Rules whose regex produced nothing (extracted IS NULL) drop out here.
linked AS (
    SELECT
        a.id,
        a.sequence,
        a.output_field,
        a.retain,
        a.extracted,
        m.output AS mapped
    FROM agg_matches a
    LEFT JOIN dataflow.mappings m ON
        m.source_name = p_source_name
        AND m.rule_name = a.rule_name
        AND m.input_value = a.extracted
    WHERE a.extracted IS NOT NULL
),
-- Build per-rule output JSONB:
--   mapped → use mapping output; also write output_field if retain = true
--   no map → write extracted value to output_field
rule_output AS (
    SELECT
        id,
        sequence,
        CASE
            WHEN mapped IS NOT NULL THEN
                mapped ||
                CASE WHEN retain
                    THEN jsonb_build_object(output_field, extracted)
                    ELSE '{}'::jsonb
                END
            ELSE
                jsonb_build_object(output_field, extracted)
        END AS output
    FROM linked
),
-- Merge all rule outputs per record in sequence order — mirrors TPS agg_to_id.
-- jsonb_concat_obj is a custom aggregate: later sequence numbers overwrite
-- earlier ones on key conflicts.
record_additions AS (
    SELECT
        id,
        dataflow.jsonb_concat_obj(output ORDER BY sequence) AS additions
    FROM rule_output
    GROUP BY id
),
-- Update all qualifying records; records with no rule matches get transformed = data
updated AS (
    UPDATE dataflow.records rec
    SET transformed = rec.data || COALESCE(ra.additions, '{}'::jsonb),
        transformed_at = CURRENT_TIMESTAMP
    FROM qualifying q
    LEFT JOIN record_additions ra ON ra.id = q.id
    WHERE rec.id = q.id
    RETURNING rec.id
)
SELECT json_build_object('success', true, 'transformed', count(*))
FROM updated
$$ LANGUAGE sql;

COMMENT ON FUNCTION apply_transformations IS 'Apply transformation rules and mappings to records (set-based CTE)';
|
------------------------------------------------------
-- Function: get_all_values
-- All extracted values (mapped + unmapped) with counts and mapping output
------------------------------------------------------
-- For each (rule, extracted value) pair of a source: how many transformed
-- records carry that value, up to 5 sample source records, and the mapping
-- row (if one exists). Optionally restricted to a single rule.
-- NOTE(review): rules whose output_field collides with another rule's will be
-- attributed the same transformed value — assumes output_field is effectively
-- unique per rule; confirm against rule management.
DROP FUNCTION IF EXISTS get_all_values(TEXT, TEXT);
CREATE FUNCTION get_all_values(
    p_source_name TEXT,
    p_rule_name TEXT DEFAULT NULL  -- NULL = all rules of the source
) RETURNS TABLE (
    rule_name TEXT,
    output_field TEXT,
    source_field TEXT,
    extracted_value JSONB,
    record_count BIGINT,
    sample JSONB,
    mapping_id INTEGER,
    output JSONB,
    is_mapped BOOLEAN
) AS $$
BEGIN
    RETURN QUERY
    -- One row per (record, rule) where the record was transformed and carries
    -- both the rule's input field and its output field; rn numbers the rows
    -- inside each (rule, value) group so we can cap the sample at 5.
    WITH extracted AS (
        SELECT
            r.name AS rule_name,
            r.output_field,
            r.field AS source_field,
            rec.transformed->r.output_field AS extracted_value,
            rec.data AS record_data,
            row_number() OVER (
                PARTITION BY r.name, rec.transformed->r.output_field
                ORDER BY rec.id
            ) AS rn
        FROM dataflow.records rec
        CROSS JOIN dataflow.rules r
        WHERE
            rec.source_name = p_source_name
            AND r.source_name = p_source_name
            AND rec.transformed IS NOT NULL
            AND rec.transformed ? r.output_field
            AND (p_rule_name IS NULL OR r.name = p_rule_name)
            AND rec.data ? r.field
    ),
    -- Collapse to one row per (rule, value) with count and sample records
    aggregated AS (
        SELECT
            e.rule_name,
            e.output_field,
            e.source_field,
            e.extracted_value,
            count(*) AS record_count,
            -- FILTER keeps the sample to the first 5 records per value
            jsonb_agg(e.record_data ORDER BY e.rn) FILTER (WHERE e.rn <= 5) AS sample
        FROM extracted e
        GROUP BY e.rule_name, e.output_field, e.source_field, e.extracted_value
    )
    -- Attach the mapping row when one exists; is_mapped is derived from it
    SELECT
        a.rule_name,
        a.output_field,
        a.source_field,
        a.extracted_value,
        a.record_count,
        a.sample,
        m.id AS mapping_id,
        m.output,
        (m.id IS NOT NULL) AS is_mapped
    FROM aggregated a
    LEFT JOIN dataflow.mappings m ON
        m.source_name = p_source_name
        AND m.rule_name = a.rule_name
        AND m.input_value = a.extracted_value
    ORDER BY a.record_count DESC;
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION get_all_values IS 'All extracted values with record counts and mapping output (single query for All tab)';
|
------------------------------------------------------
-- Function: get_unmapped_values
-- Find extracted values that need mappings
------------------------------------------------------
-- Like get_all_values, but restricted to (rule, value) pairs that have no
-- mapping row yet; returns counts and up to 5 sample records per value,
-- most frequent values first.
DROP FUNCTION IF EXISTS get_unmapped_values(TEXT, TEXT);
CREATE FUNCTION get_unmapped_values(
    p_source_name TEXT,
    p_rule_name TEXT DEFAULT NULL
) RETURNS TABLE (
    rule_name TEXT,
    output_field TEXT,
    source_field TEXT,
    extracted_value JSONB,
    record_count BIGINT,
    sample JSONB
) AS $$
BEGIN
    RETURN QUERY
    -- One row per (record, rule) pairing where the record was transformed and
    -- carries both the rule's input and output fields; rn ranks rows within
    -- each (rule, value) group so the sample can be capped at 5.
    WITH candidate AS (
        SELECT
            ru.name AS rule_name,
            ru.output_field,
            ru.field AS source_field,
            rec.transformed -> ru.output_field AS extracted_value,
            rec.data AS record_data,
            row_number() OVER (
                PARTITION BY ru.name, rec.transformed -> ru.output_field
                ORDER BY rec.id
            ) AS rn
        FROM dataflow.records rec
        CROSS JOIN dataflow.rules ru
        WHERE rec.source_name = p_source_name
          AND ru.source_name = p_source_name
          AND rec.transformed IS NOT NULL
          AND rec.transformed ? ru.output_field
          AND (p_rule_name IS NULL OR ru.name = p_rule_name)
          AND rec.data ? ru.field
    )
    -- Keep only values with no mapping row, then aggregate per (rule, value)
    SELECT
        c.rule_name,
        c.output_field,
        c.source_field,
        c.extracted_value,
        count(*) AS record_count,
        jsonb_agg(c.record_data ORDER BY c.rn) FILTER (WHERE c.rn <= 5) AS sample
    FROM candidate c
    WHERE NOT EXISTS (
        SELECT 1
        FROM dataflow.mappings m
        WHERE m.source_name = p_source_name
          AND m.rule_name = c.rule_name
          AND m.input_value = c.extracted_value
    )
    GROUP BY c.rule_name, c.output_field, c.source_field, c.extracted_value
    ORDER BY count(*) DESC;
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION get_unmapped_values IS 'Find extracted values that need mappings defined';
|
------------------------------------------------------
-- Function: reprocess_records
-- Clear and reapply transformations
------------------------------------------------------
-- Thin wrapper: delegates to apply_transformations with overwrite enabled for
-- every record of the source. There is no separate clear step — overwriting
-- in place mirrors TPS srce_map_overwrite.
CREATE OR REPLACE FUNCTION reprocess_records(p_source_name TEXT)
RETURNS JSON AS $$
    SELECT dataflow.apply_transformations(p_source_name, NULL::INTEGER[], TRUE)
$$ LANGUAGE sql;

COMMENT ON FUNCTION reprocess_records IS 'Clear and reapply all transformations for a source';
|
------------------------------------------------------
-- Function: generate_source_view
-- Build a typed flat view in dfv schema
------------------------------------------------------
-- Reads config.fields of the source and (re)creates view dfv.<source_name>
-- exposing one typed column per field from records.transformed. Fields with
-- an 'expression' become computed columns; otherwise 'type' selects the cast
-- (date / numeric / text fallback).
-- Returns {success, view, sql} or {success:false, error} when no fields are
-- configured.
-- NOTE(review): field names are escaped via %I/quote_ident, but expression
-- text is interpolated into the DDL unescaped — config.fields must only ever
-- be written by trusted administrators.
CREATE OR REPLACE FUNCTION generate_source_view(p_source_name TEXT)
RETURNS JSON AS $$
DECLARE
    v_config JSONB;
    v_fields JSONB;
    v_field JSONB;
    v_cols TEXT := '';
    v_sql TEXT;
    v_view TEXT;
BEGIN
    SELECT config INTO v_config
    FROM dataflow.sources
    WHERE name = p_source_name;

    IF v_config IS NULL OR NOT (v_config ? 'fields') OR jsonb_array_length(v_config->'fields') = 0 THEN
        RETURN json_build_object('success', false, 'error', 'No schema fields defined for this source');
    END IF;

    v_fields := v_config->'fields';

    FOR v_field IN SELECT * FROM jsonb_array_elements(v_fields)
    LOOP
        IF v_cols != '' THEN v_cols := v_cols || ', '; END IF;

        IF v_field->>'expression' IS NOT NULL THEN
            -- Computed expression: substitute every {fieldname} ref with
            -- (transformed->>'fieldname')::numeric
            -- e.g. "{Amount} * {sign}" → "(transformed->>'Amount')::numeric * (transformed->>'sign')::numeric"
            -- All refs are cast to numeric; the field's own 'type' is not
            -- consulted here (the unused cast variable was removed).
            DECLARE
                v_expr TEXT := v_field->>'expression';
                v_ref TEXT;
            BEGIN
                WHILE v_expr ~ '\{[^}]+\}' LOOP
                    v_ref := substring(v_expr FROM '\{([^}]+)\}');
                    v_expr := replace(v_expr, '{' || v_ref || '}',
                                      format('(transformed->>%L)::numeric', v_ref));
                END LOOP;
                v_cols := v_cols || format('%s AS %I', v_expr, v_field->>'name');
            END;
        ELSE
            -- Plain field: cast per declared type; unknown types stay text
            CASE v_field->>'type'
                WHEN 'date' THEN
                    v_cols := v_cols || format('(transformed->>%L)::date AS %I',
                                               v_field->>'name', v_field->>'name');
                WHEN 'numeric' THEN
                    v_cols := v_cols || format('(transformed->>%L)::numeric AS %I',
                                               v_field->>'name', v_field->>'name');
                ELSE
                    v_cols := v_cols || format('transformed->>%L AS %I',
                                               v_field->>'name', v_field->>'name');
            END CASE;
        END IF;
    END LOOP;

    CREATE SCHEMA IF NOT EXISTS dfv;

    v_view := 'dfv.' || quote_ident(p_source_name);

    -- Drop first so column type/name changes never collide with CREATE VIEW
    EXECUTE format('DROP VIEW IF EXISTS %s', v_view);

    v_sql := format(
        'CREATE VIEW %s AS SELECT %s FROM dataflow.records WHERE source_name = %L AND transformed IS NOT NULL',
        v_view, v_cols, p_source_name
    );

    EXECUTE v_sql;

    RETURN json_build_object('success', true, 'view', v_view, 'sql', v_sql);
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION generate_source_view IS 'Generate a typed flat view in dfv schema from source config.fields';
|
------------------------------------------------------
-- Summary
------------------------------------------------------
-- Core functions defined in this file:
-- 1. import_records        - Import with deduplication, one log entry per batch
-- 2. apply_transformations - Apply rules and mappings (set-based CTE)
-- 3. get_all_values / get_unmapped_values - Inspect extracted values and their mappings
-- 4. reprocess_records     - Re-transform all records for a source
-- 5. get_import_log / get_all_import_logs / delete_import - Import history management
-- 6. generate_source_view  - Typed flat view in the dfv schema
--
-- Each function does ONE thing clearly
-- Easy to understand and debug
------------------------------------------------------