COMMENT ON FUNCTION import_records IS 'Import records with automatic deduplication';

------------------------------------------------------
-- Aggregate: jsonb_concat_obj
-- Merge JSONB objects across rows (later rows win on key conflicts).
-- Usage: jsonb_concat_obj(col ORDER BY sequence)
------------------------------------------------------
-- State-transition function for the aggregate: null-safe JSONB object merge.
-- '||' on two JSONB objects performs a shallow merge where b's keys win.
-- IMMUTABLE + PARALLEL SAFE: pure function of its arguments, no table access,
-- so the planner may use it in parallel query plans.
CREATE OR REPLACE FUNCTION dataflow.jsonb_merge(a JSONB, b JSONB)
RETURNS JSONB AS $$
    SELECT COALESCE(a, '{}'::jsonb) || COALESCE(b, '{}'::jsonb)
$$ LANGUAGE sql IMMUTABLE PARALLEL SAFE;

DROP AGGREGATE IF EXISTS dataflow.jsonb_concat_obj(JSONB);
CREATE AGGREGATE dataflow.jsonb_concat_obj(JSONB) (
    sfunc    = dataflow.jsonb_merge,
    stype    = JSONB,
    initcond = '{}',
    parallel = safe  -- transition fn is immutable; allow parallel aggregation
);

------------------------------------------------------
-- Function: apply_transformations
-- Apply all transformation rules to records (set-based).
--
-- Parameters:
--   p_source_name  source whose records and rules are processed
--   p_record_ids   restrict to these record ids; NULL = all untransformed
--
-- Returns: JSON object {'success': true, 'transformed': <row count>}.
--
-- Side effects: sets records.transformed / records.transformed_at for every
-- qualifying record (records with no matching rule get transformed = data).
------------------------------------------------------
CREATE OR REPLACE FUNCTION apply_transformations(
    p_source_name TEXT,
    p_record_ids  INTEGER[] DEFAULT NULL  -- NULL = all untransformed
) RETURNS JSON AS $$
WITH
-- All records to process: untransformed rows of this source, optionally
-- restricted to an explicit id list.
qualifying AS (
    SELECT id, data
    FROM dataflow.records
    WHERE source_name = p_source_name
      AND transformed IS NULL
      AND (p_record_ids IS NULL OR id = ANY(p_record_ids))
),
-- Apply each enabled rule to each qualifying record that has the required
-- source field ('?' = top-level key exists). One row per (record, rule).
rx AS (
    SELECT
        q.id,
        r.name AS rule_name,
        r.sequence,
        r.output_field,
        r.retain,
        CASE r.function_type
            WHEN 'replace' THEN
                -- Flags passed through so 'g' (replace all) works correctly.
                to_jsonb(regexp_replace(
                    q.data ->> r.field, r.pattern, r.replace_value, r.flags
                ))
            ELSE
                -- extract: aggregate all matches; single match -> scalar,
                -- multiple -> array. Aggregate first so we can inspect the
                -- count and first element cleanly.
                (SELECT
                     CASE WHEN cnt = 0 THEN NULL
                          WHEN cnt = 1 THEN agg -> 0
                          ELSE agg
                     END
                 FROM (
                     SELECT
                         count(*) AS cnt,
                         jsonb_agg(
                             -- One capture group -> its string; several
                             -- capture groups -> array of captures.
                             CASE WHEN array_length(mt, 1) = 1
                                  THEN to_jsonb(mt[1])
                                  ELSE to_jsonb(mt)
                             END
                             ORDER BY rn
                         ) AS agg
                     FROM regexp_matches(q.data ->> r.field, r.pattern, r.flags)
                          WITH ORDINALITY AS m(mt, rn)
                 ) _agg)
        END AS extracted
    FROM qualifying q
    CROSS JOIN dataflow.rules r
    WHERE r.source_name = p_source_name
      AND r.enabled = true
      AND q.data ? r.field
),
-- Join with mappings to find a mapped output for each extracted value.
-- NOTE(review): this lookup now also applies to 'replace'-type results; the
-- previous loop-based implementation consulted mappings only for 'extract'.
-- Confirm with the rule authors that mappings keyed to replace rules are
-- intended to take effect here.
linked AS (
    SELECT
        rx.id,
        rx.sequence,
        rx.output_field,
        rx.retain,
        rx.extracted,
        m.output AS mapped
    FROM rx
    LEFT JOIN dataflow.mappings m
        ON  m.source_name = p_source_name
        AND m.rule_name   = rx.rule_name
        AND m.input_value = rx.extracted
    WHERE rx.extracted IS NOT NULL
),
-- Build per-rule output JSONB:
--   mapped  -> use mapping output; also write output_field if retain = true
--   no map  -> write extracted value to output_field
rule_output AS (
    SELECT
        id,
        sequence,
        CASE
            WHEN mapped IS NOT NULL THEN
                mapped ||
                CASE WHEN retain
                     THEN jsonb_build_object(output_field, extracted)
                     ELSE '{}'::jsonb
                END
            ELSE
                jsonb_build_object(output_field, extracted)
        END AS output
    FROM linked
),
-- Merge all rule outputs per record in sequence order
-- (higher sequence wins on key conflict).
record_additions AS (
    SELECT
        id,
        dataflow.jsonb_concat_obj(output ORDER BY sequence) AS additions
    FROM rule_output
    GROUP BY id
),
-- Update every qualifying record; records with no rule matches get
-- transformed = data (COALESCE of the missing additions row).
updated AS (
    UPDATE dataflow.records rec
    SET transformed    = rec.data || COALESCE(ra.additions, '{}'::jsonb),
        transformed_at = CURRENT_TIMESTAMP
    FROM qualifying q
    LEFT JOIN record_additions ra ON ra.id = q.id
    WHERE rec.id = q.id
    RETURNING rec.id
)
SELECT json_build_object('success', true, 'transformed', count(*))
FROM updated
$$ LANGUAGE sql;

COMMENT ON FUNCTION apply_transformations IS 'Apply transformation rules and mappings to records (set-based CTE)';

------------------------------------------------------
-- Function: get_unmapped_values