--
-- Dataflow Functions
-- Functions for importing records and applying transformation rules
--

SET search_path TO dataflow, public;

------------------------------------------------------
-- Function: import_records
-- Import data with automatic deduplication
--
-- Parameters:
--   p_source_name  name of a row in dataflow.sources
--   p_data         JSONB array of records (NULL is treated as an empty import)
-- Returns JSON:
--   success: {"success": true, "imported": n, "duplicates": n, "log_id": id}
--   failure: {"success": false, "error": "..."}
------------------------------------------------------
CREATE OR REPLACE FUNCTION import_records(
    p_source_name TEXT,
    p_data        JSONB  -- Array of records
) RETURNS JSON AS $$
DECLARE
    v_dedup_fields TEXT[];
    v_record       JSONB;
    v_dedup_key    TEXT;
    v_inserted     INTEGER := 0;
    v_duplicates   INTEGER := 0;
    v_log_id       INTEGER;
BEGIN
    -- Get dedup fields for this source
    SELECT dedup_fields
    INTO v_dedup_fields
    FROM dataflow.sources
    WHERE name = p_source_name;

    -- FOUND distinguishes "no such source" from "source exists but has no
    -- dedup fields"; testing the variable for NULL alone conflates the two.
    IF NOT FOUND THEN
        RETURN json_build_object(
            'success', false,
            'error', 'Source not found: ' || p_source_name
        );
    END IF;

    IF v_dedup_fields IS NULL THEN
        RETURN json_build_object(
            'success', false,
            'error', 'Source has no dedup fields configured: ' || p_source_name
        );
    END IF;

    -- jsonb_array_elements raises on anything that is not an array; report
    -- that through the same JSON error shape as the checks above.
    IF p_data IS NOT NULL AND jsonb_typeof(p_data) <> 'array' THEN
        RETURN json_build_object(
            'success', false,
            'error', 'p_data must be a JSON array of records'
        );
    END IF;

    -- Process each record
    FOR v_record IN
        SELECT elem.value
        FROM jsonb_array_elements(p_data) AS elem(value)
    LOOP
        -- Generate dedup key
        v_dedup_key := dataflow.generate_dedup_key(v_record, v_dedup_fields);

        -- Try to insert; a unique_violation is counted as a duplicate rather
        -- than aborting the import. The inner block scopes the handler to
        -- this single INSERT.
        BEGIN
            INSERT INTO dataflow.records (source_name, data, dedup_key)
            VALUES (p_source_name, v_record, v_dedup_key);

            v_inserted := v_inserted + 1;
        EXCEPTION
            WHEN unique_violation THEN
                v_duplicates := v_duplicates + 1;
        END;
    END LOOP;

    -- Log the import
    INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate)
    VALUES (p_source_name, v_inserted, v_duplicates)
    RETURNING id INTO v_log_id;

    RETURN json_build_object(
        'success', true,
        'imported', v_inserted,
        'duplicates', v_duplicates,
        'log_id', v_log_id
    );
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION import_records IS 'Import records with automatic deduplication';

------------------------------------------------------
-- Aggregate: jsonb_concat_obj
-- Merge JSONB objects across rows (later rows win on key conflicts)
-- Usage: jsonb_concat_obj(col ORDER BY sequence)
------------------------------------------------------
-- State transition function for the aggregate: concatenates two JSONB
-- values, treating a NULL on either side as an empty object.
CREATE OR REPLACE FUNCTION dataflow.jsonb_merge(a JSONB, b JSONB)
RETURNS JSONB AS $$
    SELECT COALESCE(a, '{}') || COALESCE(b, '{}')
$$ LANGUAGE sql IMMUTABLE;

DROP AGGREGATE IF EXISTS dataflow.jsonb_concat_obj(JSONB);
CREATE AGGREGATE dataflow.jsonb_concat_obj(JSONB) (
    sfunc = dataflow.jsonb_merge,
    stype = JSONB,
    initcond = '{}'
);

------------------------------------------------------
-- Function: apply_transformations
-- Apply all transformation rules to records (set-based)
--
-- Parameters:
--   p_source_name  source whose records and rules are used
--   p_record_ids   optional list of record ids; NULL = all untransformed
-- Returns JSON: {"success": true, "transformed": <number of rows updated>}
-- Only records whose transformed column IS NULL are touched.
------------------------------------------------------
CREATE OR REPLACE FUNCTION apply_transformations(
    p_source_name TEXT,
    p_record_ids  INTEGER[] DEFAULT NULL  -- NULL = all untransformed
) RETURNS JSON AS $$
WITH
-- All records to process
qualifying AS (
    SELECT
        id,
        data
    FROM dataflow.records
    WHERE source_name = p_source_name
      AND transformed IS NULL
      AND (p_record_ids IS NULL OR id = ANY(p_record_ids))
),
-- Apply each enabled rule to each qualifying record that has the required field
rx AS (
    SELECT
        q.id,
        r.name AS rule_name,
        r.sequence,
        r.output_field,
        r.retain,
        CASE r.function_type
            WHEN 'replace' THEN
                -- The regexp functions are strict: a NULL flags argument would
                -- make the whole result NULL and silently drop the rule, so
                -- NULL flags are treated as "no flags".
                to_jsonb(regexp_replace(
                    q.data ->> r.field,
                    r.pattern,
                    r.replace_value,
                    COALESCE(r.flags, '')
                ))
            ELSE
                -- extract: aggregate all matches; single match → scalar, multiple → array
                -- Aggregate first so we can inspect count and first element cleanly
                (
                    SELECT
                        CASE
                            WHEN cnt = 0 THEN NULL
                            WHEN cnt = 1 THEN agg->0
                            ELSE agg
                        END
                    FROM (
                        SELECT
                            count(*) AS cnt,
                            jsonb_agg(
                                -- one capture group → scalar; several → array of groups
                                CASE
                                    WHEN array_length(mt, 1) = 1 THEN to_jsonb(mt[1])
                                    ELSE to_jsonb(mt)
                                END
                                ORDER BY rn
                            ) AS agg
                        FROM regexp_matches(
                            q.data ->> r.field,
                            r.pattern,
                            COALESCE(r.flags, '')
                        ) WITH ORDINALITY AS m(mt, rn)
                    ) _agg
                )
        END AS extracted
    FROM qualifying q
    CROSS JOIN dataflow.rules r
    WHERE r.source_name = p_source_name
      AND r.enabled = true
      AND q.data ? r.field
),
-- Join with mappings to find mapped output for each extracted value
linked AS (
    SELECT
        rx.id,
        rx.sequence,
        rx.output_field,
        rx.retain,
        rx.extracted,
        m.output AS mapped
    FROM rx
    LEFT JOIN dataflow.mappings m
        ON m.source_name = p_source_name
       AND m.rule_name = rx.rule_name
       AND m.input_value = rx.extracted
    WHERE rx.extracted IS NOT NULL
),
-- Build per-rule output JSONB:
--   mapped  → use mapping output; also write output_field if retain = true
--   no map  → write extracted value to output_field
rule_output AS (
    SELECT
        id,
        sequence,
        CASE
            WHEN mapped IS NOT NULL THEN
                mapped ||
                CASE
                    WHEN retain THEN jsonb_build_object(output_field, extracted)
                    ELSE '{}'::jsonb
                END
            ELSE
                jsonb_build_object(output_field, extracted)
        END AS output
    FROM linked
),
-- Merge all rule outputs per record in sequence order (higher sequence wins on conflict)
record_additions AS (
    SELECT
        id,
        dataflow.jsonb_concat_obj(output ORDER BY sequence) AS additions
    FROM rule_output
    GROUP BY id
),
-- Update all qualifying records; records with no rule matches get transformed = data
updated AS (
    UPDATE dataflow.records rec
    SET transformed    = rec.data || COALESCE(ra.additions, '{}'::jsonb),
        transformed_at = CURRENT_TIMESTAMP
    FROM qualifying q
    LEFT JOIN record_additions ra ON ra.id = q.id
    WHERE rec.id = q.id
    RETURNING rec.id
)
SELECT json_build_object('success', true, 'transformed', count(*))
FROM updated
$$ LANGUAGE sql;

COMMENT ON FUNCTION apply_transformations IS 'Apply transformation rules and mappings to records (set-based CTE)';

------------------------------------------------------
-- Function: get_unmapped_values
-- Find extracted values that need mappings
--
-- Parameters:
--   p_source_name  source to inspect
--   p_rule_name    optional rule filter; NULL = all rules of the source
-- Returns one row per (rule, extracted value) that has no row in
-- dataflow.mappings, with the number of transformed records carrying that
-- value and the distinct raw source values it was derived from.
------------------------------------------------------
DROP FUNCTION IF EXISTS get_unmapped_values(TEXT, TEXT);
CREATE FUNCTION get_unmapped_values(
    p_source_name TEXT,
    p_rule_name   TEXT DEFAULT NULL
) RETURNS TABLE (
    rule_name       TEXT,
    output_field    TEXT,
    source_field    TEXT,
    extracted_value JSONB,
    record_count    BIGINT,
    sample          JSONB
) AS $$
BEGIN
    RETURN QUERY
    WITH extracted AS (
        -- Every column reference is table-qualified: the RETURNS TABLE column
        -- names are also PL/pgSQL variables in this scope.
        SELECT
            r.name                            AS rule_name,
            r.output_field,
            r.field                           AS source_field,
            rec.transformed->r.output_field   AS extracted_value,
            rec.data->>r.field                AS source_value
        FROM dataflow.records rec
        CROSS JOIN dataflow.rules r
        WHERE rec.source_name = p_source_name
          AND r.source_name = p_source_name
          AND rec.transformed IS NOT NULL
          AND rec.transformed ? r.output_field
          AND (p_rule_name IS NULL OR r.name = p_rule_name)
          AND rec.data ? r.field
    )
    SELECT
        e.rule_name,
        e.output_field,
        e.source_field,
        e.extracted_value,
        count(*) AS record_count,
        jsonb_agg(DISTINCT e.source_value) FILTER (WHERE e.source_value IS NOT NULL) AS sample
    FROM extracted e
    WHERE NOT EXISTS (
        SELECT 1
        FROM dataflow.mappings m
        WHERE m.source_name = p_source_name
          AND m.rule_name = e.rule_name
          AND m.input_value = e.extracted_value
    )
    GROUP BY e.rule_name, e.output_field, e.source_field, e.extracted_value
    -- Most frequent first; the trailing keys make the order stable on ties.
    ORDER BY count(*) DESC, e.rule_name, e.extracted_value;
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION get_unmapped_values IS 'Find extracted values that need mappings defined';

------------------------------------------------------
-- Function: reprocess_records
-- Clear and reapply transformations
--
-- Resets transformed / transformed_at for every record of the source, then
-- returns the JSON result of apply_transformations for that source.
------------------------------------------------------
CREATE OR REPLACE FUNCTION reprocess_records(p_source_name TEXT)
RETURNS JSON AS $$
BEGIN
    -- Clear existing transformations
    UPDATE dataflow.records
    SET transformed    = NULL,
        transformed_at = NULL
    WHERE source_name = p_source_name;

    -- Reapply transformations
    RETURN dataflow.apply_transformations(p_source_name);
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION reprocess_records IS 'Clear and reapply all transformations for a source';

------------------------------------------------------
-- Function: generate_source_view
-- Build a typed flat view in dfv schema
--
-- Reads config->'fields' (a JSON array) from dataflow.sources and creates
-- view dfv.<source name> with one column per field:
--   field has "expression" → the expression with {ref} placeholders expanded
--   type = 'date'          → (transformed->>name)::date
--   type = 'numeric'       → (transformed->>name)::numeric
--   anything else          → transformed->>name (text)
-- Returns JSON: {"success": true, "view": ..., "sql": ...} or
--               {"success": false, "error": ...} when no fields are defined.
------------------------------------------------------
CREATE OR REPLACE FUNCTION generate_source_view(p_source_name TEXT)
RETURNS JSON AS $$
DECLARE
    v_config JSONB;
    v_fields JSONB;
    v_field  JSONB;
    v_cols   TEXT := '';
    v_sql    TEXT;
    v_view   TEXT;
BEGIN
    SELECT config
    INTO v_config
    FROM dataflow.sources
    WHERE name = p_source_name;

    IF v_config IS NULL
       OR NOT (v_config ? 'fields')
       OR jsonb_array_length(v_config->'fields') = 0 THEN
        RETURN json_build_object('success', false, 'error', 'No schema fields defined for this source');
    END IF;

    v_fields := v_config->'fields';

    FOR v_field IN
        SELECT elem.value
        FROM jsonb_array_elements(v_fields) AS elem(value)
    LOOP
        IF v_cols != '' THEN
            v_cols := v_cols || ', ';
        END IF;

        IF v_field->>'expression' IS NOT NULL THEN
            -- Computed expression: substitute {fieldname} refs with (transformed->>'fieldname')::numeric
            -- e.g. "{Amount} * {sign}" → "(transformed->>'Amount')::numeric * (transformed->>'sign')::numeric"
            -- NOTE(review): apart from the {ref} substitution the expression
            -- text is spliced into the view definition verbatim, so
            -- sources.config must only be writable by trusted users.
            DECLARE
                v_expr TEXT := v_field->>'expression';
                v_ref  TEXT;
            BEGIN
                -- Each pass expands every occurrence of the first remaining
                -- {ref}; the replacement text adds no closing brace, so the
                -- loop terminates.
                WHILE v_expr ~ '\{[^}]+\}' LOOP
                    v_ref := substring(v_expr FROM '\{([^}]+)\}');
                    v_expr := replace(
                        v_expr,
                        '{' || v_ref || '}',
                        format('(transformed->>%L)::numeric', v_ref)
                    );
                END LOOP;

                v_cols := v_cols || format('%s AS %I', v_expr, v_field->>'name');
            END;
        ELSE
            CASE v_field->>'type'
                WHEN 'date' THEN
                    v_cols := v_cols || format('(transformed->>%L)::date AS %I',
                                               v_field->>'name', v_field->>'name');
                WHEN 'numeric' THEN
                    v_cols := v_cols || format('(transformed->>%L)::numeric AS %I',
                                               v_field->>'name', v_field->>'name');
                ELSE
                    v_cols := v_cols || format('transformed->>%L AS %I',
                                               v_field->>'name', v_field->>'name');
            END CASE;
        END IF;
    END LOOP;

    CREATE SCHEMA IF NOT EXISTS dfv;

    v_view := 'dfv.' || quote_ident(p_source_name);

    -- Drop and recreate the view on every call.
    EXECUTE format('DROP VIEW IF EXISTS %s', v_view);

    v_sql := format(
        'CREATE VIEW %s AS SELECT %s FROM dataflow.records WHERE source_name = %L AND transformed IS NOT NULL',
        v_view, v_cols, p_source_name
    );

    EXECUTE v_sql;

    RETURN json_build_object('success', true, 'view', v_view, 'sql', v_sql);
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION generate_source_view IS 'Generate a typed flat view in dfv schema from source config.fields';

------------------------------------------------------
-- Summary
------------------------------------------------------
-- Functions:
--   1. import_records        - Import with deduplication
--   2. apply_transformations - Apply rules and mappings (single CTE chain)
--   3. get_unmapped_values   - Find values needing mappings
--   4. reprocess_records     - Re-transform all records
--   5. generate_source_view  - Build a typed flat view in the dfv schema
-- Helpers:
--   dataflow.jsonb_merge / dataflow.jsonb_concat_obj - JSONB merge aggregate
------------------------------------------------------