--
-- Dataflow Functions
-- Simple, clear functions for import and transformation
--

SET search_path TO dataflow, public;

------------------------------------------------------
-- Function: import_records
-- Import data with automatic deduplication
--
-- Parameters:
--   p_source_name  name of a row in dataflow.sources
--   p_data         JSONB array; each element is one record
-- Returns JSON:
--   {success: true, imported, duplicates, log_id}
--   {success: false, error} when the source has no constraint_fields row
--   or p_data is not a JSON array
------------------------------------------------------
CREATE OR REPLACE FUNCTION import_records(
    p_source_name TEXT,
    p_data JSONB  -- Array of records
) RETURNS JSON AS $$
DECLARE
    v_constraint_fields TEXT[];
    v_inserted INTEGER;
    v_duplicates INTEGER;
    v_log_id INTEGER;
BEGIN
    SELECT constraint_fields
    INTO v_constraint_fields
    FROM dataflow.sources
    WHERE name = p_source_name;

    IF v_constraint_fields IS NULL THEN
        RETURN json_build_object(
            'success', false,
            'error', 'Source not found: ' || p_source_name
        );
    END IF;

    -- jsonb_array_elements() raises on objects/scalars and yields nothing for
    -- NULL; report bad input the same way as every other failure here instead
    -- of throwing or writing an empty log entry.
    IF p_data IS NULL OR jsonb_typeof(p_data) <> 'array' THEN
        RETURN json_build_object(
            'success', false,
            'error', 'Import data must be a JSON array of records'
        );
    END IF;

    -- Single statement: classify, log and insert share one snapshot.
    -- NOTE(review): only keys already present in dataflow.records are treated
    -- as duplicates; two incoming rows sharing the same new key are both
    -- inserted. Confirm whether intra-batch deduplication is expected.
    WITH
    -- All incoming records with their constraint keys
    pending AS (
        SELECT
            rec.value AS data,
            rec.ordinality AS seq,
            (SELECT jsonb_object_agg(f, rec.value->>f)
             FROM unnest(v_constraint_fields) AS f) AS constraint_key
        FROM jsonb_array_elements(p_data) WITH ORDINALITY AS rec
    ),
    -- Keys already in the database (excluded)
    existing AS (
        SELECT DISTINCT r.constraint_key
        FROM dataflow.records r
        INNER JOIN pending p ON p.constraint_key = r.constraint_key
        WHERE r.source_name = p_source_name
    ),
    -- Rows whose constraint key is not yet in the database
    new_records AS (
        SELECT p.data, p.constraint_key, p.seq
        FROM pending p
        WHERE NOT EXISTS (
            SELECT 1 FROM existing e WHERE e.constraint_key = p.constraint_key
        )
    ),
    -- Write the log entry
    log_entry AS (
        INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate, info)
        VALUES (
            p_source_name,
            (SELECT count(*) FROM new_records),
            (SELECT count(*) FROM pending) - (SELECT count(*) FROM new_records),
            jsonb_build_object(
                'total', jsonb_array_length(p_data),
                'inserted_keys', (SELECT jsonb_agg(constraint_key ORDER BY constraint_key) FROM new_records),
                'excluded_keys', (SELECT jsonb_agg(constraint_key) FROM existing)
            )
        )
        RETURNING id, records_imported, records_duplicate
    ),
    -- Insert new records (runs even though the outer SELECT never reads it:
    -- data-modifying CTEs are always executed)
    inserted AS (
        INSERT INTO dataflow.records (source_name, data, constraint_key, import_id)
        SELECT p_source_name, nr.data, nr.constraint_key, (SELECT id FROM log_entry)
        FROM new_records nr
        ORDER BY nr.seq
        RETURNING id
    )
    SELECT le.id, le.records_imported, le.records_duplicate
    INTO v_log_id, v_inserted, v_duplicates
    FROM log_entry le;

    RETURN json_build_object(
        'success', true,
        'imported', v_inserted,
        'duplicates', v_duplicates,
        'log_id', v_log_id
    );
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION import_records IS 'Import records with automatic deduplication';

------------------------------------------------------
-- Function: get_import_log
-- Return import history for a source, newest first
------------------------------------------------------
CREATE OR REPLACE FUNCTION get_import_log(p_source_name TEXT)
RETURNS TABLE (
    id INTEGER,
    source_name TEXT,
    records_imported INTEGER,
    records_duplicate INTEGER,
    imported_at TIMESTAMPTZ,
    info JSONB
) AS $$
    -- Columns are alias-qualified so they cannot be confused with the
    -- identically named output columns declared above.
    SELECT
        l.id,
        l.source_name,
        l.records_imported,
        l.records_duplicate,
        l.imported_at,
        l.info
    FROM dataflow.import_log AS l
    WHERE l.source_name = p_source_name
    ORDER BY l.imported_at DESC;
$$ LANGUAGE sql;

COMMENT ON FUNCTION get_import_log IS 'Return import history for a source, newest first, including inserted/excluded key lists';

------------------------------------------------------
-- Function: get_all_import_logs
-- Return import history across all sources, newest first
------------------------------------------------------
CREATE OR REPLACE FUNCTION get_all_import_logs()
RETURNS TABLE (
    id INTEGER,
    source_name TEXT,
    records_imported INTEGER,
    records_duplicate INTEGER,
    imported_at TIMESTAMPTZ,
    info JSONB
) AS $$
    SELECT
        l.id,
        l.source_name,
        l.records_imported,
        l.records_duplicate,
        l.imported_at,
        l.info
    FROM dataflow.import_log AS l
    ORDER BY l.imported_at DESC;
$$ LANGUAGE sql;

COMMENT ON FUNCTION get_all_import_logs IS 'Return import history across all sources, newest first';

------------------------------------------------------
-- Function: delete_import
-- Delete all
-- records from a specific import and remove the log entry
--
-- Returns JSON {success, records_deleted, log_id}, or {success:false, error}
-- when no import_log row has the given id.
------------------------------------------------------
CREATE OR REPLACE FUNCTION delete_import(p_log_id INTEGER)
RETURNS JSON AS $$
DECLARE
    v_deleted INTEGER;
BEGIN
    IF NOT EXISTS (SELECT 1 FROM dataflow.import_log WHERE id = p_log_id) THEN
        RETURN json_build_object('success', false, 'error', 'Import log entry not found');
    END IF;

    -- Count first: the rows are removed indirectly, so the DELETE below
    -- cannot report how many records went away.
    SELECT count(*)
    INTO v_deleted
    FROM dataflow.records
    WHERE import_id = p_log_id;

    -- Cascade handles deleting records via FK ON DELETE CASCADE
    -- (the FK itself is declared outside this file section).
    DELETE FROM dataflow.import_log WHERE id = p_log_id;

    RETURN json_build_object(
        'success', true,
        'records_deleted', v_deleted,
        'log_id', p_log_id
    );
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION delete_import IS 'Delete all records belonging to an import batch and remove the log entry';

------------------------------------------------------
-- Aggregate: jsonb_concat_obj
-- Merge JSONB objects across rows (later rows win on key conflicts)
-- Usage: jsonb_concat_obj(col ORDER BY sequence)
------------------------------------------------------
-- State function: NULL on either side is treated as an empty object.
CREATE OR REPLACE FUNCTION dataflow.jsonb_merge(a JSONB, b JSONB)
RETURNS JSONB AS $$
    SELECT COALESCE(a, '{}') || COALESCE(b, '{}')
$$ LANGUAGE sql IMMUTABLE;

DROP AGGREGATE IF EXISTS dataflow.jsonb_concat_obj(JSONB);
CREATE AGGREGATE dataflow.jsonb_concat_obj(JSONB) (
    sfunc = dataflow.jsonb_merge,
    stype = JSONB,
    initcond = '{}'
);

------------------------------------------------------
-- Function: apply_transformations
-- Apply all transformation rules to records (set-based)
--
-- Returns JSON {success: true, transformed: <number of records updated>}.
------------------------------------------------------
DROP FUNCTION IF EXISTS apply_transformations(TEXT, INTEGER[]);
CREATE OR REPLACE FUNCTION apply_transformations(
    p_source_name TEXT,
    p_record_ids INTEGER[] DEFAULT NULL,  -- NULL = all eligible records
    p_overwrite BOOLEAN DEFAULT FALSE     -- FALSE = skip already-transformed, TRUE = overwrite all
) RETURNS JSON AS $$
WITH
-- All records to process
qualifying AS (
    SELECT id, data
    FROM dataflow.records
    WHERE source_name = p_source_name
      AND (p_overwrite OR transformed IS NULL)
      AND (p_record_ids IS NULL OR id = ANY(p_record_ids))
),
-- Mirror TPS rx: fan out one row per regex match, drive from rules → records
rx AS (
    SELECT
        q.id,
        r.name AS rule_name,
        r.sequence,
        r.output_field,
        r.retain,
        r.function_type,
        COALESCE(mt.rn, rp.rn, 1) AS result_number,
        -- extract: a single capture group becomes a scalar, several become an array
        CASE
            WHEN array_length(mt.mt, 1) = 1 THEN to_jsonb(mt.mt[1])
            ELSE to_jsonb(mt.mt)
        END AS match_val,
        to_jsonb(rp.rp) AS replace_val
    FROM dataflow.rules r
    INNER JOIN qualifying q
        ON q.data ? r.field
    LEFT JOIN LATERAL regexp_matches(q.data ->> r.field, r.pattern, r.flags)
        WITH ORDINALITY AS mt(mt, rn)
        ON r.function_type = 'extract'
    LEFT JOIN LATERAL regexp_replace(q.data ->> r.field, r.pattern, r.replace_value, r.flags)
        WITH ORDINALITY AS rp(rp, rn)
        ON r.function_type = 'replace'
    WHERE r.source_name = p_source_name
      AND r.enabled = true
),
-- Aggregate match rows back into one value per (record, rule) — mirrors TPS agg_to_target_items
-- NOTE(review): when a rule does not match, match_val is SQL NULL, jsonb_agg
-- turns that into [null], and "-> 0" yields JSON null — which is NOT SQL NULL.
-- Such rows therefore pass the "extracted IS NOT NULL" filter in "linked" and
-- write output_field: null. Left unchanged because existing mappings may key on
-- a null input_value; confirm the intended behaviour.
agg_matches AS (
    SELECT
        id,
        rule_name,
        sequence,
        output_field,
        retain,
        function_type,
        CASE function_type
            WHEN 'replace' THEN jsonb_agg(replace_val) -> 0
            ELSE
                CASE
                    WHEN max(result_number) = 1 THEN jsonb_agg(match_val ORDER BY result_number) -> 0
                    ELSE jsonb_agg(match_val ORDER BY result_number)
                END
        END AS extracted
    FROM rx
    GROUP BY id, rule_name, sequence, output_field, retain, function_type
),
-- Join with mappings to find mapped output — mirrors TPS link_map
linked AS (
    SELECT
        a.id,
        a.sequence,
        a.output_field,
        a.retain,
        a.extracted,
        m.output AS mapped
    FROM agg_matches a
    LEFT JOIN dataflow.mappings m
        ON m.source_name = p_source_name
       AND m.rule_name = a.rule_name
       AND m.input_value = a.extracted
    WHERE a.extracted IS NOT NULL
),
-- Build per-rule output JSONB:
--   mapped → use mapping output; also write output_field if retain = true
--   no map → write extracted value to output_field
rule_output AS (
    SELECT
        id,
        sequence,
        CASE
            WHEN mapped IS NOT NULL THEN
                mapped ||
                CASE
                    WHEN retain THEN jsonb_build_object(output_field, extracted)
                    ELSE '{}'::jsonb
                END
            ELSE jsonb_build_object(output_field, extracted)
        END AS output
    FROM linked
),
-- Merge all rule outputs per record in sequence order — mirrors TPS agg_to_id
record_additions AS (
    SELECT
        id,
        dataflow.jsonb_concat_obj(output ORDER BY sequence) AS additions
    FROM rule_output
    GROUP BY id
),
-- Update all qualifying records; records with no rule output get transformed = '{}'
-- (so they still count as transformed and are not picked up again)
updated AS (
    UPDATE dataflow.records rec
    SET transformed = COALESCE(ra.additions, '{}'::jsonb),
        transformed_at = CURRENT_TIMESTAMP
    FROM qualifying q
    LEFT JOIN record_additions ra ON ra.id = q.id
    WHERE rec.id = q.id
    RETURNING rec.id
)
SELECT json_build_object('success', true, 'transformed', count(*))
FROM updated
$$ LANGUAGE sql;

COMMENT ON FUNCTION apply_transformations IS 'Apply transformation rules and mappings to records (set-based CTE)';

------------------------------------------------------
-- Function: get_all_values
-- All extracted values (mapped + unmapped) with counts and mapping output
--
-- One row per (rule, extracted value): how many records carry it, up to five
-- sample records (lowest ids), and the matching mapping if one exists.
------------------------------------------------------
DROP FUNCTION IF EXISTS get_all_values(TEXT, TEXT);
CREATE FUNCTION get_all_values(
    p_source_name TEXT,
    p_rule_name TEXT DEFAULT NULL  -- NULL = all rules of the source
) RETURNS TABLE (
    rule_name TEXT,
    output_field TEXT,
    source_field TEXT,
    extracted_value JSONB,
    record_count BIGINT,
    sample JSONB,
    mapping_id INTEGER,
    output JSONB,
    is_mapped BOOLEAN
) AS $$
BEGIN
    RETURN QUERY
    WITH
    -- Every (record, rule) pair whose output_field is present in transformed
    extracted AS (
        SELECT
            r.name AS rule_name,
            r.output_field,
            r.field AS source_field,
            rec.transformed->r.output_field AS extracted_value,
            rec.data AS record_data,
            row_number() OVER (
                PARTITION BY r.name, rec.transformed->r.output_field
                ORDER BY rec.id
            ) AS rn
        FROM dataflow.records rec
        CROSS JOIN dataflow.rules r
        WHERE rec.source_name = p_source_name
          AND r.source_name = p_source_name
          AND rec.transformed IS NOT NULL
          AND rec.transformed ? r.output_field
          AND (p_rule_name IS NULL OR r.name = p_rule_name)
          AND rec.data ? r.field
    ),
    -- Collapse to one row per distinct value, keeping the first five samples
    aggregated AS (
        SELECT
            e.rule_name,
            e.output_field,
            e.source_field,
            e.extracted_value,
            count(*) AS record_count,
            jsonb_agg(e.record_data ORDER BY e.rn) FILTER (WHERE e.rn <= 5) AS sample
        FROM extracted e
        GROUP BY e.rule_name, e.output_field, e.source_field, e.extracted_value
    )
    SELECT
        a.rule_name,
        a.output_field,
        a.source_field,
        a.extracted_value,
        a.record_count,
        a.sample,
        m.id AS mapping_id,
        m.output,
        (m.id IS NOT NULL) AS is_mapped
    FROM aggregated a
    LEFT JOIN dataflow.mappings m
        ON m.source_name = p_source_name
       AND m.rule_name = a.rule_name
       AND m.input_value = a.extracted_value
    ORDER BY a.record_count DESC;
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION get_all_values IS 'All extracted values with record counts and mapping output (single query for All tab)';

------------------------------------------------------
-- Function: get_unmapped_values
-- Find extracted values that need mappings
--
-- Same extraction as get_all_values, restricted to values for which no
-- dataflow.mappings row exists.
------------------------------------------------------
DROP FUNCTION IF EXISTS get_unmapped_values(TEXT, TEXT);
CREATE FUNCTION get_unmapped_values(
    p_source_name TEXT,
    p_rule_name TEXT DEFAULT NULL  -- NULL = all rules of the source
) RETURNS TABLE (
    rule_name TEXT,
    output_field TEXT,
    source_field TEXT,
    extracted_value JSONB,
    record_count BIGINT,
    sample JSONB
) AS $$
BEGIN
    RETURN QUERY
    WITH extracted AS (
        SELECT
            r.name AS rule_name,
            r.output_field,
            r.field AS source_field,
            rec.transformed->r.output_field AS extracted_value,
            rec.data AS record_data,
            row_number() OVER (
                PARTITION BY r.name, rec.transformed->r.output_field
                ORDER BY rec.id
            ) AS rn
        FROM dataflow.records rec
        CROSS JOIN dataflow.rules r
        WHERE rec.source_name = p_source_name
          AND r.source_name = p_source_name
          AND rec.transformed IS NOT NULL
          AND rec.transformed ? r.output_field
          AND (p_rule_name IS NULL OR r.name = p_rule_name)
          AND rec.data ? r.field
    )
    SELECT
        e.rule_name,
        e.output_field,
        e.source_field,
        e.extracted_value,
        count(*) AS record_count,
        jsonb_agg(e.record_data ORDER BY e.rn) FILTER (WHERE e.rn <= 5) AS sample
    FROM extracted e
    WHERE NOT EXISTS (
        SELECT 1
        FROM dataflow.mappings m
        WHERE m.source_name = p_source_name
          AND m.rule_name = e.rule_name
          AND m.input_value = e.extracted_value
    )
    GROUP BY e.rule_name, e.output_field, e.source_field, e.extracted_value
    ORDER BY count(*) DESC;
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION get_unmapped_values IS 'Find extracted values that need mappings defined';

------------------------------------------------------
-- Function: reprocess_records
-- Reapply transformations to every record of a source
------------------------------------------------------
CREATE OR REPLACE FUNCTION reprocess_records(p_source_name TEXT)
RETURNS JSON AS $$
    -- Overwrite all records directly — no clear step, mirrors TPS srce_map_overwrite
    SELECT dataflow.apply_transformations(p_source_name, NULL, TRUE)
$$ LANGUAGE sql;

COMMENT ON FUNCTION reprocess_records IS 'Clear and reapply all transformations for a source';

------------------------------------------------------
-- Function: generate_source_view
-- Build a typed flat view in dfv schema
--
-- Reads sources.config->'fields' (array of {name, type, expression}) and
-- (re)creates view dfv.<source> over data || transformed || overrides.
-- Returns JSON {success, view, sql} or {success:false, error}.
------------------------------------------------------
CREATE OR REPLACE FUNCTION generate_source_view(p_source_name TEXT)
RETURNS JSON AS $$
DECLARE
    v_config JSONB;
    v_fields JSONB;
    v_field JSONB;
    v_cols TEXT := '';
    v_sql TEXT;
    v_view TEXT;
BEGIN
    SELECT config
    INTO v_config
    FROM dataflow.sources
    WHERE name = p_source_name;

    IF v_config IS NULL
       OR NOT (v_config ? 'fields')
       OR jsonb_array_length(v_config->'fields') = 0 THEN
        RETURN json_build_object('success', false, 'error', 'No schema fields defined for this source');
    END IF;

    v_fields := v_config->'fields';

    FOR v_field IN SELECT * FROM jsonb_array_elements(v_fields) LOOP
        IF v_cols != '' THEN
            v_cols := v_cols || ', ';
        END IF;

        IF v_field->>'expression' IS NOT NULL THEN
            -- Computed column: each {field} placeholder becomes (r->>'field')::numeric.
            -- The expression text itself is inserted verbatim into the view SQL.
            -- NOTE(review): the field's declared type is not applied to the
            -- expression result (a previously declared cast variable was never
            -- used and has been removed) — confirm whether a cast is wanted.
            DECLARE
                v_expr TEXT := v_field->>'expression';
                v_ref TEXT;
            BEGIN
                WHILE v_expr ~ '\{[^}]+\}' LOOP
                    v_ref := substring(v_expr FROM '\{([^}]+)\}');
                    v_expr := replace(v_expr, '{' || v_ref || '}', format('(r->>%L)::numeric', v_ref));
                END LOOP;
                v_cols := v_cols || format('%s AS %I', v_expr, v_field->>'name');
            END;
        ELSE
            -- Plain column: cast by declared type, anything else stays text
            CASE v_field->>'type'
                WHEN 'date' THEN
                    v_cols := v_cols || format('(r->>%L)::date AS %I', v_field->>'name', v_field->>'name');
                WHEN 'numeric' THEN
                    v_cols := v_cols || format('(r->>%L)::numeric AS %I', v_field->>'name', v_field->>'name');
                ELSE
                    v_cols := v_cols || format('r->>%L AS %I', v_field->>'name', v_field->>'name');
            END CASE;
        END IF;
    END LOOP;

    CREATE SCHEMA IF NOT EXISTS dfv;

    v_view := 'dfv.' || quote_ident(p_source_name);

    -- CASCADE also drops any view built on top of this one.
    -- NOTE(review): nothing here records that dependents were dropped — verify
    -- the caller marks them stale.
    EXECUTE format('DROP VIEW IF EXISTS %s CASCADE', v_view);

    v_sql := format(
        'CREATE VIEW %s AS SELECT id, %s FROM (SELECT id, data || COALESCE(transformed, ''{}''::jsonb) || COALESCE(overrides, ''{}''::jsonb) AS r FROM dataflow.records WHERE source_name = %L AND transformed IS NOT NULL) rec',
        v_view, v_cols, p_source_name
    );
    EXECUTE v_sql;

    RETURN json_build_object('success', true, 'view', v_view, 'sql', v_sql);
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION generate_source_view IS 'Generate a typed flat view in dfv schema from source config.fields';

------------------------------------------------------
-- Function: set_record_overrides
-- Save override values for a single record
-- (an empty object clears the overrides; returns the updated row as JSON)
------------------------------------------------------
DROP FUNCTION IF EXISTS set_record_overrides(INTEGER, JSONB);
CREATE OR REPLACE FUNCTION set_record_overrides(p_id INTEGER, p_overrides JSONB)
RETURNS JSON AS $$
    WITH updated AS (
        UPDATE dataflow.records
        SET overrides = CASE
                            WHEN p_overrides = '{}'::jsonb THEN NULL
                            ELSE p_overrides
                        END
        WHERE id = p_id
        RETURNING *
    )
    SELECT row_to_json(updated) FROM updated;
$$ LANGUAGE sql;

------------------------------------------------------
-- Function: clear_record_overrides
-- Remove all overrides for a single record (returns the updated row as JSON)
------------------------------------------------------
DROP FUNCTION IF EXISTS clear_record_overrides(INTEGER);
CREATE OR REPLACE FUNCTION clear_record_overrides(p_id INTEGER)
RETURNS JSON AS $$
    WITH updated AS (
        UPDATE dataflow.records
        SET overrides = NULL
        WHERE id = p_id
        RETURNING *
    )
    SELECT row_to_json(updated) FROM updated;
$$ LANGUAGE sql;

------------------------------------------------------
-- Function: bulk_set_record_overrides
-- Apply override values to multiple records
-- (merged into existing overrides; returns {updated: <count>})
------------------------------------------------------
DROP FUNCTION IF EXISTS bulk_set_record_overrides(TEXT, INTEGER[], JSONB);
CREATE OR REPLACE FUNCTION bulk_set_record_overrides(p_source_name TEXT, p_ids INTEGER[], p_overrides JSONB)
RETURNS JSON AS $$
    WITH updated AS (
        UPDATE dataflow.records
        -- jsonb || NULL is NULL: without the second COALESCE a NULL argument
        -- would erase every existing override on the selected records.
        SET overrides = COALESCE(overrides, '{}'::jsonb) || COALESCE(p_overrides, '{}'::jsonb)
        WHERE id = ANY(p_ids)
          AND source_name = p_source_name
        RETURNING id
    )
    SELECT json_build_object('updated', count(*)) FROM updated;
$$ LANGUAGE sql;

-- calibrate_balance: sum a source's signed amounts in its dfv view (optionally
-- up to p_as_of_date) and report the offset needed to reach p_known_balance.
-- Read-only; returns {success:false, error} when the source is not in the
-- stack, its amount/date fields are unset, or the view does not exist.
CREATE OR REPLACE FUNCTION dataflow.calibrate_balance(p_stack_name text, p_source_name text, p_as_of_date date, p_known_balance numeric)
 RETURNS json
 LANGUAGE plpgsql
 STABLE
AS $function$
DECLARE
    v_src dataflow.stack_sources%ROWTYPE;
    v_running NUMERIC;
    v_sql TEXT;
BEGIN
    SELECT *
    INTO v_src
    FROM dataflow.stack_sources
    WHERE stack_name = p_stack_name
      AND source_name = p_source_name;

    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Source not in stack');
    END IF;

    IF v_src.amount_field IS NULL OR v_src.date_field IS NULL THEN
        RETURN json_build_object('success', false, 'error', 'Set amount and date fields on this source first');
    END IF;

    BEGIN
        IF p_as_of_date IS NULL THEN
            v_sql := format(
                'SELECT COALESCE(SUM(%I * %s), 0) FROM dfv.%I',
                v_src.amount_field, v_src.amount_sign, p_source_name
            );
        ELSE
            v_sql := format(
                'SELECT COALESCE(SUM(%I * %s), 0) FROM dfv.%I WHERE %I <= %L::date',
                v_src.amount_field, v_src.amount_sign, p_source_name,
                v_src.date_field, p_as_of_date
            );
        END IF;
        EXECUTE v_sql INTO v_running;
    EXCEPTION WHEN undefined_table THEN
        RETURN json_build_object('success', false, 'error', 'Source view not found — generate the source view first');
    END;

    RETURN json_build_object(
        'success', true,
        'source', p_source_name,
        'as_of_date', p_as_of_date,
        'known_balance', p_known_balance,
        'computed_sum', v_running,
        'suggested_offset', p_known_balance - v_running
    );
END;
$function$
;

-- create_mapping: insert one mapping row and return it.
CREATE OR REPLACE FUNCTION dataflow.create_mapping(p_source_name text, p_rule_name text, p_input_value jsonb, p_output jsonb)
 RETURNS dataflow.mappings
 LANGUAGE sql
AS $function$
    INSERT INTO dataflow.mappings (source_name, rule_name, input_value, output)
    VALUES (p_source_name, p_rule_name, p_input_value, p_output)
    RETURNING *;
$function$
;

-- create_rule: insert one transformation rule and return it.
CREATE OR REPLACE FUNCTION dataflow.create_rule(p_source_name text, p_name text, p_field text, p_pattern text, p_output_field text, p_function_type text DEFAULT 'extract'::text, p_flags text DEFAULT ''::text, p_replace_value text DEFAULT ''::text, p_enabled boolean DEFAULT true, p_retain boolean DEFAULT false, p_sequence integer DEFAULT 0)
 RETURNS dataflow.rules
 LANGUAGE sql
AS $function$
    INSERT INTO dataflow.rules (
        source_name, name, field, pattern, output_field,
        function_type, flags, replace_value, enabled, retain, sequence
    )
    VALUES (
        p_source_name, p_name, p_field, p_pattern, p_output_field,
        p_function_type, p_flags, p_replace_value, p_enabled, p_retain, p_sequence
    )
    RETURNING *;
$function$
;

-- create_source: insert one source row and return it.
-- An older three-argument overload (text, text[], jsonb DEFAULT) used to be
-- defined next to this one. With defaults on both signatures, any call passing
-- two or three arguments matched both and failed with "function ... is not
-- unique", so the stale overload is dropped and such calls resolve here.
-- NOTE(review): the dropped overload left global_picklist to the table
-- default; this assumes that default equals the parameter default (true).
DROP FUNCTION IF EXISTS dataflow.create_source(text, text[], jsonb);
CREATE OR REPLACE FUNCTION dataflow.create_source(p_name text, p_constraint_fields text[], p_config jsonb DEFAULT '{}'::jsonb, p_global_picklist boolean DEFAULT true)
 RETURNS dataflow.sources
 LANGUAGE sql
AS $function$
    INSERT INTO dataflow.sources (name, constraint_fields, config, global_picklist)
    VALUES (p_name, p_constraint_fields, p_config, p_global_picklist)
    RETURNING *;
$function$
;

-- create_stack: insert one stack row and return it.
CREATE OR REPLACE FUNCTION dataflow.create_stack(p_name text, p_label text DEFAULT NULL::text, p_fields jsonb DEFAULT '[]'::jsonb, p_amount_field text DEFAULT NULL::text, p_date_field text DEFAULT NULL::text, p_balance_offset numeric DEFAULT 0)
 RETURNS dataflow.stacks
 LANGUAGE sql
AS $function$
    INSERT INTO dataflow.stacks (name, label, fields, amount_field, date_field, balance_offset)
    VALUES (p_name, p_label, p_fields, p_amount_field, p_date_field, p_balance_offset)
    RETURNING *;
$function$
;

CREATE OR REPLACE FUNCTION
dataflow.delete_mapping(p_id integer)
 RETURNS TABLE(id integer)
 LANGUAGE sql
AS $function$
    -- Delete one mapping by id; yields the id, or no row when nothing matched.
    DELETE FROM dataflow.mappings AS m
    WHERE m.id = p_id
    RETURNING m.id;
$function$
;

-- delete_pivot_layout: delete one pivot layout by id; yields the deleted id.
CREATE OR REPLACE FUNCTION dataflow.delete_pivot_layout(p_id integer)
 RETURNS TABLE(id integer)
 LANGUAGE sql
AS $function$
    DELETE FROM dataflow.pivot_layouts AS pl
    WHERE pl.id = p_id
    RETURNING pl.id;
$function$
;

-- delete_record: delete one record by id; yields the deleted id.
CREATE OR REPLACE FUNCTION dataflow.delete_record(p_id bigint)
 RETURNS TABLE(id bigint)
 LANGUAGE sql
AS $function$
    DELETE FROM dataflow.records AS r
    WHERE r.id = p_id
    RETURNING r.id;
$function$
;

-- delete_rule: delete one rule by id; yields its id and name.
CREATE OR REPLACE FUNCTION dataflow.delete_rule(p_id integer)
 RETURNS TABLE(id integer, name text)
 LANGUAGE sql
AS $function$
    DELETE FROM dataflow.rules AS r
    WHERE r.id = p_id
    RETURNING r.id, r.name;
$function$
;

-- delete_source: delete one source by name; returns the name, or NULL when
-- nothing matched.
CREATE OR REPLACE FUNCTION dataflow.delete_source(p_name text)
 RETURNS text
 LANGUAGE sql
AS $function$
    DELETE FROM dataflow.sources AS s
    WHERE s.name = p_name
    RETURNING s.name;
$function$
;

-- delete_source_records: delete every record of a source; returns how many.
CREATE OR REPLACE FUNCTION dataflow.delete_source_records(p_source_name text)
 RETURNS TABLE(deleted_count bigint)
 LANGUAGE sql
AS $function$
    WITH deleted AS (
        DELETE FROM dataflow.records
        WHERE source_name = p_source_name
        RETURNING id
    )
    SELECT count(*) AS deleted_count
    FROM deleted;
$function$
;

-- delete_stack: delete one stack by name; yields the deleted name.
CREATE OR REPLACE FUNCTION dataflow.delete_stack(p_name text)
 RETURNS TABLE(name text)
 LANGUAGE sql
AS $function$
    DELETE FROM dataflow.stacks AS s
    WHERE s.name = p_name
    RETURNING s.name;
$function$
;

-- generate_stack_view: build (and unless p_dry_run, create) view dfv.<stack>,
-- a UNION ALL of the stack's source views projected onto the stack's canonical
-- fields. When the stack has both amount_field and date_field, per-source
-- running balances and an overall net_balance column are added.
-- Returns JSON {success, view, sql, cascade_stale} or {success:false, error}.
CREATE OR REPLACE FUNCTION dataflow.generate_stack_view(p_stack_name text, p_dry_run boolean DEFAULT false)
 RETURNS json
 LANGUAGE plpgsql
AS $function$
DECLARE
    v_stack         dataflow.stacks%ROWTYPE;
    v_src           dataflow.stack_sources%ROWTYPE;
    v_field         JSONB;
    v_ctes          TEXT[] := '{}';
    v_cte_names     TEXT[] := '{}';
    v_select        TEXT;
    v_col           TEXT;
    v_src_field     TEXT;
    v_view          TEXT;
    v_sql           TEXT;
    v_has_bal       BOOLEAN;
    v_canon_cols    TEXT;
    v_src_bal_cols  TEXT;
    v_total_offset  NUMERIC := 0;
    v_cascade_stale TEXT[];
BEGIN
    SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Stack not found');
    END IF;

    v_has_bal := v_stack.amount_field IS NOT NULL AND v_stack.date_field IS NOT NULL;

    -- Build one CTE per source querying dfv.{source} directly
    FOR v_src IN
        SELECT * FROM dataflow.stack_sources
        WHERE stack_name = p_stack_name
        ORDER BY seq, id
    LOOP
        v_select := format('SELECT %L AS _source, id AS _id', v_src.source_name);

        FOR v_field IN SELECT * FROM jsonb_array_elements(v_stack.fields) LOOP
            v_col := v_field->>'name';

            IF v_has_bal AND v_col = v_stack.amount_field THEN
                -- Use per-source amount_field with sign applied.
                -- format('%s', NULL) renders as an empty string, so a missing
                -- type or sign would yield invalid SQL; fall back to numeric / 1.
                IF v_src.amount_field IS NULL THEN
                    v_select := v_select || format(
                        ', NULL::%s AS %I',
                        COALESCE(NULLIF(v_field->>'type', ''), 'numeric'), v_col
                    );
                ELSE
                    v_select := v_select || format(
                        ', %I * %s AS %I',
                        v_src.amount_field, COALESCE(v_src.amount_sign, 1), v_col
                    );
                END IF;

            ELSIF v_has_bal AND v_col = v_stack.date_field THEN
                -- Use per-source date_field
                IF v_src.date_field IS NULL THEN
                    v_select := v_select || format(', NULL::date AS %I', v_col);
                ELSE
                    v_select := v_select || format(', %I AS %I', v_src.date_field, v_col);
                END IF;

            ELSE
                -- Other canonical fields: use field_map or same name, NULL if column doesn't exist
                v_src_field := COALESCE(v_src.field_map->>v_col, v_col);
                IF EXISTS (
                    SELECT 1
                    FROM information_schema.columns
                    WHERE table_schema = 'dfv'
                      AND table_name = v_src.source_name
                      AND column_name = v_src_field
                ) THEN
                    v_select := v_select || format(', %I AS %I', v_src_field, v_col);
                ELSE
                    v_select := v_select || format(', NULL::text AS %I', v_col);
                END IF;
            END IF;
        END LOOP;

        v_select := v_select || format(' FROM dfv.%I', v_src.source_name);
        v_ctes := v_ctes || format('%I AS (%s)', v_src.source_name, v_select);
        v_cte_names := v_cte_names || quote_ident(v_src.source_name);

        -- Accumulate carried-forward source balance column and total offset.
        -- A NULL balance_offset counts as 0; otherwise it would blank out the
        -- generated "+ offset" term and turn the running total offset NULL.
        IF v_has_bal THEN
            IF v_src_bal_cols IS NOT NULL THEN
                v_src_bal_cols := v_src_bal_cols || ', ';
            END IF;
            v_src_bal_cols := COALESCE(v_src_bal_cols, '') || format(
                'SUM(CASE WHEN _source = %L THEN %I END) OVER (ORDER BY %I ASC, _id ASC) + %s AS %I',
                v_src.source_name,
                v_stack.amount_field,
                v_stack.date_field,
                COALESCE(v_src.balance_offset, 0),
                v_src.source_name || '_balance'
            );
            v_total_offset := v_total_offset + COALESCE(v_src.balance_offset, 0);
        END IF;
    END LOOP;

    IF array_length(v_ctes, 1) IS NULL THEN
        RETURN json_build_object('success', false, 'error', 'Stack has no sources');
    END IF;

    v_view := 'dfv.' || quote_ident(p_stack_name);

    v_canon_cols := (
        SELECT string_agg(quote_ident(f->>'name'), ', ')
        FROM jsonb_array_elements(v_stack.fields) f
    );

    -- No canonical fields would render as "SELECT _source, _id,  FROM ...".
    IF v_canon_cols IS NULL THEN
        RETURN json_build_object('success', false, 'error', 'Stack has no fields');
    END IF;

    -- Adjacent string constants below are concatenated by the SQL lexer; that
    -- only works when they are separated by a newline, so keep one per line.
    IF v_has_bal THEN
        v_sql := format(
            'CREATE VIEW %s AS '
            'WITH %s, _stacked AS (SELECT * FROM %s) '
            'SELECT _source, _id, %s, '
            '%s, '
            'SUM(%I) OVER (ORDER BY %I ASC, _id ASC) + %s AS net_balance '
            'FROM _stacked ORDER BY %I DESC, _id DESC',
            v_view,
            array_to_string(v_ctes, ', '),
            array_to_string(v_cte_names, ' UNION ALL SELECT * FROM '),
            v_canon_cols,
            v_src_bal_cols,
            v_stack.amount_field,
            v_stack.date_field,
            v_total_offset,
            v_stack.date_field
        );
    ELSE
        v_sql := format(
            'CREATE VIEW %s AS '
            'WITH %s, _stacked AS (SELECT * FROM %s) '
            'SELECT _source, _id, %s FROM _stacked',
            v_view,
            array_to_string(v_ctes, ', '),
            array_to_string(v_cte_names, ' UNION ALL SELECT * FROM '),
            v_canon_cols
        );
    END IF;

    IF NOT p_dry_run THEN
        CREATE SCHEMA IF NOT EXISTS dfv;
        EXECUTE format('DROP VIEW IF EXISTS %s CASCADE', v_view);
        EXECUTE v_sql;

        -- Detect stacks whose views were dropped by CASCADE and mark them stale
        SELECT array_agg(s.name)
        INTO v_cascade_stale
        FROM dataflow.stacks s
        WHERE s.name != p_stack_name
          AND s.view_generated_at IS NOT NULL
          AND NOT EXISTS (
              SELECT 1
              FROM pg_views v
              WHERE v.schemaname = 'dfv'
                AND v.viewname = s.name
          );

        UPDATE dataflow.stacks
        SET view_generated_at = NULL
        WHERE name = ANY(v_cascade_stale);
    END IF;

    RETURN json_build_object(
        'success', true,
        'view', v_view,
        'sql', v_sql,
        'cascade_stale', COALESCE(to_json(v_cascade_stale), '[]'::json)
    );
END;
$function$
;

CREATE OR REPLACE FUNCTION
dataflow.get_global_output_values()
 RETURNS TABLE(col text, val text)
 LANGUAGE sql
 STABLE
AS $function$
    -- Distinct non-empty (key, value) pairs found in mapping outputs of
    -- sources flagged global_picklist.
    SELECT DISTINCT
        e.key AS col,
        e.value AS val
    FROM dataflow.mappings m
    JOIN dataflow.sources s
        ON s.name = m.source_name
    CROSS JOIN LATERAL jsonb_each_text(m.output) AS e(key, value)
    WHERE s.global_picklist = true
      AND e.value IS NOT NULL
      AND e.value <> ''
    ORDER BY e.key, e.value;
$function$
;

-- get_mapping: fetch one mapping row by id.
CREATE OR REPLACE FUNCTION dataflow.get_mapping(p_id integer)
 RETURNS dataflow.mappings
 LANGUAGE sql
 STABLE
AS $function$
    SELECT * FROM dataflow.mappings WHERE id = p_id;
$function$
;

-- get_mapping_counts: for each mapping of a source (optionally one rule),
-- count the records whose transformed output_field equals the mapping input.
CREATE OR REPLACE FUNCTION dataflow.get_mapping_counts(p_source_name text, p_rule_name text DEFAULT NULL::text)
 RETURNS TABLE(rule_name text, input_value jsonb, record_count bigint)
 LANGUAGE sql
 STABLE
AS $function$
    SELECT
        m.rule_name,
        m.input_value,
        COUNT(rec.id) AS record_count  -- counts matches only; 0 when none
    FROM dataflow.mappings m
    JOIN dataflow.rules r
        ON r.source_name = m.source_name
       AND r.name = m.rule_name
    LEFT JOIN dataflow.records rec
        ON rec.source_name = m.source_name
       AND rec.transformed ? r.output_field
       AND rec.transformed -> r.output_field = m.input_value
    WHERE m.source_name = p_source_name
      AND (p_rule_name IS NULL OR m.rule_name = p_rule_name)
    GROUP BY m.rule_name, m.input_value;
$function$
;

-- get_mappings_by_output_field: mappings whose output has key p_col with text
-- value p_val.
CREATE OR REPLACE FUNCTION dataflow.get_mappings_by_output_field(p_col text, p_val text)
 RETURNS TABLE(id integer, source_name text, rule_name text, input_value jsonb, output jsonb)
 LANGUAGE sql
 STABLE
AS $function$
    SELECT
        m.id,
        m.source_name,
        m.rule_name,
        m.input_value,
        m.output
    FROM dataflow.mappings m
    WHERE m.output->>(p_col) = p_val
    ORDER BY m.source_name, m.rule_name, m.input_value::text;
$function$
;

-- get_record: fetch one record row by id.
CREATE OR REPLACE FUNCTION dataflow.get_record(p_id bigint)
 RETURNS dataflow.records
 LANGUAGE sql
 STABLE
AS $function$
    SELECT * FROM dataflow.records WHERE id = p_id;
$function$
;

-- get_rule: fetch one rule row by id.
CREATE OR REPLACE FUNCTION dataflow.get_rule(p_id integer)
 RETURNS dataflow.rules
 LANGUAGE sql
 STABLE
AS $function$
    SELECT * FROM dataflow.rules WHERE id = p_id;
$function$
;

-- get_source: fetch one source row by name.
CREATE OR REPLACE FUNCTION dataflow.get_source(p_name text)
 RETURNS dataflow.sources
 LANGUAGE sql
 STABLE
AS $function$
    SELECT * FROM dataflow.sources WHERE name = p_name;
$function$
;

-- get_source_fields: every field name known for a source together with where
-- it comes from: 'schema' (config.fields), 'raw' (record data keys),
-- 'rule: <name>' (rule output fields) and 'mapping' (mapping output keys).
CREATE OR REPLACE FUNCTION dataflow.get_source_fields(p_source_name text)
 RETURNS TABLE(key text, origins text[])
 LANGUAGE sql
 STABLE
AS $function$
    SELECT
        key,
        array_agg(DISTINCT origin ORDER BY origin) AS origins
    FROM (
        SELECT f->>'name' AS key, 'schema' AS origin
        FROM dataflow.sources AS s
        CROSS JOIN LATERAL jsonb_array_elements(s.config->'fields') AS f
        WHERE s.name = p_source_name
          AND s.config ? 'fields'

        UNION ALL

        SELECT jsonb_object_keys(data) AS key, 'raw' AS origin
        FROM dataflow.records
        WHERE source_name = p_source_name

        UNION ALL

        SELECT output_field AS key, 'rule: ' || name AS origin
        FROM dataflow.rules
        WHERE source_name = p_source_name

        UNION ALL

        SELECT jsonb_object_keys(output) AS key, 'mapping' AS origin
        FROM dataflow.mappings
        WHERE source_name = p_source_name
    ) keys
    GROUP BY key
    ORDER BY key;
$function$
;

-- get_source_stats: total / transformed / pending record counts for a source.
CREATE OR REPLACE FUNCTION dataflow.get_source_stats(p_source_name text)
 RETURNS TABLE(total_records bigint, transformed_records bigint, pending_records bigint)
 LANGUAGE sql
 STABLE
AS $function$
    SELECT
        COUNT(*) AS total_records,
        COUNT(*) FILTER (WHERE transformed IS NOT NULL) AS transformed_records,
        COUNT(*) FILTER (WHERE transformed IS NULL) AS pending_records
    FROM dataflow.records
    WHERE source_name = p_source_name;
$function$
;

-- get_stack: one stack plus its sources as a JSONB array ordered by seq, id
-- ('[]' when the stack has no sources).
CREATE OR REPLACE FUNCTION dataflow.get_stack(p_name text)
 RETURNS TABLE(name text, label text, fields jsonb, amount_field text, date_field text, balance_offset numeric, created_at timestamp with time zone, sources jsonb)
 LANGUAGE sql
 STABLE
AS $function$
    SELECT
        s.name,
        s.label,
        s.fields,
        s.amount_field,
        s.date_field,
        s.balance_offset,
        s.created_at,
        COALESCE(
            jsonb_agg(
                jsonb_build_object(
                    'id', ss.id,
                    'source_name', ss.source_name,
                    'field_map', ss.field_map,
                    'amount_field', ss.amount_field,
                    'amount_sign', ss.amount_sign,
                    'date_field', ss.date_field,
                    'balance_offset', ss.balance_offset,
                    'seq', ss.seq
                )
                ORDER BY ss.seq, ss.id
            ) FILTER (WHERE ss.id IS NOT NULL),  -- skip the all-NULL row of the LEFT JOIN
            '[]'
        )
    FROM dataflow.stacks s
    LEFT JOIN dataflow.stack_sources ss
        ON ss.stack_name = s.name
    WHERE s.name = p_name
    GROUP BY s.name, s.label, s.fields, s.amount_field, s.date_field, s.balance_offset, s.created_at;
$function$
;

-- get_stack_balance: read net_balance from the newest row (by date, then _id)
-- of the stack view. Returns {success, balance} or {success:false, error}.
CREATE OR REPLACE FUNCTION dataflow.get_stack_balance(p_stack_name text)
 RETURNS json
 LANGUAGE plpgsql
 STABLE
AS $function$
DECLARE
    v_stack   dataflow.stacks%ROWTYPE;
    v_balance NUMERIC;
    v_view    TEXT;
    v_sql     TEXT;
BEGIN
    SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Stack not found');
    END IF;

    IF v_stack.amount_field IS NULL OR v_stack.date_field IS NULL THEN
        RETURN json_build_object('success', false, 'error', 'amount_field and date_field must be set');
    END IF;

    v_view := 'dfv.' || quote_ident(p_stack_name);

    BEGIN
        v_sql := format(
            'SELECT net_balance FROM %s ORDER BY %I DESC, _id DESC LIMIT 1',
            v_view, v_stack.date_field
        );
        EXECUTE v_sql INTO v_balance;
    EXCEPTION WHEN undefined_table THEN
        RETURN json_build_object('success', false, 'error', 'View not generated yet — click Generate first');
    END;

    RETURN json_build_object('success', true, 'balance', v_balance);
END;
$function$
;

-- get_status: list sources and stacks whose view_generated_at is NULL.
CREATE OR REPLACE FUNCTION dataflow.get_status()
 RETURNS json
 LANGUAGE plpgsql
 STABLE
AS $function$
DECLARE
    v_sources JSON;
    v_stacks  JSON;
BEGIN
    SELECT COALESCE(
               json_agg(json_build_object('name', name, 'view_generated_at', view_generated_at) ORDER BY name),
               '[]'::json
           )
    INTO v_sources
    FROM dataflow.sources
    WHERE view_generated_at IS NULL;

    SELECT COALESCE(
               json_agg(json_build_object('name', name, 'view_generated_at', view_generated_at) ORDER BY name),
               '[]'::json
           )
    INTO v_stacks
    FROM dataflow.stacks
    WHERE view_generated_at IS NULL;

    RETURN json_build_object('stale_sources', v_sources, 'stale_stacks', v_stacks);
END;
$function$
;

-- get_view_data: page through view dfv.<p_source_name>.
--   p_filters  JSONB array of {col, pattern}; each becomes a case-insensitive
--              regex match on col::text. Entries naming a column that is not
--              in the view, or with an empty pattern, are ignored.
--   p_sort_col ignored unless it is a column of the view.
-- Returns {exists: false, rows: []} when the view does not exist, otherwise
-- {exists: true, rows: [...]}.
CREATE OR REPLACE FUNCTION dataflow.get_view_data(p_source_name text, p_limit integer DEFAULT 100, p_offset integer DEFAULT 0, p_sort_col text DEFAULT NULL::text, p_sort_dir text DEFAULT 'asc'::text, p_filters jsonb DEFAULT NULL::jsonb)
 RETURNS json
 LANGUAGE plpgsql
 STABLE
AS $function$
DECLARE
    v_exists  BOOLEAN;
    v_where   TEXT := '';
    v_order   TEXT := '';
    v_rows    JSON;
    v_filter  JSONB;
    v_col     TEXT;
    v_pattern TEXT;
BEGIN
    SELECT EXISTS (
        SELECT 1
        FROM information_schema.views
        WHERE table_schema = 'dfv'
          AND table_name = p_source_name
    ) INTO v_exists;

    IF NOT v_exists THEN
        RETURN json_build_object('exists', FALSE, 'rows', '[]'::json);
    END IF;

    -- Build WHERE from filters (validate each column exists in the view)
    IF p_filters IS NOT NULL THEN
        FOR v_filter IN SELECT value FROM jsonb_array_elements(p_filters) LOOP
            v_col := v_filter->>'col';
            v_pattern := v_filter->>'pattern';

            IF v_pattern IS NOT NULL AND v_pattern <> '' AND EXISTS (
                SELECT 1
                FROM information_schema.columns
                WHERE table_schema = 'dfv'
                  AND table_name = p_source_name
                  AND column_name = v_col
            ) THEN
                v_where := v_where
                    || CASE WHEN v_where = '' THEN ' WHERE ' ELSE ' AND ' END
                    || quote_ident(v_col) || '::text ~* ' || quote_literal(v_pattern);
            END IF;
        END LOOP;
    END IF;

    -- Sort only by a real column of the view; anything but 'desc' sorts ascending
    IF p_sort_col IS NOT NULL AND EXISTS (
        SELECT 1
        FROM information_schema.columns
        WHERE table_schema = 'dfv'
          AND table_name = p_source_name
          AND column_name = p_sort_col
    ) THEN
        v_order := ' ORDER BY ' || quote_ident(p_sort_col)
            || CASE WHEN lower(p_sort_dir) = 'desc' THEN ' DESC' ELSE ' ASC' END
            || ' NULLS LAST';
    END IF;

    -- format('%s', NULL) renders as an empty string, which would produce
    -- "LIMIT  OFFSET " and a syntax error; fall back to the declared defaults.
    EXECUTE format(
        'SELECT COALESCE(json_agg(row_to_json(t)), ''[]''::json) FROM (SELECT * FROM dfv.%I%s%s LIMIT %s OFFSET %s) t',
        p_source_name, v_where, v_order,
        COALESCE(p_limit, 100), COALESCE(p_offset, 0)
    ) INTO v_rows;

    RETURN json_build_object('exists', TRUE, 'rows', v_rows);
END;
$function$
;

-- list_mappings: mappings of a source (optionally one rule), ordered by rule
-- and input value.
CREATE OR REPLACE FUNCTION dataflow.list_mappings(p_source_name text, p_rule_name text DEFAULT NULL::text)
 RETURNS SETOF dataflow.mappings
 LANGUAGE sql
 STABLE
AS $function$
    SELECT *
    FROM dataflow.mappings
    WHERE source_name = p_source_name
      AND (p_rule_name IS NULL OR rule_name = p_rule_name)
    ORDER BY rule_name, input_value::text;
$function$
;

-- list_pivot_layouts: pivot layouts of a source, ordered by layout name.
CREATE OR REPLACE FUNCTION dataflow.list_pivot_layouts(p_source_name text)
 RETURNS TABLE(id integer, source_name text, layout_name text, config jsonb, created_at timestamp with time zone)
 LANGUAGE sql
AS $function$
    SELECT
        pl.id,
        pl.source_name,
        pl.layout_name,
        pl.config,
        pl.created_at
    FROM dataflow.pivot_layouts AS pl
    WHERE pl.source_name = p_source_name
    ORDER BY pl.layout_name;
$function$
;

-- list_records: records of a source, newest id first, paged; optionally only
-- records that already have a transformed value.
CREATE OR REPLACE FUNCTION dataflow.list_records(p_source_name text, p_limit integer DEFAULT 100, p_offset integer DEFAULT 0, p_transformed_only boolean DEFAULT false)
 RETURNS SETOF dataflow.records
 LANGUAGE sql
 STABLE
AS $function$
    SELECT *
    FROM dataflow.records
    WHERE source_name = p_source_name
      AND (NOT p_transformed_only OR transformed IS NOT NULL)
    ORDER BY id DESC
    LIMIT p_limit
    OFFSET p_offset;
$function$
;

-- list_rules: rules of a source in application order (sequence, then name).
CREATE OR REPLACE FUNCTION dataflow.list_rules(p_source_name text)
 RETURNS SETOF dataflow.rules
 LANGUAGE sql
 STABLE
AS $function$
    SELECT *
    FROM dataflow.rules
    WHERE source_name = p_source_name
    ORDER BY sequence, name;
$function$
;

-- list_sources: all sources ordered by name.
CREATE OR REPLACE FUNCTION dataflow.list_sources()
 RETURNS SETOF dataflow.sources
 LANGUAGE sql
 STABLE
AS $function$
    SELECT *
    FROM dataflow.sources
    ORDER BY name;
$function$
;

-- list_stacks: all stacks with the number of sources attached to each.
CREATE OR REPLACE FUNCTION dataflow.list_stacks()
 RETURNS TABLE(name text, label text, fields jsonb, amount_field text, date_field text, balance_offset numeric, source_count bigint, created_at timestamp with time zone)
 LANGUAGE sql
 STABLE
AS $function$
    SELECT
        s.name,
        s.label,
        s.fields,
        s.amount_field,
        s.date_field,
        s.balance_offset,
        count(ss.id) AS source_count,
        s.created_at
    FROM dataflow.stacks s
    LEFT JOIN dataflow.stack_sources ss
        ON ss.stack_name = s.name
    GROUP BY s.name, s.label, s.fields, s.amount_field, s.date_field, s.balance_offset, s.created_at
    ORDER BY s.name;
$function$
;

CREATE OR REPLACE FUNCTION dataflow.preview_rule(p_source text, p_field text, p_pattern text, p_flags text DEFAULT ''::text, p_function_type text DEFAULT 'extract'::text, p_replace_value text DEFAULT ''::text, p_limit integer DEFAULT 20)
 RETURNS TABLE(id integer, raw_value text, extracted_value jsonb)
 LANGUAGE plpgsql
 STABLE
AS $function$
-- Field is resolved from data first, then transformed (supports chained rules whose
-- input field was produced by an earlier-sequence rule rather than the raw import).
BEGIN IF p_function_type = 'replace' THEN RETURN QUERY SELECT r.id, COALESCE(r.data ->> p_field, r.transformed ->> p_field), to_jsonb(regexp_replace( COALESCE(r.data ->> p_field, r.transformed ->> p_field), p_pattern, p_replace_value, p_flags )) FROM dataflow.records r WHERE source_name = p_source AND (data ? p_field OR transformed ? p_field) ORDER BY r.id DESC LIMIT p_limit; ELSE RETURN QUERY SELECT r.id, COALESCE(r.data ->> p_field, r.transformed ->> p_field), CASE WHEN agg.match_count = 0 THEN NULL WHEN agg.match_count = 1 THEN agg.matches -> 0 ELSE agg.matches END FROM dataflow.records r CROSS JOIN LATERAL ( SELECT jsonb_agg( CASE WHEN array_length(mt, 1) = 1 THEN to_jsonb(mt[1]) ELSE to_jsonb(mt) END ORDER BY rn ) AS matches, count(*)::int AS match_count FROM regexp_matches( COALESCE(r.data ->> p_field, r.transformed ->> p_field), p_pattern, p_flags ) WITH ORDINALITY AS m(mt, rn) ) agg WHERE r.source_name = p_source AND (r.data ? p_field OR r.transformed ? p_field) ORDER BY r.id DESC LIMIT p_limit; END IF; END; $function$ ; CREATE OR REPLACE FUNCTION dataflow.remap_output_field(p_col text, p_from_val text, p_to_val text) RETURNS integer LANGUAGE plpgsql AS $function$ DECLARE updated_count INTEGER; BEGIN UPDATE dataflow.mappings SET output = jsonb_set(output, ARRAY[p_col], to_jsonb(p_to_val)) WHERE output->>(p_col) = p_from_val; GET DIAGNOSTICS updated_count = ROW_COUNT; RETURN updated_count; END; $function$ ; CREATE OR REPLACE FUNCTION dataflow.remove_stack_source(p_stack_name text, p_source_name text) RETURNS TABLE(source_name text) LANGUAGE sql AS $function$ DELETE FROM dataflow.stack_sources WHERE stack_name = p_stack_name AND source_name = p_source_name RETURNING source_name; $function$ ; CREATE OR REPLACE FUNCTION dataflow.reorder_stack_sources(p_stack_name text, p_source_names text[]) RETURNS void LANGUAGE plpgsql AS $function$ DECLARE i INTEGER; BEGIN FOR i IN 1..array_length(p_source_names, 1) LOOP UPDATE dataflow.stack_sources SET seq = i WHERE 
stack_name = p_stack_name AND source_name = p_source_names[i]; END LOOP; END; $function$ ; CREATE OR REPLACE FUNCTION dataflow.save_pivot_layout(p_source_name text, p_layout_name text, p_config jsonb) RETURNS TABLE(id integer, source_name text, layout_name text, config jsonb, created_at timestamp with time zone) LANGUAGE sql AS $function$ INSERT INTO dataflow.pivot_layouts (source_name, layout_name, config) VALUES (p_source_name, p_layout_name, p_config) ON CONFLICT (source_name, layout_name) DO UPDATE SET config = EXCLUDED.config RETURNING id, source_name, layout_name, config, created_at; $function$ ; CREATE OR REPLACE FUNCTION dataflow.search_mapping_outputs(p_search text) RETURNS TABLE(col text, val text, mapping_count bigint) LANGUAGE sql STABLE AS $function$ SELECT e.key AS col, e.value AS val, COUNT(*) AS mapping_count FROM dataflow.mappings m CROSS JOIN LATERAL jsonb_each_text(m.output) AS e(key, value) WHERE e.value ILIKE '%' || p_search || '%' AND e.value IS NOT NULL AND e.value <> '' GROUP BY e.key, e.value ORDER BY e.key, e.value; $function$ ; CREATE OR REPLACE FUNCTION dataflow.search_records(p_source_name text, p_query jsonb, p_limit integer DEFAULT 100) RETURNS SETOF dataflow.records LANGUAGE sql STABLE AS $function$ SELECT * FROM dataflow.records WHERE source_name = p_source_name AND (data @> p_query OR transformed @> p_query) ORDER BY id DESC LIMIT p_limit; $function$ ; CREATE OR REPLACE FUNCTION dataflow.source_config_changed() RETURNS trigger LANGUAGE plpgsql AS $function$ BEGIN IF NEW.config IS DISTINCT FROM OLD.config THEN NEW.view_generated_at := NULL; END IF; RETURN NEW; END; $function$ ; CREATE OR REPLACE FUNCTION dataflow.stack_sources_changed() RETURNS trigger LANGUAGE plpgsql AS $function$ BEGIN IF TG_OP = 'UPDATE' THEN IF NEW.field_map IS NOT DISTINCT FROM OLD.field_map AND NEW.amount_sign IS NOT DISTINCT FROM OLD.amount_sign AND NEW.balance_offset IS NOT DISTINCT FROM OLD.balance_offset AND NEW.amount_field IS NOT DISTINCT FROM 
OLD.amount_field AND NEW.date_field IS NOT DISTINCT FROM OLD.date_field AND NEW.seq IS NOT DISTINCT FROM OLD.seq THEN RETURN NULL; END IF; END IF; UPDATE dataflow.stacks SET view_generated_at = NULL WHERE name = COALESCE(NEW.stack_name, OLD.stack_name); RETURN NULL; END; $function$ ; CREATE OR REPLACE FUNCTION dataflow.test_rule(p_rule_id integer, p_limit integer DEFAULT 20) RETURNS TABLE(rule jsonb, results jsonb) LANGUAGE plpgsql STABLE AS $function$ DECLARE v_rule dataflow.rules%ROWTYPE; v_results JSONB; BEGIN SELECT * INTO v_rule FROM dataflow.rules WHERE id = p_rule_id; IF NOT FOUND THEN RETURN; END IF; SELECT jsonb_agg(row_to_json(t)) INTO v_results FROM ( SELECT r.id, r.data ->> v_rule.field AS raw_value, CASE WHEN agg.match_count = 0 THEN NULL WHEN agg.match_count = 1 AND array_length(agg.matches[1], 1) = 1 THEN to_jsonb(agg.matches[1][1]) WHEN agg.match_count = 1 THEN to_jsonb(agg.matches[1]) WHEN array_length(agg.matches[1], 1) = 1 THEN (SELECT jsonb_agg(m[1] ORDER BY idx) FROM unnest(agg.matches) WITH ORDINALITY u(m, idx)) ELSE to_jsonb(agg.matches) END AS extracted_value FROM dataflow.records r CROSS JOIN LATERAL ( SELECT array_agg(mt ORDER BY rn) AS matches, count(*)::int AS match_count FROM regexp_matches(r.data ->> v_rule.field, v_rule.pattern, COALESCE(v_rule.flags, '')) WITH ORDINALITY AS m(mt, rn) ) agg WHERE r.source_name = v_rule.source_name AND r.data ? 
v_rule.field ORDER BY r.id DESC LIMIT p_limit ) t; RETURN QUERY SELECT jsonb_build_object( 'id', v_rule.id, 'name', v_rule.name, 'field', v_rule.field, 'pattern', v_rule.pattern, 'output_field', v_rule.output_field ), COALESCE(v_results, '[]'::jsonb); END; $function$ ; CREATE OR REPLACE FUNCTION dataflow.update_mapping(p_id integer, p_input_value jsonb DEFAULT NULL::jsonb, p_output jsonb DEFAULT NULL::jsonb) RETURNS dataflow.mappings LANGUAGE sql AS $function$ UPDATE dataflow.mappings SET input_value = COALESCE(p_input_value, input_value), output = COALESCE(p_output, output) WHERE id = p_id RETURNING *; $function$ ; CREATE OR REPLACE FUNCTION dataflow.update_rule(p_id integer, p_name text DEFAULT NULL::text, p_field text DEFAULT NULL::text, p_pattern text DEFAULT NULL::text, p_output_field text DEFAULT NULL::text, p_function_type text DEFAULT NULL::text, p_flags text DEFAULT NULL::text, p_replace_value text DEFAULT NULL::text, p_enabled boolean DEFAULT NULL::boolean, p_retain boolean DEFAULT NULL::boolean, p_sequence integer DEFAULT NULL::integer) RETURNS dataflow.rules LANGUAGE sql AS $function$ UPDATE dataflow.rules SET name = COALESCE(p_name, name), field = COALESCE(p_field, field), pattern = COALESCE(p_pattern, pattern), output_field = COALESCE(p_output_field, output_field), function_type = COALESCE(p_function_type, function_type), flags = COALESCE(p_flags, flags), replace_value = COALESCE(p_replace_value, replace_value), enabled = COALESCE(p_enabled, enabled), retain = COALESCE(p_retain, retain), sequence = COALESCE(p_sequence, sequence) WHERE id = p_id RETURNING *; $function$ ; CREATE OR REPLACE FUNCTION dataflow.update_source(p_name text, p_constraint_fields text[] DEFAULT NULL::text[], p_config jsonb DEFAULT NULL::jsonb, p_global_picklist boolean DEFAULT NULL::boolean) RETURNS dataflow.sources LANGUAGE sql AS $function$ UPDATE dataflow.sources SET constraint_fields = COALESCE(p_constraint_fields, constraint_fields), config = COALESCE(p_config, config), 
global_picklist = COALESCE(p_global_picklist, global_picklist), updated_at = CURRENT_TIMESTAMP WHERE name = p_name RETURNING *; $function$ ; CREATE OR REPLACE FUNCTION dataflow.update_source(p_name text, p_constraint_fields text[] DEFAULT NULL::text[], p_config jsonb DEFAULT NULL::jsonb) RETURNS dataflow.sources LANGUAGE sql AS $function$ UPDATE dataflow.sources SET constraint_fields = COALESCE(p_constraint_fields, constraint_fields), config = COALESCE(p_config, config), updated_at = CURRENT_TIMESTAMP WHERE name = p_name RETURNING *; $function$ ; CREATE OR REPLACE FUNCTION dataflow.update_stack(p_name text, p_label text DEFAULT NULL::text, p_fields jsonb DEFAULT NULL::jsonb, p_amount_field text DEFAULT NULL::text, p_date_field text DEFAULT NULL::text, p_balance_offset numeric DEFAULT NULL::numeric) RETURNS dataflow.stacks LANGUAGE sql AS $function$ UPDATE dataflow.stacks SET label = COALESCE(p_label, label), fields = COALESCE(p_fields, fields), amount_field = COALESCE(p_amount_field, amount_field), date_field = COALESCE(p_date_field, date_field), balance_offset = COALESCE(p_balance_offset, balance_offset) WHERE name = p_name RETURNING *; $function$ ; CREATE OR REPLACE FUNCTION dataflow.upsert_mapping(p_source_name text, p_rule_name text, p_input_value jsonb, p_output jsonb) RETURNS dataflow.mappings LANGUAGE sql AS $function$ INSERT INTO dataflow.mappings (source_name, rule_name, input_value, output) VALUES (p_source_name, p_rule_name, p_input_value, p_output) ON CONFLICT (source_name, rule_name, input_value) DO UPDATE SET output = EXCLUDED.output RETURNING *; $function$ ; CREATE OR REPLACE FUNCTION dataflow.upsert_stack_source(p_stack_name text, p_source_name text, p_field_map jsonb DEFAULT '{}'::jsonb, p_amount_sign integer DEFAULT 1, p_balance_offset numeric DEFAULT 0, p_amount_field text DEFAULT NULL::text, p_date_field text DEFAULT NULL::text) RETURNS dataflow.stack_sources LANGUAGE sql AS $function$ INSERT INTO dataflow.stack_sources (stack_name, source_name, 
field_map, amount_sign, balance_offset, amount_field, date_field, seq) VALUES ( p_stack_name, p_source_name, p_field_map, p_amount_sign, p_balance_offset, p_amount_field, p_date_field, (SELECT COALESCE(MAX(seq), 0) + 1 FROM dataflow.stack_sources WHERE stack_name = p_stack_name) ) ON CONFLICT (stack_name, source_name) DO UPDATE SET field_map = EXCLUDED.field_map, amount_sign = EXCLUDED.amount_sign, balance_offset = EXCLUDED.balance_offset, amount_field = EXCLUDED.amount_field, date_field = EXCLUDED.date_field RETURNING *; $function$ ; ------------------------------------------------------ -- Summary ------------------------------------------------------ -- All dataflow functions are defined above. -- Deploy with: psql -d dataflow -f database/functions.sql ------------------------------------------------------