-- dataflow/database/functions.sql
-- Paul Trowbridge dcac6def87 Unify mappings UI around single SQL query with full UX improvements
-- - Add get_all_values() SQL function returning all extracted values (mapped
--   + unmapped) with real record counts and mapping output in one query
-- - Add /mappings/source/:source/all-values API endpoint
-- - Rewrite All tab to use get_all_values directly instead of merging two
--   separate API calls; counts now populated for all rows
-- - Rewrite export.tsv to use get_all_values (real counts for mapped rows)
-- - Fix save bug where editing one output field blanked unedited fields by
--   merging drafts over existing mapping output instead of replacing
-- - Add dirty row highlighting (blue tint) and per-rule Save All button
-- - Fix sort instability during editing by sorting on committed values only
--
-- Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
-- 2026-04-05 09:58:09 -04:00
--
-- 443 lines
-- 15 KiB
-- PL/PgSQL

--
-- Dataflow Functions
-- Simple, clear functions for import and transformation
--
SET search_path TO dataflow, public;
------------------------------------------------------
-- Function: import_records
-- Import a JSONB array of records with automatic deduplication
--
-- Returns JSON: {success, imported, duplicates, log_id} on success,
-- or {success: false, error} when the source or payload is invalid.
------------------------------------------------------
CREATE OR REPLACE FUNCTION import_records(
p_source_name TEXT,
p_data JSONB -- Array of records
) RETURNS JSON AS $$
DECLARE
v_dedup_fields TEXT[];
v_record JSONB;
v_dedup_key TEXT;
v_inserted INTEGER := 0;
v_duplicates INTEGER := 0;
v_log_id INTEGER;
BEGIN
-- Get dedup fields for this source
SELECT dedup_fields INTO v_dedup_fields
FROM dataflow.sources
WHERE name = p_source_name;
-- FOUND distinguishes a missing source from a source whose dedup_fields
-- column is NULL (previously both cases reported "Source not found")
IF NOT FOUND THEN
RETURN json_build_object(
'success', false,
'error', 'Source not found: ' || p_source_name
);
END IF;
IF v_dedup_fields IS NULL THEN
RETURN json_build_object(
'success', false,
'error', 'No dedup fields configured for source: ' || p_source_name
);
END IF;
-- Reject non-array payloads up front instead of letting
-- jsonb_array_elements raise an opaque runtime error
IF p_data IS NULL OR jsonb_typeof(p_data) <> 'array' THEN
RETURN json_build_object(
'success', false,
'error', 'p_data must be a JSON array of records'
);
END IF;
-- Process each record
FOR v_record IN SELECT * FROM jsonb_array_elements(p_data)
LOOP
-- Generate dedup key
v_dedup_key := dataflow.generate_dedup_key(v_record, v_dedup_fields);
-- ON CONFLICT DO NOTHING skips duplicates without the per-row
-- subtransaction cost of a BEGIN/EXCEPTION block; FOUND is true
-- only when a row was actually inserted
INSERT INTO dataflow.records (source_name, data, dedup_key)
VALUES (p_source_name, v_record, v_dedup_key)
ON CONFLICT DO NOTHING;
IF FOUND THEN
v_inserted := v_inserted + 1;
ELSE
v_duplicates := v_duplicates + 1;
END IF;
END LOOP;
-- Log the import
INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate)
VALUES (p_source_name, v_inserted, v_duplicates)
RETURNING id INTO v_log_id;
RETURN json_build_object(
'success', true,
'imported', v_inserted,
'duplicates', v_duplicates,
'log_id', v_log_id
);
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION import_records IS 'Import records with automatic deduplication';
------------------------------------------------------
-- Aggregate: jsonb_concat_obj
-- Merge JSONB objects across rows (later rows win on key conflicts)
-- Usage: jsonb_concat_obj(col ORDER BY sequence)
------------------------------------------------------
-- State-transition function for the aggregate: "||" merges two objects
-- with the right-hand side winning on key conflicts; COALESCE keeps a
-- NULL input from nulling out the accumulated state.
CREATE OR REPLACE FUNCTION dataflow.jsonb_merge(a JSONB, b JSONB)
RETURNS JSONB AS $$
SELECT COALESCE(a, '{}') || COALESCE(b, '{}')
$$ LANGUAGE sql IMMUTABLE;
-- Drop before create: CREATE OR REPLACE AGGREGATE requires PostgreSQL 12+,
-- so this form stays compatible with older servers.
DROP AGGREGATE IF EXISTS dataflow.jsonb_concat_obj(JSONB);
CREATE AGGREGATE dataflow.jsonb_concat_obj(JSONB) (
sfunc = dataflow.jsonb_merge,
stype = JSONB,
initcond = '{}' -- start from an empty object
);
------------------------------------------------------
-- Function: apply_transformations
-- Apply all transformation rules to records (set-based)
------------------------------------------------------
-- Remove the old two-argument overload so callers that omit p_overwrite
-- resolve unambiguously to the three-argument version below.
DROP FUNCTION IF EXISTS apply_transformations(TEXT, INTEGER[]);
CREATE OR REPLACE FUNCTION apply_transformations(
p_source_name TEXT,
p_record_ids INTEGER[] DEFAULT NULL, -- NULL = all eligible records
p_overwrite BOOLEAN DEFAULT FALSE -- FALSE = skip already-transformed, TRUE = overwrite all
) RETURNS JSON AS $$
-- Single set-based statement; each CTE below is one pipeline stage.
-- Returns {"success": true, "transformed": <updated row count>}.
WITH
-- All records to process
qualifying AS (
SELECT id, data
FROM dataflow.records
WHERE source_name = p_source_name
AND (p_overwrite OR transformed IS NULL)
AND (p_record_ids IS NULL OR id = ANY(p_record_ids))
),
-- Mirror TPS rx: fan out one row per regex match, drive from rules → records
-- LEFT JOIN LATERAL keeps a (record, rule) row even when the regex finds
-- no match (mt/rp come back NULL; COALESCE defaults result_number to 1).
rx AS (
SELECT
q.id,
r.name AS rule_name,
r.sequence,
r.output_field,
r.retain,
r.function_type,
COALESCE(mt.rn, rp.rn, 1) AS result_number,
-- extract: build map_val and retain_val per match (mirrors TPS)
-- a single capture group becomes a scalar; multiple groups stay an array
CASE WHEN array_length(mt.mt, 1) = 1 THEN to_jsonb(mt.mt[1]) ELSE to_jsonb(mt.mt) END AS match_val,
to_jsonb(rp.rp) AS replace_val
FROM dataflow.rules r
INNER JOIN qualifying q ON q.data ? r.field
LEFT JOIN LATERAL regexp_matches(q.data ->> r.field, r.pattern, r.flags)
WITH ORDINALITY AS mt(mt, rn) ON r.function_type = 'extract'
LEFT JOIN LATERAL regexp_replace(q.data ->> r.field, r.pattern, r.replace_value, r.flags)
WITH ORDINALITY AS rp(rp, rn) ON r.function_type = 'replace'
WHERE r.source_name = p_source_name
AND r.enabled = true
),
-- Aggregate match rows back into one value per (record, rule) — mirrors TPS agg_to_target_items
-- "-> 0" unwraps a one-element jsonb array back to its scalar element.
-- NOTE(review): an 'extract' rule with zero matches still yields a row
-- here (match_val = SQL NULL); jsonb_agg(NULL) -> 0 produces JSON null,
-- which is NOT SQL NULL, so it survives the "extracted IS NOT NULL"
-- filter in "linked" and writes a null output_field — confirm intended.
agg_matches AS (
SELECT
id,
rule_name,
sequence,
output_field,
retain,
function_type,
CASE function_type
WHEN 'replace' THEN jsonb_agg(replace_val) -> 0
ELSE
CASE WHEN max(result_number) = 1
THEN jsonb_agg(match_val ORDER BY result_number) -> 0
ELSE jsonb_agg(match_val ORDER BY result_number)
END
END AS extracted
FROM rx
GROUP BY id, rule_name, sequence, output_field, retain, function_type
),
-- Join with mappings to find mapped output — mirrors TPS link_map
linked AS (
SELECT
a.id,
a.sequence,
a.output_field,
a.retain,
a.extracted,
m.output AS mapped
FROM agg_matches a
LEFT JOIN dataflow.mappings m ON
m.source_name = p_source_name
AND m.rule_name = a.rule_name
AND m.input_value = a.extracted
WHERE a.extracted IS NOT NULL
),
-- Build per-rule output JSONB:
-- mapped → use mapping output; also write output_field if retain = true
-- no map → write extracted value to output_field
rule_output AS (
SELECT
id,
sequence,
CASE
WHEN mapped IS NOT NULL THEN
mapped ||
CASE WHEN retain
THEN jsonb_build_object(output_field, extracted)
ELSE '{}'::jsonb
END
ELSE
jsonb_build_object(output_field, extracted)
END AS output
FROM linked
),
-- Merge all rule outputs per record in sequence order — mirrors TPS agg_to_id
-- (jsonb_concat_obj folds with "||", so higher sequence wins on key conflicts)
record_additions AS (
SELECT
id,
dataflow.jsonb_concat_obj(output ORDER BY sequence) AS additions
FROM rule_output
GROUP BY id
),
-- Update all qualifying records; records with no rule matches get transformed = data
-- (the LEFT JOIN makes additions NULL for them, COALESCEd to an empty object)
updated AS (
UPDATE dataflow.records rec
SET transformed = rec.data || COALESCE(ra.additions, '{}'::jsonb),
transformed_at = CURRENT_TIMESTAMP
FROM qualifying q
LEFT JOIN record_additions ra ON ra.id = q.id
WHERE rec.id = q.id
RETURNING rec.id
)
SELECT json_build_object('success', true, 'transformed', count(*))
FROM updated
$$ LANGUAGE sql;
COMMENT ON FUNCTION apply_transformations IS 'Apply transformation rules and mappings to records (set-based CTE)';
------------------------------------------------------
-- Function: get_all_values
-- All extracted values (mapped + unmapped) with counts and mapping output
------------------------------------------------------
DROP FUNCTION IF EXISTS get_all_values(TEXT, TEXT);
CREATE FUNCTION get_all_values(
p_source_name TEXT,
p_rule_name TEXT DEFAULT NULL
) RETURNS TABLE (
rule_name TEXT,
output_field TEXT,
source_field TEXT,
extracted_value JSONB,
record_count BIGINT,
sample JSONB,
mapping_id INTEGER,
output JSONB,
is_mapped BOOLEAN
) AS $$
BEGIN
RETURN QUERY
-- One row per (record, rule) where the rule wrote an output value;
-- rn numbers records within each distinct (rule, value) pair so the
-- first few can be kept as samples.
WITH per_record AS (
SELECT
r.name AS rule_name,
r.output_field,
r.field AS source_field,
rec.transformed -> r.output_field AS extracted_value,
rec.data AS record_data,
row_number() OVER (
PARTITION BY r.name, rec.transformed -> r.output_field
ORDER BY rec.id
) AS rn
FROM dataflow.records rec
INNER JOIN dataflow.rules r
ON r.source_name = p_source_name
AND rec.data ? r.field
AND rec.transformed ? r.output_field
WHERE rec.source_name = p_source_name
AND rec.transformed IS NOT NULL
AND (p_rule_name IS NULL OR r.name = p_rule_name)
),
-- Collapse to one row per distinct extracted value; keep up to 5 raw
-- source records as a sample for the UI.
grouped AS (
SELECT
p.rule_name,
p.output_field,
p.source_field,
p.extracted_value,
count(*) AS record_count,
jsonb_agg(p.record_data ORDER BY p.rn) FILTER (WHERE p.rn <= 5) AS sample
FROM per_record p
GROUP BY p.rule_name, p.output_field, p.source_field, p.extracted_value
)
-- Attach the mapping row when one exists; is_mapped is simply whether
-- the LEFT JOIN found a match.
SELECT
g.rule_name,
g.output_field,
g.source_field,
g.extracted_value,
g.record_count,
g.sample,
m.id AS mapping_id,
m.output,
(m.id IS NOT NULL) AS is_mapped
FROM grouped g
LEFT JOIN dataflow.mappings m
ON m.source_name = p_source_name
AND m.rule_name = g.rule_name
AND m.input_value = g.extracted_value
ORDER BY g.record_count DESC;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION get_all_values IS 'All extracted values with record counts and mapping output (single query for All tab)';
------------------------------------------------------
-- Function: get_unmapped_values
-- Find extracted values that need mappings
------------------------------------------------------
DROP FUNCTION IF EXISTS get_unmapped_values(TEXT, TEXT);
CREATE FUNCTION get_unmapped_values(
p_source_name TEXT,
p_rule_name TEXT DEFAULT NULL -- NULL = all rules for the source
) RETURNS TABLE (
rule_name TEXT,
output_field TEXT,
source_field TEXT,
extracted_value JSONB,
record_count BIGINT,
sample JSONB -- up to 5 raw source records containing this value
) AS $$
BEGIN
RETURN QUERY
-- One row per (record, rule) pair that produced an output value;
-- rn numbers records within each distinct (rule, value) for sampling.
WITH extracted AS (
SELECT
r.name AS rule_name,
r.output_field,
r.field AS source_field,
rec.transformed->r.output_field AS extracted_value,
rec.data AS record_data,
row_number() OVER (
PARTITION BY r.name, rec.transformed->r.output_field
ORDER BY rec.id
) AS rn
FROM
dataflow.records rec
CROSS JOIN dataflow.rules r
WHERE
rec.source_name = p_source_name
AND r.source_name = p_source_name
AND rec.transformed IS NOT NULL
AND rec.transformed ? r.output_field
AND (p_rule_name IS NULL OR r.name = p_rule_name)
AND rec.data ? r.field
)
SELECT
e.rule_name,
e.output_field,
e.source_field,
e.extracted_value,
count(*) AS record_count,
jsonb_agg(e.record_data ORDER BY e.rn) FILTER (WHERE e.rn <= 5) AS sample
FROM extracted e
-- NOT EXISTS (rather than NOT IN) is NULL-safe against mapping rows
WHERE NOT EXISTS (
SELECT 1 FROM dataflow.mappings m
WHERE m.source_name = p_source_name
AND m.rule_name = e.rule_name
AND m.input_value = e.extracted_value
)
GROUP BY e.rule_name, e.output_field, e.source_field, e.extracted_value
ORDER BY count(*) DESC;
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION get_unmapped_values IS 'Find extracted values that need mappings defined';
------------------------------------------------------
-- Function: reprocess_records
-- Reapply all transformations, overwriting previous results in place
------------------------------------------------------
-- Convenience wrapper used by the UI to re-transform every record of a source.
CREATE OR REPLACE FUNCTION reprocess_records(p_source_name TEXT)
RETURNS JSON AS $$
-- Overwrite all records directly — no clear step, mirrors TPS srce_map_overwrite
SELECT dataflow.apply_transformations(p_source_name, NULL, TRUE)
$$ LANGUAGE sql;
-- Comment text fixed: the previous wording ('Clear and reapply') contradicted
-- the implementation, which overwrites in place without a clear step.
COMMENT ON FUNCTION reprocess_records IS 'Reapply all transformations for a source, overwriting existing results (no separate clear step)';
------------------------------------------------------
-- Function: generate_source_view
-- Build a typed flat view in dfv schema
--
-- Reads sources.config->'fields' (array of {name, type, expression?})
-- and (re)creates view dfv.<source_name> exposing each field as a typed
-- column over records.transformed.
-- Returns {success, view, sql} or {success: false, error}.
------------------------------------------------------
CREATE OR REPLACE FUNCTION generate_source_view(p_source_name TEXT)
RETURNS JSON AS $$
DECLARE
v_config JSONB;
v_fields JSONB;
v_field JSONB;
v_cols TEXT := '';
v_sql TEXT;
v_view TEXT;
BEGIN
SELECT config INTO v_config
FROM dataflow.sources
WHERE name = p_source_name;
IF v_config IS NULL OR NOT (v_config ? 'fields') OR jsonb_array_length(v_config->'fields') = 0 THEN
RETURN json_build_object('success', false, 'error', 'No schema fields defined for this source');
END IF;
v_fields := v_config->'fields';
FOR v_field IN SELECT * FROM jsonb_array_elements(v_fields)
LOOP
IF v_cols != '' THEN v_cols := v_cols || ', '; END IF;
IF v_field->>'expression' IS NOT NULL THEN
-- Computed expression: substitute {fieldname} refs with (transformed->>'fieldname')::numeric
-- e.g. "{Amount} * {sign}" → "(transformed->>'Amount')::numeric * (transformed->>'sign')::numeric"
-- NOTE(review): refs are always cast to numeric; the field's declared
-- 'type' is not honored for expression columns (the previous v_cast
-- local was never used and has been removed) — extend here if
-- non-numeric expressions are ever needed.
-- NOTE(review): the expression text itself is interpolated unescaped
-- into the CREATE VIEW statement; config must only be writable by
-- trusted admins.
DECLARE
v_expr TEXT := v_field->>'expression';
v_ref TEXT;
BEGIN
WHILE v_expr ~ '\{[^}]+\}' LOOP
v_ref := substring(v_expr FROM '\{([^}]+)\}');
-- %L quotes the field name as a literal; replace() removes every
-- occurrence of this ref, so the loop always terminates.
v_expr := replace(v_expr, '{' || v_ref || '}',
format('(transformed->>%L)::numeric', v_ref));
END LOOP;
v_cols := v_cols || format('%s AS %I', v_expr, v_field->>'name');
END;
ELSE
-- Plain field: cast per declared type; %L/%I guard the interpolated
-- names against injection and odd identifiers.
CASE v_field->>'type'
WHEN 'date' THEN
v_cols := v_cols || format('(transformed->>%L)::date AS %I',
v_field->>'name', v_field->>'name');
WHEN 'numeric' THEN
v_cols := v_cols || format('(transformed->>%L)::numeric AS %I',
v_field->>'name', v_field->>'name');
ELSE
-- Unknown/absent type: expose as text (the ->> default)
v_cols := v_cols || format('transformed->>%L AS %I',
v_field->>'name', v_field->>'name');
END CASE;
END IF;
END LOOP;
CREATE SCHEMA IF NOT EXISTS dfv;
v_view := 'dfv.' || quote_ident(p_source_name);
-- Drop first: CREATE OR REPLACE VIEW cannot change the column list
EXECUTE format('DROP VIEW IF EXISTS %s', v_view);
v_sql := format(
'CREATE VIEW %s AS SELECT %s FROM dataflow.records WHERE source_name = %L AND transformed IS NOT NULL',
v_view, v_cols, p_source_name
);
EXECUTE v_sql;
RETURN json_build_object('success', true, 'view', v_view, 'sql', v_sql);
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION generate_source_view IS 'Generate a typed flat view in dfv schema from source config.fields';
------------------------------------------------------
-- Summary
------------------------------------------------------
-- Functions: 6 focused functions
-- 1. import_records - Import with deduplication
-- 2. apply_transformations - Apply rules and mappings (set-based)
-- 3. get_all_values - All extracted values with counts and mapping output
-- 4. get_unmapped_values - Find values needing mappings
-- 5. reprocess_records - Re-transform all records for a source
-- 6. generate_source_view - Build a typed flat view in dfv schema
--
-- Each function does ONE thing clearly
-- CTE pipelines are kept flat and commented stage by stage
-- Easy to understand and debug
------------------------------------------------------