dataflow/database/functions.sql
Paul Trowbridge 89a70bdf7e Split transformed column; add override management; show all override keys in panel
- transformed now stores only rule additions (not merged data+overrides)
- View dynamically computes data || transformed || overrides at query time
- New DB functions: set/clear/bulk_set_record_overrides
- Records panel now includes source-wide override keys so party/reason etc.
  appear even on records that don't have them set yet

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 11:00:24 -04:00

1516 lines
54 KiB
PL/PgSQL

--
-- Dataflow Functions
-- Simple, clear functions for import and transformation
--
-- The search_path below makes unqualified names in the CREATE statements of
-- this file (e.g. import_records) resolve to, and be created in, the
-- dataflow schema first, falling back to public.
SET search_path TO dataflow, public;
------------------------------------------------------
-- Function: import_records
-- Import data with automatic deduplication
--
-- p_source_name : name of a row in dataflow.sources
-- p_data        : JSONB array of record objects
--
-- Each record's constraint key is a JSONB object built from the source's
-- constraint_fields. A record is skipped as a duplicate when its key already
-- exists for the source OR when an earlier element of the same batch carries
-- the same key. The in-batch check is needed because every sub-statement of
-- the WITH query reads the same snapshot, so rows inserted by this statement
-- are invisible to the "existing" lookup.
--
-- Writes one dataflow.import_log row (counts plus inserted/excluded key lists)
-- and inserts the surviving records in their original array order.
-- Returns JSON {success, imported, duplicates, log_id}, or
-- {success: false, error} when the source is unknown.
------------------------------------------------------
CREATE OR REPLACE FUNCTION import_records(
    p_source_name TEXT,
    p_data JSONB -- Array of records
) RETURNS JSON AS $$
DECLARE
    v_constraint_fields TEXT[];
    v_inserted INTEGER;
    v_duplicates INTEGER;
    v_log_id INTEGER;
BEGIN
    SELECT constraint_fields INTO v_constraint_fields
    FROM dataflow.sources
    WHERE name = p_source_name;

    IF v_constraint_fields IS NULL THEN
        RETURN json_build_object(
            'success', false,
            'error', 'Source not found: ' || p_source_name
        );
    END IF;

    WITH
    -- All incoming records with their constraint keys, numbered in input order
    pending AS (
        SELECT
            rec.value AS data,
            rec.ordinality AS seq,
            (SELECT jsonb_object_agg(f, rec.value->>f)
             FROM unnest(v_constraint_fields) AS f) AS constraint_key
        FROM jsonb_array_elements(p_data) WITH ORDINALITY AS rec
    ),
    -- Keys already in the database (excluded)
    existing AS (
        SELECT DISTINCT r.constraint_key
        FROM dataflow.records r
        INNER JOIN pending p ON p.constraint_key = r.constraint_key
        WHERE r.source_name = p_source_name
    ),
    -- Rows not yet in the database, ranked so repeats inside this batch can be dropped
    ranked AS (
        SELECT
            p.data,
            p.constraint_key,
            p.seq,
            row_number() OVER (PARTITION BY p.constraint_key ORDER BY p.seq) AS dup_rank
        FROM pending p
        WHERE NOT EXISTS (SELECT 1 FROM existing e WHERE e.constraint_key = p.constraint_key)
    ),
    -- Keep the first occurrence of each key. A NULL key can never be matched
    -- as a duplicate, so every NULL-key row is kept.
    new_records AS (
        SELECT rk.data, rk.constraint_key, rk.seq
        FROM ranked rk
        WHERE rk.dup_rank = 1 OR rk.constraint_key IS NULL
    ),
    -- Write the log entry. Duplicates = everything that was not inserted.
    log_entry AS (
        INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate, info)
        VALUES (
            p_source_name,
            (SELECT count(*) FROM new_records),
            (SELECT count(*) FROM pending) - (SELECT count(*) FROM new_records),
            jsonb_build_object(
                'total', jsonb_array_length(p_data),
                'inserted_keys', (SELECT jsonb_agg(constraint_key ORDER BY constraint_key) FROM new_records),
                'excluded_keys', (SELECT jsonb_agg(constraint_key) FROM existing)
            )
        )
        RETURNING id, records_imported, records_duplicate
    ),
    -- Insert new records. PostgreSQL runs data-modifying CTEs even when the
    -- outer query does not reference them.
    inserted AS (
        INSERT INTO dataflow.records (source_name, data, constraint_key, import_id)
        SELECT p_source_name, nr.data, nr.constraint_key, (SELECT id FROM log_entry)
        FROM new_records nr
        ORDER BY nr.seq
        RETURNING id
    )
    SELECT le.id, le.records_imported, le.records_duplicate
    INTO v_log_id, v_inserted, v_duplicates
    FROM log_entry le;

    RETURN json_build_object(
        'success', true,
        'imported', v_inserted,
        'duplicates', v_duplicates,
        'log_id', v_log_id
    );
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION import_records IS 'Import records with automatic deduplication';
------------------------------------------------------
-- Function: get_import_log
-- Return import history for one source, newest first.
-- Rows with equal imported_at are ordered by id DESC so the result order is
-- deterministic. Read-only, so declared STABLE like the other getters.
------------------------------------------------------
CREATE OR REPLACE FUNCTION get_import_log(p_source_name TEXT)
RETURNS TABLE (
    id INTEGER,
    source_name TEXT,
    records_imported INTEGER,
    records_duplicate INTEGER,
    imported_at TIMESTAMPTZ,
    info JSONB
) AS $$
    SELECT id, source_name, records_imported, records_duplicate, imported_at, info
    FROM dataflow.import_log
    WHERE source_name = p_source_name
    ORDER BY imported_at DESC, id DESC;
$$ LANGUAGE sql STABLE;
COMMENT ON FUNCTION get_import_log IS 'Return import history for a source, newest first, including inserted/excluded key lists';
------------------------------------------------------
-- Function: get_all_import_logs
-- Return import history across all sources, newest first.
-- Rows with equal imported_at are ordered by id DESC so the result order is
-- deterministic. Read-only, so declared STABLE like the other getters.
------------------------------------------------------
CREATE OR REPLACE FUNCTION get_all_import_logs()
RETURNS TABLE (
    id INTEGER,
    source_name TEXT,
    records_imported INTEGER,
    records_duplicate INTEGER,
    imported_at TIMESTAMPTZ,
    info JSONB
) AS $$
    SELECT id, source_name, records_imported, records_duplicate, imported_at, info
    FROM dataflow.import_log
    ORDER BY imported_at DESC, id DESC;
$$ LANGUAGE sql STABLE;
COMMENT ON FUNCTION get_all_import_logs IS 'Return import history across all sources, newest first';
------------------------------------------------------
-- Function: delete_import
-- Remove one import batch by deleting its dataflow.import_log row.
-- The batch's records are expected to go with it through the
-- records.import_id foreign key (assumed ON DELETE CASCADE; the FK itself
-- is defined outside this file — confirm).
-- Returns {success, records_deleted, log_id}, or {success: false, error}
-- when p_log_id does not exist.
------------------------------------------------------
CREATE OR REPLACE FUNCTION delete_import(p_log_id INTEGER)
RETURNS JSON AS $$
DECLARE
    v_record_count INTEGER;
BEGIN
    PERFORM 1 FROM dataflow.import_log WHERE id = p_log_id;
    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Import log entry not found');
    END IF;

    -- Count before deleting: afterwards the cascade has already removed them.
    v_record_count := (SELECT count(*) FROM dataflow.records WHERE import_id = p_log_id);

    DELETE FROM dataflow.import_log WHERE id = p_log_id;

    RETURN json_build_object(
        'success', true,
        'records_deleted', v_record_count,
        'log_id', p_log_id
    );
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION delete_import IS 'Delete all records belonging to an import batch and remove the log entry';
------------------------------------------------------
-- Helper: dataflow.jsonb_merge(a, b)
-- Shallow-merge two JSONB objects with ||; keys from b win. A NULL input is
-- treated as an empty object.
--
-- Aggregate: dataflow.jsonb_concat_obj(jsonb)
-- Folds jsonb_merge over the rows, starting from '{}', so later rows win on
-- key conflicts. Give it an ORDER BY to define "later":
--   jsonb_concat_obj(col ORDER BY sequence)
------------------------------------------------------
CREATE OR REPLACE FUNCTION dataflow.jsonb_merge(a JSONB, b JSONB)
RETURNS JSONB
LANGUAGE sql
IMMUTABLE
AS $$
    SELECT coalesce(a, '{}'::jsonb) || coalesce(b, '{}'::jsonb)
$$;
DROP AGGREGATE IF EXISTS dataflow.jsonb_concat_obj(JSONB);
CREATE AGGREGATE dataflow.jsonb_concat_obj(JSONB) (
    SFUNC    = dataflow.jsonb_merge,
    STYPE    = JSONB,
    INITCOND = '{}'
);
------------------------------------------------------
-- Function: apply_transformations
-- Apply all transformation rules to records (set-based)
--
-- p_source_name : source whose enabled rules are applied
-- p_record_ids  : restrict to these record ids; NULL = all eligible records
-- p_overwrite   : FALSE = only records whose transformed IS NULL,
--                 TRUE  = recompute every selected record
--
-- Stores only the rule/mapping ADDITIONS in records.transformed (never the
-- raw data) and stamps transformed_at. Returns {success, transformed: <n>}
-- where n is the number of records updated.
------------------------------------------------------
-- Remove the older two-argument signature; keeping it next to the
-- three-argument version (which has defaults) would make calls ambiguous.
DROP FUNCTION IF EXISTS apply_transformations(TEXT, INTEGER[]);
CREATE OR REPLACE FUNCTION apply_transformations(
p_source_name TEXT,
p_record_ids INTEGER[] DEFAULT NULL, -- NULL = all eligible records
p_overwrite BOOLEAN DEFAULT FALSE -- FALSE = skip already-transformed, TRUE = overwrite all
) RETURNS JSON AS $$
WITH
-- Records to process: this source, optionally limited to p_record_ids, and
-- (unless overwriting) only those not yet transformed
qualifying AS (
SELECT id, data
FROM dataflow.records
WHERE source_name = p_source_name
AND (p_overwrite OR transformed IS NULL)
AND (p_record_ids IS NULL OR id = ANY(p_record_ids))
),
-- Mirror TPS rx: fan out one row per regex match, drive from rules → records.
-- Only enabled rules whose input field exists in the record take part.
-- extract rules: one row per regexp_matches result (several with the 'g' flag);
-- replace rules: one regexp_replace row. A non-matching extract rule still
-- yields one row (LEFT JOIN) with NULL match_val.
rx AS (
SELECT
q.id,
r.name AS rule_name,
r.sequence,
r.output_field,
r.retain,
r.function_type,
COALESCE(mt.rn, rp.rn, 1) AS result_number,
-- extract: a single capture group becomes a scalar, several become an array (mirrors TPS)
CASE WHEN array_length(mt.mt, 1) = 1 THEN to_jsonb(mt.mt[1]) ELSE to_jsonb(mt.mt) END AS match_val,
to_jsonb(rp.rp) AS replace_val
FROM dataflow.rules r
INNER JOIN qualifying q ON q.data ? r.field
LEFT JOIN LATERAL regexp_matches(q.data ->> r.field, r.pattern, r.flags)
WITH ORDINALITY AS mt(mt, rn) ON r.function_type = 'extract'
LEFT JOIN LATERAL regexp_replace(q.data ->> r.field, r.pattern, r.replace_value, r.flags)
WITH ORDINALITY AS rp(rp, rn) ON r.function_type = 'replace'
WHERE r.source_name = p_source_name
AND r.enabled = true
),
-- Aggregate match rows back into one value per (record, rule) — mirrors TPS agg_to_target_items.
-- One match -> that value; several matches -> JSON array ordered by match number.
agg_matches AS (
SELECT
id,
rule_name,
sequence,
output_field,
retain,
function_type,
CASE function_type
WHEN 'replace' THEN jsonb_agg(replace_val) -> 0
ELSE
CASE WHEN max(result_number) = 1
THEN jsonb_agg(match_val ORDER BY result_number) -> 0
ELSE jsonb_agg(match_val ORDER BY result_number)
END
END AS extracted
FROM rx
GROUP BY id, rule_name, sequence, output_field, retain, function_type
),
-- Join with mappings to find mapped output — mirrors TPS link_map
linked AS (
SELECT
a.id,
a.sequence,
a.output_field,
a.retain,
a.extracted,
m.output AS mapped
FROM agg_matches a
LEFT JOIN dataflow.mappings m ON
m.source_name = p_source_name
AND m.rule_name = a.rule_name
AND m.input_value = a.extracted
-- NOTE(review): jsonb_agg(NULL) -> 0 returns a JSON null, not SQL NULL, so a
-- non-matching extract rule passes this filter and later writes
-- {output_field: null}. Confirm whether such rules were meant to be dropped here.
WHERE a.extracted IS NOT NULL
),
-- Build per-rule output JSONB:
-- mapped → use mapping output; also write output_field if retain = true
-- no map → write extracted value to output_field
-- (so for a mapped value on a non-retain rule the extracted value itself is not stored)
rule_output AS (
SELECT
id,
sequence,
CASE
WHEN mapped IS NOT NULL THEN
mapped ||
CASE WHEN retain
THEN jsonb_build_object(output_field, extracted)
ELSE '{}'::jsonb
END
ELSE
jsonb_build_object(output_field, extracted)
END AS output
FROM linked
),
-- Merge all rule outputs per record in sequence order (later rules win) — mirrors TPS agg_to_id
record_additions AS (
SELECT
id,
dataflow.jsonb_concat_obj(output ORDER BY sequence) AS additions
FROM rule_output
GROUP BY id
),
-- Update all qualifying records; records with no rule output get transformed = '{}'
-- (an empty set of additions, which also marks them as processed)
updated AS (
UPDATE dataflow.records rec
SET transformed = COALESCE(ra.additions, '{}'::jsonb),
transformed_at = CURRENT_TIMESTAMP
FROM qualifying q
LEFT JOIN record_additions ra ON ra.id = q.id
WHERE rec.id = q.id
RETURNING rec.id
)
SELECT json_build_object('success', true, 'transformed', count(*))
FROM updated
$$ LANGUAGE sql;
COMMENT ON FUNCTION apply_transformations IS 'Apply transformation rules and mappings to records (set-based CTE)';
------------------------------------------------------
-- Function: get_all_values
-- All extracted values (mapped + unmapped) with counts and mapping output.
--
-- For each rule of p_source_name (or only p_rule_name) whose output_field is
-- present in records.transformed, groups records by that value and returns:
-- record_count, up to 5 sample raw records (lowest record ids first), and
-- the matching mapping (mapping_id / output / is_mapped) when one exists.
-- Ordered by record_count DESC, then rule_name and value, so ties come back
-- in a stable order. Read-only, hence STABLE.
--
-- NOTE(review): apply_transformations keeps the extracted value under
-- output_field only when it is unmapped or the rule has retain = true, so
-- mapped values of non-retain rules may be missing here — confirm intended.
------------------------------------------------------
DROP FUNCTION IF EXISTS get_all_values(TEXT, TEXT);
CREATE FUNCTION get_all_values(
    p_source_name TEXT,
    p_rule_name TEXT DEFAULT NULL
) RETURNS TABLE (
    rule_name TEXT,
    output_field TEXT,
    source_field TEXT,
    extracted_value JSONB,
    record_count BIGINT,
    sample JSONB,
    mapping_id INTEGER,
    output JSONB,
    is_mapped BOOLEAN
) AS $$
BEGIN
    RETURN QUERY
    WITH extracted AS (
        SELECT
            r.name AS rule_name,
            r.output_field,
            r.field AS source_field,
            rec.transformed->r.output_field AS extracted_value,
            rec.data AS record_data,
            -- Numbers records per (rule, value) so the sample can take the first 5
            row_number() OVER (
                PARTITION BY r.name, rec.transformed->r.output_field
                ORDER BY rec.id
            ) AS rn
        FROM dataflow.records rec
        CROSS JOIN dataflow.rules r
        WHERE
            rec.source_name = p_source_name
            AND r.source_name = p_source_name
            AND rec.transformed IS NOT NULL
            AND rec.transformed ? r.output_field
            AND (p_rule_name IS NULL OR r.name = p_rule_name)
            AND rec.data ? r.field
    ),
    aggregated AS (
        SELECT
            e.rule_name,
            e.output_field,
            e.source_field,
            e.extracted_value,
            count(*) AS record_count,
            jsonb_agg(e.record_data ORDER BY e.rn) FILTER (WHERE e.rn <= 5) AS sample
        FROM extracted e
        GROUP BY e.rule_name, e.output_field, e.source_field, e.extracted_value
    )
    SELECT
        a.rule_name,
        a.output_field,
        a.source_field,
        a.extracted_value,
        a.record_count,
        a.sample,
        m.id AS mapping_id,
        m.output,
        (m.id IS NOT NULL) AS is_mapped
    FROM aggregated a
    LEFT JOIN dataflow.mappings m ON
        m.source_name = p_source_name
        AND m.rule_name = a.rule_name
        AND m.input_value = a.extracted_value
    ORDER BY a.record_count DESC, a.rule_name, a.extracted_value;
END;
$$ LANGUAGE plpgsql STABLE;
COMMENT ON FUNCTION get_all_values IS 'All extracted values with record counts and mapping output (single query for All tab)';
------------------------------------------------------
-- Function: get_unmapped_values
-- Find extracted values that need mappings.
--
-- Same grouping as get_all_values (record_count plus up to 5 sample raw
-- records, lowest ids first), limited to (rule, value) pairs that have no
-- row in dataflow.mappings. The mapping anti-join runs once per distinct
-- value after aggregation instead of once per record; because rule_name and
-- extracted_value are grouping keys, the result is unchanged.
-- Ordered by record_count DESC, then rule_name and value. Read-only, STABLE.
------------------------------------------------------
DROP FUNCTION IF EXISTS get_unmapped_values(TEXT, TEXT);
CREATE FUNCTION get_unmapped_values(
    p_source_name TEXT,
    p_rule_name TEXT DEFAULT NULL
) RETURNS TABLE (
    rule_name TEXT,
    output_field TEXT,
    source_field TEXT,
    extracted_value JSONB,
    record_count BIGINT,
    sample JSONB
) AS $$
BEGIN
    RETURN QUERY
    WITH extracted AS (
        SELECT
            r.name AS rule_name,
            r.output_field,
            r.field AS source_field,
            rec.transformed->r.output_field AS extracted_value,
            rec.data AS record_data,
            row_number() OVER (
                PARTITION BY r.name, rec.transformed->r.output_field
                ORDER BY rec.id
            ) AS rn
        FROM
            dataflow.records rec
            CROSS JOIN dataflow.rules r
        WHERE
            rec.source_name = p_source_name
            AND r.source_name = p_source_name
            AND rec.transformed IS NOT NULL
            AND rec.transformed ? r.output_field
            AND (p_rule_name IS NULL OR r.name = p_rule_name)
            AND rec.data ? r.field
    ),
    aggregated AS (
        SELECT
            e.rule_name,
            e.output_field,
            e.source_field,
            e.extracted_value,
            count(*) AS record_count,
            jsonb_agg(e.record_data ORDER BY e.rn) FILTER (WHERE e.rn <= 5) AS sample
        FROM extracted e
        GROUP BY e.rule_name, e.output_field, e.source_field, e.extracted_value
    )
    SELECT
        a.rule_name,
        a.output_field,
        a.source_field,
        a.extracted_value,
        a.record_count,
        a.sample
    FROM aggregated a
    WHERE NOT EXISTS (
        SELECT 1 FROM dataflow.mappings m
        WHERE m.source_name = p_source_name
          AND m.rule_name = a.rule_name
          AND m.input_value = a.extracted_value
    )
    ORDER BY a.record_count DESC, a.rule_name, a.extracted_value;
END;
$$ LANGUAGE plpgsql STABLE;
COMMENT ON FUNCTION get_unmapped_values IS 'Find extracted values that need mappings defined';
------------------------------------------------------
-- Function: reprocess_records
-- Recompute transformations for every record of a source.
-- Delegates to apply_transformations with p_overwrite = TRUE and no id
-- filter, so each record's transformed value is replaced in place rather
-- than cleared first (mirrors TPS srce_map_overwrite). Returns that
-- function's JSON result unchanged.
------------------------------------------------------
CREATE OR REPLACE FUNCTION reprocess_records(p_source_name TEXT)
RETURNS JSON
LANGUAGE sql
AS $$
    SELECT dataflow.apply_transformations(p_source_name, p_overwrite => TRUE)
$$;
COMMENT ON FUNCTION reprocess_records IS 'Clear and reapply all transformations for a source';
------------------------------------------------------
-- Function: generate_source_view
-- Build a typed flat view dfv.<source> from sources.config.fields.
--
-- Each config field becomes one view column, read from the merged record
--   r = data || transformed || overrides      (later keys win)
-- Column forms:
--   {"name", "type": "date"}           -> (r->>'name')::date
--   {"name", "type": "numeric"}        -> (r->>'name')::numeric
--   {"name", any other / no type}      -> r->>'name'  (text)
--   {"name", "expression", "type"?}    -> the expression with every {field}
--        replaced by (r->>'field')::numeric, cast to "type" (default numeric)
--        when that type is date, numeric or text.
-- Only records with transformed IS NOT NULL appear in the view.
--
-- The previous view is dropped WITH CASCADE, which also drops stack views
-- built on it. Any stack whose view is now missing has view_generated_at
-- reset to NULL (the same reconciliation generate_stack_view performs) and
-- is listed in "cascade_stale".
--
-- SECURITY NOTE: "expression" is spliced into the view SQL verbatim, so
-- sources.config must only be writable by trusted users.
------------------------------------------------------
CREATE OR REPLACE FUNCTION generate_source_view(p_source_name TEXT)
RETURNS JSON AS $$
DECLARE
    v_config        JSONB;
    v_fields        JSONB;
    v_field         JSONB;
    v_cols          TEXT := '';
    v_sql           TEXT;
    v_view          TEXT;
    v_cascade_stale TEXT[];
BEGIN
    SELECT config INTO v_config
    FROM dataflow.sources
    WHERE name = p_source_name;

    IF v_config IS NULL OR NOT (v_config ? 'fields') OR jsonb_array_length(v_config->'fields') = 0 THEN
        RETURN json_build_object('success', false, 'error', 'No schema fields defined for this source');
    END IF;

    v_fields := v_config->'fields';

    FOR v_field IN SELECT * FROM jsonb_array_elements(v_fields)
    LOOP
        IF v_cols != '' THEN v_cols := v_cols || ', '; END IF;

        IF v_field->>'expression' IS NOT NULL THEN
            DECLARE
                v_expr TEXT := v_field->>'expression';
                v_ref  TEXT;
                v_cast TEXT := COALESCE(NULLIF(v_field->>'type', ''), 'numeric');
            BEGIN
                -- Replace each {field} placeholder with a numeric read of that field
                WHILE v_expr ~ '\{[^}]+\}' LOOP
                    v_ref := substring(v_expr FROM '\{([^}]+)\}');
                    v_expr := replace(v_expr, '{' || v_ref || '}',
                                      format('(r->>%L)::numeric', v_ref));
                END LOOP;
                -- Honour the declared type. Only whitelisted type names are
                -- spliced into the SQL; any other type leaves the expression uncast.
                IF v_cast IN ('date', 'numeric', 'text') THEN
                    v_cols := v_cols || format('(%s)::%s AS %I', v_expr, v_cast, v_field->>'name');
                ELSE
                    v_cols := v_cols || format('%s AS %I', v_expr, v_field->>'name');
                END IF;
            END;
        ELSE
            CASE v_field->>'type'
                WHEN 'date' THEN
                    v_cols := v_cols || format('(r->>%L)::date AS %I',
                                               v_field->>'name', v_field->>'name');
                WHEN 'numeric' THEN
                    v_cols := v_cols || format('(r->>%L)::numeric AS %I',
                                               v_field->>'name', v_field->>'name');
                ELSE
                    v_cols := v_cols || format('r->>%L AS %I',
                                               v_field->>'name', v_field->>'name');
            END CASE;
        END IF;
    END LOOP;

    CREATE SCHEMA IF NOT EXISTS dfv;
    v_view := 'dfv.' || quote_ident(p_source_name);
    EXECUTE format('DROP VIEW IF EXISTS %s CASCADE', v_view);
    v_sql := format(
        'CREATE VIEW %s AS SELECT id, %s FROM (SELECT id, data || COALESCE(transformed, ''{}''::jsonb) || COALESCE(overrides, ''{}''::jsonb) AS r FROM dataflow.records WHERE source_name = %L AND transformed IS NOT NULL) rec',
        v_view, v_cols, p_source_name
    );
    EXECUTE v_sql;

    -- The CASCADE above may have removed stack views; mark those stacks stale
    SELECT array_agg(s.name) INTO v_cascade_stale
    FROM dataflow.stacks s
    WHERE s.view_generated_at IS NOT NULL
      AND NOT EXISTS (
          SELECT 1 FROM pg_views v
          WHERE v.schemaname = 'dfv' AND v.viewname = s.name
      );
    UPDATE dataflow.stacks SET view_generated_at = NULL
    WHERE name = ANY(v_cascade_stale);

    RETURN json_build_object(
        'success', true,
        'view', v_view,
        'sql', v_sql,
        'cascade_stale', COALESCE(to_json(v_cascade_stale), '[]'::json)
    );
END;
$$ LANGUAGE plpgsql;
COMMENT ON FUNCTION generate_source_view IS 'Generate a typed flat view in dfv schema from source config.fields';
------------------------------------------------------
-- Function: set_record_overrides
-- Replace (not merge) the overrides of one record. An empty object is
-- stored as NULL. Returns the updated row as JSON, or NULL when no record
-- has id p_id.
------------------------------------------------------
DROP FUNCTION IF EXISTS set_record_overrides(INTEGER, JSONB);
CREATE OR REPLACE FUNCTION set_record_overrides(p_id INTEGER, p_overrides JSONB)
RETURNS JSON
LANGUAGE sql
AS $$
    WITH changed AS (
        UPDATE dataflow.records
        SET overrides = NULLIF(p_overrides, '{}'::jsonb)
        WHERE id = p_id
        RETURNING *
    )
    SELECT row_to_json(c) FROM changed AS c;
$$;
------------------------------------------------------
-- Function: clear_record_overrides
-- Set one record's overrides to NULL. Returns the updated row as JSON, or
-- NULL when no record has id p_id.
------------------------------------------------------
DROP FUNCTION IF EXISTS clear_record_overrides(INTEGER);
CREATE OR REPLACE FUNCTION clear_record_overrides(p_id INTEGER)
RETURNS JSON
LANGUAGE sql
AS $$
    WITH cleared AS (
        UPDATE dataflow.records AS rec
        SET overrides = NULL
        WHERE rec.id = p_id
        RETURNING rec.*
    )
    SELECT row_to_json(c) FROM cleared AS c;
$$;
------------------------------------------------------
-- Function: bulk_set_record_overrides
-- Merge override values into several records of one source.
-- Keys in p_overrides replace same-named existing override keys; other
-- existing keys are kept. Only ids that belong to p_source_name are touched.
-- A NULL p_overrides merges nothing, instead of wiping every override
-- (jsonb || NULL is NULL). An empty merged result is stored as NULL, the same
-- normalisation set_record_overrides applies.
-- Returns {updated: <number of matched records>}.
------------------------------------------------------
DROP FUNCTION IF EXISTS bulk_set_record_overrides(TEXT, INTEGER[], JSONB);
CREATE OR REPLACE FUNCTION bulk_set_record_overrides(p_source_name TEXT, p_ids INTEGER[], p_overrides JSONB)
RETURNS JSON
LANGUAGE sql
AS $$
    WITH updated AS (
        UPDATE dataflow.records
        SET overrides = NULLIF(
                COALESCE(overrides, '{}'::jsonb) || COALESCE(p_overrides, '{}'::jsonb),
                '{}'::jsonb
            )
        WHERE id = ANY(p_ids)
          AND source_name = p_source_name
        RETURNING id
    )
    SELECT json_build_object('updated', count(*)) FROM updated;
$$;
------------------------------------------------------
-- Function: calibrate_balance
-- Suggest a balance_offset for one source of a stack.
-- Sums amount_field * amount_sign over dfv.<source> (all rows, or only rows
-- whose date_field <= p_as_of_date when a date is given) and returns
-- suggested_offset = p_known_balance - that sum. Read-only (STABLE).
-- The cut-off date is bound as a parameter, not spliced as a literal.
-- Returns {success: false, error} when the source is not in the stack, its
-- amount/date fields are unset, or the source view or one of those columns
-- does not exist.
------------------------------------------------------
CREATE OR REPLACE FUNCTION dataflow.calibrate_balance(p_stack_name text, p_source_name text, p_as_of_date date, p_known_balance numeric)
RETURNS json
LANGUAGE plpgsql
STABLE
AS $function$
DECLARE
    v_src     dataflow.stack_sources%ROWTYPE;
    v_running NUMERIC;
BEGIN
    SELECT * INTO v_src
    FROM dataflow.stack_sources
    WHERE stack_name = p_stack_name AND source_name = p_source_name;
    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Source not in stack');
    END IF;
    IF v_src.amount_field IS NULL OR v_src.date_field IS NULL THEN
        RETURN json_build_object('success', false, 'error', 'Set amount and date fields on this source first');
    END IF;

    BEGIN
        IF p_as_of_date IS NULL THEN
            EXECUTE format(
                'SELECT COALESCE(SUM(%I * %s), 0) FROM dfv.%I',
                v_src.amount_field, v_src.amount_sign, p_source_name
            ) INTO v_running;
        ELSE
            EXECUTE format(
                'SELECT COALESCE(SUM(%I * %s), 0) FROM dfv.%I WHERE %I <= $1',
                v_src.amount_field, v_src.amount_sign, p_source_name, v_src.date_field
            ) INTO v_running USING p_as_of_date;
        END IF;
    EXCEPTION
        WHEN undefined_table THEN
            RETURN json_build_object('success', false, 'error', 'Source view not found — generate the source view first');
        WHEN undefined_column THEN
            RETURN json_build_object('success', false, 'error', 'Amount or date field not found in the source view — regenerate the view or fix the field names');
    END;

    RETURN json_build_object(
        'success', true,
        'source', p_source_name,
        'as_of_date', p_as_of_date,
        'known_balance', p_known_balance,
        'computed_sum', v_running,
        'suggested_offset', p_known_balance - v_running
    );
END;
$function$
;
-- create_mapping: insert one mapping row for (source, rule, input value)
-- and return the complete new row.
CREATE OR REPLACE FUNCTION dataflow.create_mapping(p_source_name text, p_rule_name text, p_input_value jsonb, p_output jsonb)
RETURNS dataflow.mappings
LANGUAGE sql
AS $function$
    INSERT INTO dataflow.mappings AS mp (source_name, rule_name, input_value, output)
    SELECT p_source_name, p_rule_name, p_input_value, p_output
    RETURNING mp.*;
$function$
;
-- create_rule: insert one transformation rule and return the new row.
-- Defaults: function_type 'extract', no regex flags, empty replacement,
-- enabled, not retained, sequence 0.
CREATE OR REPLACE FUNCTION dataflow.create_rule(p_source_name text, p_name text, p_field text, p_pattern text, p_output_field text, p_function_type text DEFAULT 'extract'::text, p_flags text DEFAULT ''::text, p_replace_value text DEFAULT ''::text, p_enabled boolean DEFAULT true, p_retain boolean DEFAULT false, p_sequence integer DEFAULT 0)
RETURNS dataflow.rules
LANGUAGE sql
AS $function$
    INSERT INTO dataflow.rules AS ru (
        source_name, name, field, pattern, output_field,
        function_type, flags, replace_value,
        enabled, retain, sequence
    )
    SELECT
        p_source_name, p_name, p_field, p_pattern, p_output_field,
        p_function_type, p_flags, p_replace_value,
        p_enabled, p_retain, p_sequence
    RETURNING ru.*;
$function$
;
-- create_source: insert a source and return the new row.
-- p_config defaults to '{}' and p_global_picklist to true.
--
-- A three-argument overload create_source(text, text[], jsonb DEFAULT ...)
-- was previously defined next to this one. Because both signatures have
-- trailing defaults, every call with two or three arguments matched both, and
-- PostgreSQL rejected it as ambiguous ("function ... is not unique"). That
-- overload is dropped; this single definition serves 2-, 3- and 4-argument
-- calls.
DROP FUNCTION IF EXISTS dataflow.create_source(text, text[], jsonb);
CREATE OR REPLACE FUNCTION dataflow.create_source(p_name text, p_constraint_fields text[], p_config jsonb DEFAULT '{}'::jsonb, p_global_picklist boolean DEFAULT true)
RETURNS dataflow.sources
LANGUAGE sql
AS $function$
    INSERT INTO dataflow.sources (name, constraint_fields, config, global_picklist)
    VALUES (p_name, p_constraint_fields, p_config, p_global_picklist)
    RETURNING *;
$function$
;
-- create_stack: insert one stack definition and return the new row.
-- Only p_name is required; fields default to an empty array and the
-- balance offset to 0.
CREATE OR REPLACE FUNCTION dataflow.create_stack(p_name text, p_label text DEFAULT NULL::text, p_fields jsonb DEFAULT '[]'::jsonb, p_amount_field text DEFAULT NULL::text, p_date_field text DEFAULT NULL::text, p_balance_offset numeric DEFAULT 0)
RETURNS dataflow.stacks
LANGUAGE sql
AS $function$
    INSERT INTO dataflow.stacks AS st (
        name, label, fields,
        amount_field, date_field, balance_offset
    )
    SELECT
        p_name, p_label, p_fields,
        p_amount_field, p_date_field, p_balance_offset
    RETURNING st.*;
$function$
;
-- delete_mapping: remove one mapping by id; yields its id, or no row when
-- nothing matched.
CREATE OR REPLACE FUNCTION dataflow.delete_mapping(p_id integer)
RETURNS TABLE(id integer)
LANGUAGE sql
AS $function$
    DELETE FROM dataflow.mappings AS mp
    WHERE mp.id = p_id
    RETURNING mp.id;
$function$
;
-- delete_pivot_layout: remove one saved pivot layout by id; yields its id,
-- or no row when nothing matched.
CREATE OR REPLACE FUNCTION dataflow.delete_pivot_layout(p_id integer)
RETURNS TABLE(id integer)
LANGUAGE sql
AS $function$
    DELETE FROM dataflow.pivot_layouts AS pl
    WHERE pl.id = p_id
    RETURNING pl.id;
$function$
;
-- delete_record: remove one record by id; yields its id, or no row when
-- nothing matched.
CREATE OR REPLACE FUNCTION dataflow.delete_record(p_id bigint)
RETURNS TABLE(id bigint)
LANGUAGE sql
AS $function$
    DELETE FROM dataflow.records AS rec
    WHERE rec.id = p_id
    RETURNING rec.id;
$function$
;
-- delete_rule: remove one rule by id; yields its id and name, or no row
-- when nothing matched.
CREATE OR REPLACE FUNCTION dataflow.delete_rule(p_id integer)
RETURNS TABLE(id integer, name text)
LANGUAGE sql
AS $function$
    DELETE FROM dataflow.rules AS ru
    WHERE ru.id = p_id
    RETURNING ru.id, ru.name;
$function$
;
-- delete_source: remove one source by name; returns that name, or NULL when
-- no source matched.
CREATE OR REPLACE FUNCTION dataflow.delete_source(p_name text)
RETURNS text
LANGUAGE sql
AS $function$
    DELETE FROM dataflow.sources AS src
    WHERE src.name = p_name
    RETURNING src.name;
$function$
;
-- delete_source_records: delete every record of one source and report how
-- many rows were removed (0 when there were none).
CREATE OR REPLACE FUNCTION dataflow.delete_source_records(p_source_name text)
RETURNS TABLE(deleted_count bigint)
LANGUAGE sql
AS $function$
    WITH removed AS (
        DELETE FROM dataflow.records AS rec
        WHERE rec.source_name = p_source_name
        RETURNING rec.id
    )
    SELECT count(*) AS deleted_count
    FROM removed;
$function$
;
-- delete_stack: remove one stack by name; yields its name, or no row when
-- nothing matched.
CREATE OR REPLACE FUNCTION dataflow.delete_stack(p_name text)
RETURNS TABLE(name text)
LANGUAGE sql
AS $function$
    DELETE FROM dataflow.stacks AS st
    WHERE st.name = p_name
    RETURNING st.name;
$function$
;
-- generate_stack_view(p_stack_name, p_dry_run)
-- Build the view dfv.<stack>, or with p_dry_run = true only render its SQL.
-- Each row of dataflow.stack_sources (ordered by seq, id) becomes one CTE
-- that reads dfv.<source> and projects the stack's canonical fields
-- (stacks.fields):
--   * the stack amount_field  <- that source's amount_field * amount_sign
--   * the stack date_field    <- that source's date_field
--   * any other field         <- field_map[field] or the same name, or NULL
--                                when dfv.<source> has no such column
-- The CTEs are combined with UNION ALL. When the stack has both
-- amount_field and date_field, the view also gets a running
-- "<source>_balance" column per source (plus that source's balance_offset)
-- and a running net_balance (plus the sum of all source offsets), and is
-- ordered newest first.
-- Returns {success, view, sql, cascade_stale}. cascade_stale lists other
-- stacks whose views no longer exist after the DROP ... CASCADE and were
-- marked stale (view_generated_at = NULL); it is empty on dry runs.
--
-- NOTE(review): stacks.balance_offset is loaded into v_stack but never used
-- here; only per-source offsets reach net_balance — confirm intended.
-- NOTE(review): view_generated_at is not set for p_stack_name itself;
-- presumably the caller does that — confirm.
-- NOTE(review): amount_sign / balance_offset / field types are spliced with
-- %s; a NULL value would render invalid SQL, so they are assumed NOT NULL.
CREATE OR REPLACE FUNCTION dataflow.generate_stack_view(p_stack_name text, p_dry_run boolean DEFAULT false)
RETURNS json
LANGUAGE plpgsql
AS $function$
DECLARE
v_stack dataflow.stacks%ROWTYPE;
v_src dataflow.stack_sources%ROWTYPE;
v_field JSONB;
v_ctes TEXT[] := '{}';
v_cte_names TEXT[] := '{}';
v_select TEXT;
v_col TEXT;
v_src_field TEXT;
v_amt_src TEXT;
v_date_src TEXT;
v_view TEXT;
v_sql TEXT;
v_has_bal BOOLEAN;
v_canon_cols TEXT;
v_src_bal_cols TEXT;
v_total_offset NUMERIC := 0;
v_cascade_stale TEXT[];
BEGIN
SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
IF NOT FOUND THEN
RETURN json_build_object('success', false, 'error', 'Stack not found');
END IF;
-- Balance columns are produced only when both canonical amount and date fields are configured
v_has_bal := v_stack.amount_field IS NOT NULL AND v_stack.date_field IS NOT NULL;
-- Build one CTE per source querying dfv.{source} directly
FOR v_src IN
SELECT * FROM dataflow.stack_sources WHERE stack_name = p_stack_name ORDER BY seq, id
LOOP
-- _source / _id identify the originating source and record in the stacked result
v_select := format('SELECT %L AS _source, id AS _id', v_src.source_name);
FOR v_field IN SELECT * FROM jsonb_array_elements(v_stack.fields)
LOOP
v_col := v_field->>'name';
IF v_has_bal AND v_col = v_stack.amount_field THEN
-- Use per-source amount_field with sign applied
IF v_src.amount_field IS NULL THEN
v_select := v_select || format(', NULL::%s AS %I', v_field->>'type', v_col);
ELSE
v_select := v_select || format(', %I * %s AS %I', v_src.amount_field, v_src.amount_sign, v_col);
END IF;
ELSIF v_has_bal AND v_col = v_stack.date_field THEN
-- Use per-source date_field
IF v_src.date_field IS NULL THEN
v_select := v_select || format(', NULL::date AS %I', v_col);
ELSE
v_select := v_select || format(', %I AS %I', v_src.date_field, v_col);
END IF;
ELSE
-- Other canonical fields: use field_map or same name, NULL if column doesn't exist
v_src_field := COALESCE(v_src.field_map->>v_col, v_col);
IF EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = 'dfv'
AND table_name = v_src.source_name
AND column_name = v_src_field
) THEN
v_select := v_select || format(', %I AS %I', v_src_field, v_col);
ELSE
-- NOTE(review): always text; if another source supplies a numeric/date
-- column for this field, UNION ALL type resolution will fail — verify.
v_select := v_select || format(', NULL::text AS %I', v_col);
END IF;
END IF;
END LOOP;
v_select := v_select || format(' FROM dfv.%I', v_src.source_name);
v_ctes := v_ctes || format('%I AS (%s)', v_src.source_name, v_select);
v_cte_names := v_cte_names || quote_ident(v_src.source_name);
-- Accumulate carried-forward source balance column and total offset.
-- The per-source running sum is NULL on rows before that source's first row.
IF v_has_bal THEN
IF v_src_bal_cols IS NOT NULL THEN v_src_bal_cols := v_src_bal_cols || ', '; END IF;
v_src_bal_cols := COALESCE(v_src_bal_cols, '') || format(
'SUM(CASE WHEN _source = %L THEN %I END) OVER (ORDER BY %I ASC, _id ASC) + %s AS %I',
v_src.source_name, v_stack.amount_field, v_stack.date_field,
v_src.balance_offset, v_src.source_name || '_balance'
);
v_total_offset := v_total_offset + v_src.balance_offset;
END IF;
END LOOP;
IF array_length(v_ctes, 1) IS NULL THEN
RETURN json_build_object('success', false, 'error', 'Stack has no sources');
END IF;
v_view := 'dfv.' || quote_ident(p_stack_name);
v_canon_cols := (
SELECT string_agg(quote_ident(f->>'name'), ', ')
FROM jsonb_array_elements(v_stack.fields) f
);
IF v_has_bal THEN
-- Running totals ordered by (date, _id); the view itself lists newest first
v_sql := format(
'CREATE VIEW %s AS '
'WITH %s, _stacked AS (SELECT * FROM %s) '
'SELECT _source, _id, %s, '
'%s, '
'SUM(%I) OVER (ORDER BY %I ASC, _id ASC) + %s AS net_balance '
'FROM _stacked ORDER BY %I DESC, _id DESC',
v_view,
array_to_string(v_ctes, ', '),
array_to_string(v_cte_names, ' UNION ALL SELECT * FROM '),
v_canon_cols,
v_src_bal_cols,
v_stack.amount_field,
v_stack.date_field,
v_total_offset,
v_stack.date_field
);
ELSE
v_sql := format(
'CREATE VIEW %s AS '
'WITH %s, _stacked AS (SELECT * FROM %s) '
'SELECT _source, _id, %s FROM _stacked',
v_view,
array_to_string(v_ctes, ', '),
array_to_string(v_cte_names, ' UNION ALL SELECT * FROM '),
v_canon_cols
);
END IF;
IF NOT p_dry_run THEN
CREATE SCHEMA IF NOT EXISTS dfv;
EXECUTE format('DROP VIEW IF EXISTS %s CASCADE', v_view);
EXECUTE v_sql;
-- Detect stacks whose views were dropped by CASCADE and mark them stale
SELECT array_agg(s.name) INTO v_cascade_stale
FROM dataflow.stacks s
WHERE s.name != p_stack_name
AND s.view_generated_at IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM pg_views v
WHERE v.schemaname = 'dfv' AND v.viewname = s.name
);
UPDATE dataflow.stacks SET view_generated_at = NULL
WHERE name = ANY(v_cascade_stale);
END IF;
RETURN json_build_object(
'success', true,
'view', v_view,
'sql', v_sql,
'cascade_stale', COALESCE(to_json(v_cascade_stale), '[]'::json)
);
END;
$function$
;
-- get_global_output_values: distinct (column, value) pairs found in the
-- output of mappings of sources flagged global_picklist, skipping NULL and
-- empty values, sorted by column then value. Feeds shared pick lists.
CREATE OR REPLACE FUNCTION dataflow.get_global_output_values()
RETURNS TABLE(col text, val text)
LANGUAGE sql
STABLE
AS $function$
    SELECT DISTINCT
        kv.key   AS col,
        kv.value AS val
    FROM dataflow.sources AS src
    INNER JOIN dataflow.mappings AS mp
        ON mp.source_name = src.name
    CROSS JOIN LATERAL jsonb_each_text(mp.output) AS kv(key, value)
    WHERE src.global_picklist
      AND kv.value <> ''
    ORDER BY col, val;
$function$
;
-- get_mapping: fetch one mapping row by id (NULL row when absent).
CREATE OR REPLACE FUNCTION dataflow.get_mapping(p_id integer)
RETURNS dataflow.mappings
LANGUAGE sql
STABLE
AS $function$
    SELECT mp.*
    FROM dataflow.mappings AS mp
    WHERE mp.id = p_id;
$function$
;
-- get_mapping_counts: for each mapping of p_source_name (optionally only
-- p_rule_name), count the records whose transformed[output_field] equals the
-- mapping's input_value. Mappings whose rule no longer exists are omitted
-- (inner join on rules); mappings with no matching records report 0.
-- NOTE(review): apply_transformations stores the extracted value under
-- output_field for a mapped value only when the rule has retain = true, so
-- non-retain rules may report 0 here — confirm intended.
CREATE OR REPLACE FUNCTION dataflow.get_mapping_counts(p_source_name text, p_rule_name text DEFAULT NULL::text)
RETURNS TABLE(rule_name text, input_value jsonb, record_count bigint)
LANGUAGE sql
STABLE
AS $function$
    SELECT
        mp.rule_name,
        mp.input_value,
        count(rec.id) AS record_count
    FROM dataflow.mappings AS mp
    INNER JOIN dataflow.rules AS ru
        ON ru.source_name = mp.source_name
       AND ru.name = mp.rule_name
    LEFT JOIN dataflow.records AS rec
        ON rec.source_name = mp.source_name
       AND rec.transformed ? ru.output_field
       AND rec.transformed -> ru.output_field = mp.input_value
    WHERE mp.source_name = p_source_name
      AND (p_rule_name IS NULL OR mp.rule_name = p_rule_name)
    GROUP BY mp.rule_name, mp.input_value;
$function$
;
-- get_mappings_by_output_field: every mapping, across all sources, whose
-- output sets key p_col to the text value p_val; sorted by source, rule,
-- then input value (as text).
CREATE OR REPLACE FUNCTION dataflow.get_mappings_by_output_field(p_col text, p_val text)
RETURNS TABLE(id integer, source_name text, rule_name text, input_value jsonb, output jsonb)
LANGUAGE sql
STABLE
AS $function$
    SELECT
        mp.id,
        mp.source_name,
        mp.rule_name,
        mp.input_value,
        mp.output
    FROM dataflow.mappings AS mp
    WHERE mp.output ->> p_col = p_val
    ORDER BY mp.source_name, mp.rule_name, mp.input_value::text;
$function$
;
-- get_record: fetch one record row by id (NULL row when absent).
CREATE OR REPLACE FUNCTION dataflow.get_record(p_id bigint)
RETURNS dataflow.records
LANGUAGE sql
STABLE
AS $function$
    SELECT rec.*
    FROM dataflow.records AS rec
    WHERE rec.id = p_id;
$function$
;
-- get_rule: fetch one rule row by id (NULL row when absent).
CREATE OR REPLACE FUNCTION dataflow.get_rule(p_id integer)
RETURNS dataflow.rules
LANGUAGE sql
STABLE
AS $function$
    SELECT ru.*
    FROM dataflow.rules AS ru
    WHERE ru.id = p_id;
$function$
;
-- get_source: fetch one source row by name (NULL row when absent).
CREATE OR REPLACE FUNCTION dataflow.get_source(p_name text)
RETURNS dataflow.sources
LANGUAGE sql
STABLE
AS $function$
    SELECT src.*
    FROM dataflow.sources AS src
    WHERE src.name = p_name;
$function$
;
-- get_source_fields: every field name known for a source, with a sorted,
-- de-duplicated list of where it comes from:
--   'schema'      - sources.config.fields
--   'raw'         - a key of some record's data
--   'rule: <name>'- a rule's output_field
--   'mapping'     - a key of some mapping's output
CREATE OR REPLACE FUNCTION dataflow.get_source_fields(p_source_name text)
RETURNS TABLE(key text, origins text[])
LANGUAGE sql
STABLE
AS $function$
    SELECT
        k.key,
        array_agg(DISTINCT k.origin ORDER BY k.origin) AS origins
    FROM (
        SELECT fld->>'name' AS key, 'schema' AS origin
        FROM dataflow.sources AS src
        CROSS JOIN LATERAL jsonb_array_elements(src.config->'fields') AS fld
        WHERE src.name = p_source_name
          AND src.config ? 'fields'
        UNION ALL
        SELECT dk.key, 'raw'
        FROM dataflow.records AS rec
        CROSS JOIN LATERAL jsonb_object_keys(rec.data) AS dk(key)
        WHERE rec.source_name = p_source_name
        UNION ALL
        SELECT ru.output_field, 'rule: ' || ru.name
        FROM dataflow.rules AS ru
        WHERE ru.source_name = p_source_name
        UNION ALL
        SELECT mk.key, 'mapping'
        FROM dataflow.mappings AS mp
        CROSS JOIN LATERAL jsonb_object_keys(mp.output) AS mk(key)
        WHERE mp.source_name = p_source_name
    ) AS k
    GROUP BY k.key
    ORDER BY k.key;
$function$
;
-- get_source_stats: record totals for one source — all records, those with
-- a non-NULL transformed value, and those still waiting (transformed NULL).
CREATE OR REPLACE FUNCTION dataflow.get_source_stats(p_source_name text)
RETURNS TABLE(total_records bigint, transformed_records bigint, pending_records bigint)
LANGUAGE sql
STABLE
AS $function$
    SELECT
        count(*)                              AS total_records,
        count(rec.transformed)                AS transformed_records,
        count(*) - count(rec.transformed)     AS pending_records
    FROM dataflow.records AS rec
    WHERE rec.source_name = p_source_name;
$function$
;
-- get_stack: one stack's settings plus a JSONB array of its member sources
-- (ordered by seq, id), or '[]' when the stack has no sources. No row is
-- returned when the stack does not exist.
CREATE OR REPLACE FUNCTION dataflow.get_stack(p_name text)
RETURNS TABLE(name text, label text, fields jsonb, amount_field text, date_field text, balance_offset numeric, created_at timestamp with time zone, sources jsonb)
LANGUAGE sql
STABLE
AS $function$
    SELECT
        st.name,
        st.label,
        st.fields,
        st.amount_field,
        st.date_field,
        st.balance_offset,
        st.created_at,
        COALESCE(members.list, '[]')
    FROM dataflow.stacks AS st
    LEFT JOIN LATERAL (
        SELECT jsonb_agg(
                   jsonb_build_object(
                       'id', ss.id,
                       'source_name', ss.source_name,
                       'field_map', ss.field_map,
                       'amount_field', ss.amount_field,
                       'amount_sign', ss.amount_sign,
                       'date_field', ss.date_field,
                       'balance_offset', ss.balance_offset,
                       'seq', ss.seq
                   ) ORDER BY ss.seq, ss.id
               ) AS list
        FROM dataflow.stack_sources AS ss
        WHERE ss.stack_name = st.name
    ) AS members ON true
    WHERE st.name = p_name;
$function$
;
-- get_stack_balance: latest net_balance from the generated stack view
-- dfv.<stack> (the newest row by date_field, then _id).
-- Returns {success, balance}, or {success: false, error} when the stack is
-- unknown, lacks amount/date fields, or its view has not been generated.
CREATE OR REPLACE FUNCTION dataflow.get_stack_balance(p_stack_name text)
RETURNS json
LANGUAGE plpgsql
STABLE
AS $function$
DECLARE
    v_stack   dataflow.stacks%ROWTYPE;
    v_balance NUMERIC;
BEGIN
    SELECT * INTO v_stack FROM dataflow.stacks WHERE name = p_stack_name;
    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Stack not found');
    END IF;

    IF v_stack.amount_field IS NULL OR v_stack.date_field IS NULL THEN
        RETURN json_build_object('success', false, 'error', 'amount_field and date_field must be set');
    END IF;

    BEGIN
        EXECUTE format(
            'SELECT net_balance FROM dfv.%I ORDER BY %I DESC, _id DESC LIMIT 1',
            p_stack_name, v_stack.date_field
        ) INTO v_balance;
    EXCEPTION WHEN undefined_table THEN
        RETURN json_build_object('success', false, 'error', 'View not generated yet — click Generate first');
    END;

    RETURN json_build_object('success', true, 'balance', v_balance);
END;
$function$
;
CREATE OR REPLACE FUNCTION dataflow.get_status()
 RETURNS json
 LANGUAGE plpgsql
 STABLE
AS $function$
-- List the sources and stacks whose generated view is stale (view_generated_at
-- IS NULL). Each list is a name-ordered JSON array of {name, view_generated_at},
-- or '[]' when nothing is stale.
DECLARE
    v_stale_sources JSON;
    v_stale_stacks  JSON;
BEGIN
    v_stale_sources := COALESCE(
        (SELECT json_agg(
                    json_build_object('name', src.name, 'view_generated_at', src.view_generated_at)
                    ORDER BY src.name)
           FROM dataflow.sources AS src
          WHERE src.view_generated_at IS NULL),
        '[]'::json
    );

    v_stale_stacks := COALESCE(
        (SELECT json_agg(
                    json_build_object('name', stk.name, 'view_generated_at', stk.view_generated_at)
                    ORDER BY stk.name)
           FROM dataflow.stacks AS stk
          WHERE stk.view_generated_at IS NULL),
        '[]'::json
    );

    RETURN json_build_object('stale_sources', v_stale_sources, 'stale_stacks', v_stale_stacks);
END;
$function$
;
CREATE OR REPLACE FUNCTION dataflow.get_view_data(p_source_name text, p_limit integer DEFAULT 100, p_offset integer DEFAULT 0, p_sort_col text DEFAULT NULL::text, p_sort_dir text DEFAULT 'asc'::text, p_filters jsonb DEFAULT NULL::jsonb)
 RETURNS json
 LANGUAGE plpgsql
 STABLE
AS $function$
-- Return one page of the generated view dfv.<p_source_name>:
--   {"exists": true,  "rows": [ {col: value, ...}, ... ]}
--   {"exists": false, "rows": []}   when the view does not exist.
-- p_filters   JSON array of {"col": ..., "pattern": ...}. Each kept filter adds
--             "col::text ~* pattern" (case-insensitive regex). A filter is skipped
--             when its pattern is NULL/empty or its col is not a column of the view.
-- p_sort_col  used only if it is a column of the view; p_sort_dir 'desc' (any case)
--             sorts descending, anything else ascending; NULLs always sort last.
-- p_limit / p_offset are bound parameters, so NULL means no limit / no offset.
-- Identifiers reach the SQL text only via quote_ident/%I after being checked
-- against information_schema; patterns only via quote_literal.
DECLARE
    v_exists  BOOLEAN;
    v_where   TEXT := '';
    v_order   TEXT := '';
    v_rows    JSON;
    v_filter  JSONB;
    v_col     TEXT;
    v_pattern TEXT;
BEGIN
    SELECT EXISTS (
        SELECT 1 FROM information_schema.views
        WHERE table_schema = 'dfv' AND table_name = p_source_name
    ) INTO v_exists;
    IF NOT v_exists THEN
        RETURN json_build_object('exists', FALSE, 'rows', '[]'::json);
    END IF;

    -- Build WHERE from filters (validate each column exists in the view)
    IF p_filters IS NOT NULL THEN
        FOR v_filter IN SELECT value FROM jsonb_array_elements(p_filters) LOOP
            v_col     := v_filter->>'col';
            v_pattern := v_filter->>'pattern';
            IF v_pattern IS NOT NULL AND v_pattern <> '' AND EXISTS (
                SELECT 1 FROM information_schema.columns
                WHERE table_schema = 'dfv'
                  AND table_name = p_source_name
                  AND column_name = v_col
            ) THEN
                v_where := v_where ||
                    CASE WHEN v_where = '' THEN ' WHERE ' ELSE ' AND ' END ||
                    quote_ident(v_col) || '::text ~* ' || quote_literal(v_pattern);
            END IF;
        END LOOP;
    END IF;

    IF p_sort_col IS NOT NULL AND EXISTS (
        SELECT 1 FROM information_schema.columns
        WHERE table_schema = 'dfv'
          AND table_name = p_source_name
          AND column_name = p_sort_col
    ) THEN
        v_order := ' ORDER BY ' || quote_ident(p_sort_col)
            || CASE WHEN lower(p_sort_dir) = 'desc' THEN ' DESC' ELSE ' ASC' END
            || ' NULLS LAST';
    END IF;

    -- LIMIT/OFFSET are bound rather than spliced in with %s: format() renders a NULL
    -- argument as an empty string, which would leave "LIMIT  OFFSET" and fail to parse.
    EXECUTE format(
        'SELECT COALESCE(json_agg(row_to_json(t)), ''[]''::json) FROM (SELECT * FROM dfv.%I%s%s LIMIT $1 OFFSET $2) t',
        p_source_name, v_where, v_order
    ) INTO v_rows USING p_limit, p_offset;

    RETURN json_build_object('exists', TRUE, 'rows', v_rows);
END;
$function$
;
-- list_mappings: all mappings of a source, optionally narrowed to one rule
-- (a NULL p_rule_name means every rule), ordered by rule then input text.
CREATE OR REPLACE FUNCTION dataflow.list_mappings(p_source_name text, p_rule_name text DEFAULT NULL::text)
 RETURNS SETOF dataflow.mappings
 LANGUAGE sql
 STABLE
AS $function$
    SELECT m.*
    FROM dataflow.mappings AS m
    WHERE m.source_name = p_source_name
      AND (m.rule_name = p_rule_name OR p_rule_name IS NULL)
    ORDER BY m.rule_name, m.input_value::text;
$function$
;
-- list_pivot_layouts: saved pivot layouts for one source, alphabetical by name.
-- Read-only, so it is declared STABLE like the other list_* functions (the
-- default VOLATILE needlessly prevents the planner from treating it as read-only).
CREATE OR REPLACE FUNCTION dataflow.list_pivot_layouts(p_source_name text)
 RETURNS TABLE(id integer, source_name text, layout_name text, config jsonb, created_at timestamp with time zone)
 LANGUAGE sql
 STABLE
AS $function$
    SELECT pl.id, pl.source_name, pl.layout_name, pl.config, pl.created_at
    FROM dataflow.pivot_layouts AS pl
    WHERE pl.source_name = p_source_name
    ORDER BY pl.layout_name;
$function$
;
-- list_records: newest-first page of a source's records. With
-- p_transformed_only, records whose transformed column is NULL are left out.
CREATE OR REPLACE FUNCTION dataflow.list_records(p_source_name text, p_limit integer DEFAULT 100, p_offset integer DEFAULT 0, p_transformed_only boolean DEFAULT false)
 RETURNS SETOF dataflow.records
 LANGUAGE sql
 STABLE
AS $function$
    SELECT rec.*
    FROM dataflow.records AS rec
    WHERE rec.source_name = p_source_name
      AND (rec.transformed IS NOT NULL OR NOT p_transformed_only)
    ORDER BY rec.id DESC
    LIMIT p_limit
    OFFSET p_offset;
$function$
;
-- list_rules: a source's rules in execution order (sequence, then name).
CREATE OR REPLACE FUNCTION dataflow.list_rules(p_source_name text)
 RETURNS SETOF dataflow.rules
 LANGUAGE sql
 STABLE
AS $function$
    SELECT r.*
    FROM dataflow.rules AS r
    WHERE r.source_name = p_source_name
    ORDER BY r.sequence, r.name;
$function$
;
-- list_sources: every source, alphabetical by name.
CREATE OR REPLACE FUNCTION dataflow.list_sources()
 RETURNS SETOF dataflow.sources
 LANGUAGE sql
 STABLE
AS $function$
    SELECT src.*
    FROM dataflow.sources AS src
    ORDER BY src.name;
$function$
;
-- list_stacks: every stack with the number of sources attached to it,
-- alphabetical by name.
CREATE OR REPLACE FUNCTION dataflow.list_stacks()
 RETURNS TABLE(name text, label text, fields jsonb, amount_field text, date_field text, balance_offset numeric, source_count bigint, created_at timestamp with time zone)
 LANGUAGE sql
 STABLE
AS $function$
    SELECT
        st.name,
        st.label,
        st.fields,
        st.amount_field,
        st.date_field,
        st.balance_offset,
        (SELECT count(*)
           FROM dataflow.stack_sources AS src
          WHERE src.stack_name = st.name),
        st.created_at
    FROM dataflow.stacks AS st
    ORDER BY st.name;
$function$
;
CREATE OR REPLACE FUNCTION dataflow.preview_rule(p_source text, p_field text, p_pattern text, p_flags text DEFAULT ''::text, p_function_type text DEFAULT 'extract'::text, p_replace_value text DEFAULT ''::text, p_limit integer DEFAULT 20)
 RETURNS TABLE(id integer, raw_value text, extracted_value jsonb)
 LANGUAGE plpgsql
 STABLE
AS $function$
-- Dry-run an unsaved rule against the newest p_limit records of p_source that
-- carry p_field. The input value is read from data first and falls back to
-- transformed, so a rule chained on the output of an earlier-sequence rule can
-- be previewed too.
--   p_function_type = 'replace': extracted_value = regexp_replace(value, pattern,
--                                p_replace_value, p_flags) as a JSON string.
--   anything else (incl. NULL):  regexp_matches(value, pattern, p_flags) ->
--                                NULL for no match, the lone match for one match,
--                                a JSON array of matches otherwise; each match is a
--                                string for one capture group, an array for several.
BEGIN
    IF p_function_type IS DISTINCT FROM 'replace' THEN
        RETURN QUERY
        SELECT
            rc.id,
            src.val,
            CASE agg.n
                WHEN 0 THEN NULL
                WHEN 1 THEN agg.hits -> 0
                ELSE agg.hits
            END
        FROM dataflow.records AS rc
        CROSS JOIN LATERAL (
            SELECT COALESCE(rc.data ->> p_field, rc.transformed ->> p_field) AS val
        ) AS src
        CROSS JOIN LATERAL (
            -- One jsonb element per match, in match order.
            SELECT
                jsonb_agg(
                    CASE WHEN array_length(grp, 1) = 1 THEN to_jsonb(grp[1]) ELSE to_jsonb(grp) END
                    ORDER BY ord
                ) AS hits,
                count(*)::int AS n
            FROM regexp_matches(src.val, p_pattern, p_flags) WITH ORDINALITY AS mm(grp, ord)
        ) AS agg
        WHERE rc.source_name = p_source
          AND (rc.data ? p_field OR rc.transformed ? p_field)
        ORDER BY rc.id DESC
        LIMIT p_limit;
    ELSE
        RETURN QUERY
        SELECT
            rc.id,
            src.val,
            to_jsonb(regexp_replace(src.val, p_pattern, p_replace_value, p_flags))
        FROM dataflow.records AS rc
        CROSS JOIN LATERAL (
            SELECT COALESCE(rc.data ->> p_field, rc.transformed ->> p_field) AS val
        ) AS src
        WHERE rc.source_name = p_source
          AND (rc.data ? p_field OR rc.transformed ? p_field)
        ORDER BY rc.id DESC
        LIMIT p_limit;
    END IF;
END;
$function$
;
CREATE OR REPLACE FUNCTION dataflow.remap_output_field(p_col text, p_from_val text, p_to_val text)
 RETURNS integer
 LANGUAGE plpgsql
AS $function$
-- Rewrite an output value across all mappings of all sources: wherever
-- output->>p_col equals p_from_val, key p_col is set to p_to_val.
-- Returns the number of mapping rows updated.
-- A NULL p_to_val is stored as JSON null. jsonb_set is strict, so passing it a
-- SQL NULL would return NULL and wipe the entire output object of every matching row.
DECLARE
    v_updated INTEGER;
BEGIN
    UPDATE dataflow.mappings
    SET output = jsonb_set(output, ARRAY[p_col], COALESCE(to_jsonb(p_to_val), 'null'::jsonb))
    WHERE output ->> p_col = p_from_val;

    GET DIAGNOSTICS v_updated = ROW_COUNT;
    RETURN v_updated;
END;
$function$
;
-- remove_stack_source: detach a source from a stack, returning the detached
-- source name (no row when it was not attached).
CREATE OR REPLACE FUNCTION dataflow.remove_stack_source(p_stack_name text, p_source_name text)
 RETURNS TABLE(source_name text)
 LANGUAGE sql
AS $function$
    DELETE FROM dataflow.stack_sources AS ss
    WHERE ss.source_name = p_source_name
      AND ss.stack_name = p_stack_name
    RETURNING ss.source_name;
$function$
;
CREATE OR REPLACE FUNCTION dataflow.reorder_stack_sources(p_stack_name text, p_source_names text[])
 RETURNS void
 LANGUAGE plpgsql
AS $function$
-- Renumber stack_sources.seq for p_stack_name so that p_source_names[i] gets seq = i.
-- Names not attached to the stack are ignored, and attached sources that are not
-- listed keep their current seq. If a name is listed more than once, its last
-- position wins. An empty or NULL array is a no-op; a 1..array_length() loop
-- would raise, because array_length() returns NULL for an empty array.
BEGIN
    UPDATE dataflow.stack_sources AS ss
    SET seq = ord.pos
    FROM (
        SELECT DISTINCT ON (u.src) u.src, u.pos
        FROM unnest(p_source_names) WITH ORDINALITY AS u(src, pos)
        ORDER BY u.src, u.pos DESC
    ) AS ord
    WHERE ss.stack_name = p_stack_name
      AND ss.source_name = ord.src;
END;
$function$
;
-- save_pivot_layout: create a named pivot layout for a source, or overwrite the
-- config of the existing layout with that (source_name, layout_name).
CREATE OR REPLACE FUNCTION dataflow.save_pivot_layout(p_source_name text, p_layout_name text, p_config jsonb)
 RETURNS TABLE(id integer, source_name text, layout_name text, config jsonb, created_at timestamp with time zone)
 LANGUAGE sql
AS $function$
    INSERT INTO dataflow.pivot_layouts AS pl (source_name, layout_name, config)
    VALUES (p_source_name, p_layout_name, p_config)
    ON CONFLICT (source_name, layout_name)
        DO UPDATE SET config = EXCLUDED.config
    RETURNING pl.id, pl.source_name, pl.layout_name, pl.config, pl.created_at;
$function$
;
-- search_mapping_outputs: distinct (output key, value) pairs, across the mappings
-- of every source, whose value contains p_search as a case-insensitive substring.
-- Each pair comes with the number of mappings using it. Empty values are excluded.
CREATE OR REPLACE FUNCTION dataflow.search_mapping_outputs(p_search text)
 RETURNS TABLE(col text, val text, mapping_count bigint)
 LANGUAGE sql
 STABLE
AS $function$
    SELECT e.key AS col, e.value AS val, COUNT(*) AS mapping_count
    FROM dataflow.mappings m
    CROSS JOIN LATERAL jsonb_each_text(m.output) AS e(key, value)
    -- p_search is a literal: escape the LIKE escape char (\) and the wildcards % and _
    -- so user text such as "50%" or "a_b" matches itself, not a pattern.
    WHERE e.value ILIKE '%'
            || replace(replace(replace(p_search, E'\\', E'\\\\'), '%', E'\\%'), '_', E'\\_')
            || '%'
      AND e.value <> ''
    GROUP BY e.key, e.value
    ORDER BY e.key, e.value;
$function$
;
-- search_records: newest-first records of a source whose data or transformed
-- JSON contains p_query (jsonb @> containment), at most p_limit of them.
CREATE OR REPLACE FUNCTION dataflow.search_records(p_source_name text, p_query jsonb, p_limit integer DEFAULT 100)
 RETURNS SETOF dataflow.records
 LANGUAGE sql
 STABLE
AS $function$
    SELECT rec.*
    FROM dataflow.records AS rec
    WHERE rec.source_name = p_source_name
      AND (rec.transformed @> p_query OR rec.data @> p_query)
    ORDER BY rec.id DESC
    LIMIT p_limit;
$function$
;
CREATE OR REPLACE FUNCTION dataflow.source_config_changed()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
-- Row trigger for dataflow.sources updates: when config changes, clear
-- view_generated_at so the source is reported as stale (get_status lists rows
-- with a NULL view_generated_at). It edits and returns NEW, so it presumably
-- runs as a BEFORE UPDATE trigger -- confirm against the CREATE TRIGGER.
BEGIN
    IF NEW.config IS NOT DISTINCT FROM OLD.config THEN
        RETURN NEW;
    END IF;

    NEW.view_generated_at := NULL;
    RETURN NEW;
END;
$function$
;
CREATE OR REPLACE FUNCTION dataflow.stack_sources_changed()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
-- Row trigger for dataflow.stack_sources: mark the owning stack's generated view
-- as stale (view_generated_at := NULL).
--   INSERT / DELETE: always.
--   UPDATE: only if a column the view depends on changed -- including which
--           stack or source the row points at. If the row moved to another
--           stack, both the old and the new stack are invalidated.
-- Always returns NULL, which assumes an AFTER trigger (returning NULL from a
-- BEFORE row trigger would skip the row operation) -- confirm against CREATE TRIGGER.
BEGIN
    IF TG_OP = 'UPDATE' THEN
        -- Nested IF (not one AND chain with TG_OP) so OLD is only read on UPDATE.
        IF NEW.stack_name     IS NOT DISTINCT FROM OLD.stack_name AND
           NEW.source_name    IS NOT DISTINCT FROM OLD.source_name AND
           NEW.field_map      IS NOT DISTINCT FROM OLD.field_map AND
           NEW.amount_sign    IS NOT DISTINCT FROM OLD.amount_sign AND
           NEW.balance_offset IS NOT DISTINCT FROM OLD.balance_offset AND
           NEW.amount_field   IS NOT DISTINCT FROM OLD.amount_field AND
           NEW.date_field     IS NOT DISTINCT FROM OLD.date_field AND
           NEW.seq            IS NOT DISTINCT FROM OLD.seq THEN
            RETURN NULL;
        END IF;

        -- Row moved to another stack: the stack it left is stale too.
        IF NEW.stack_name IS DISTINCT FROM OLD.stack_name THEN
            UPDATE dataflow.stacks SET view_generated_at = NULL
            WHERE name = OLD.stack_name;
        END IF;
    END IF;

    UPDATE dataflow.stacks SET view_generated_at = NULL
    WHERE name = COALESCE(NEW.stack_name, OLD.stack_name);
    RETURN NULL;
END;
$function$
;
CREATE OR REPLACE FUNCTION dataflow.test_rule(p_rule_id integer, p_limit integer DEFAULT 20)
 RETURNS TABLE(rule jsonb, results jsonb)
 LANGUAGE plpgsql
 STABLE
AS $function$
-- Run a saved rule against the newest p_limit records of its source without
-- writing anything. Returns a single row:
--   rule    {id, name, field, pattern, output_field} of the rule
--   results JSON array of {id, raw_value, extracted_value}, newest record first
--           ('[]' when no record has the field)
-- and no row at all when p_rule_id does not exist.
--
-- Evaluation is delegated to dataflow.preview_rule so a saved rule is tested
-- exactly as it is previewed: the field is read from data with a fallback to
-- transformed, and 'replace' rules show their replacement. preview_rule
-- aggregates each match as jsonb; aggregating matches into a 2-D text[] does not
-- work, because a single subscript on a 2-D array yields text, not a sub-array.
DECLARE
    v_rule    dataflow.rules%ROWTYPE;
    v_results JSONB;
BEGIN
    SELECT * INTO v_rule FROM dataflow.rules WHERE id = p_rule_id;
    IF NOT FOUND THEN
        RETURN;
    END IF;

    SELECT jsonb_agg(to_jsonb(pv) ORDER BY pv.id DESC)
    INTO v_results
    FROM dataflow.preview_rule(
        v_rule.source_name,
        v_rule.field,
        v_rule.pattern,
        COALESCE(v_rule.flags, ''),          -- regexp functions return NULL on NULL flags
        COALESCE(v_rule.function_type, 'extract'),
        COALESCE(v_rule.replace_value, ''),  -- regexp_replace returns NULL on NULL replacement
        p_limit
    ) AS pv;

    RETURN QUERY SELECT
        jsonb_build_object(
            'id', v_rule.id, 'name', v_rule.name,
            'field', v_rule.field, 'pattern', v_rule.pattern,
            'output_field', v_rule.output_field
        ),
        COALESCE(v_results, '[]'::jsonb);
END;
$function$
;
-- update_mapping: partial update of one mapping; a NULL argument leaves that
-- column unchanged. Returns the updated row (NULL if p_id does not exist).
CREATE OR REPLACE FUNCTION dataflow.update_mapping(p_id integer, p_input_value jsonb DEFAULT NULL::jsonb, p_output jsonb DEFAULT NULL::jsonb)
 RETURNS dataflow.mappings
 LANGUAGE sql
AS $function$
    UPDATE dataflow.mappings AS m
    SET input_value = COALESCE(p_input_value, m.input_value),
        output      = COALESCE(p_output, m.output)
    WHERE m.id = p_id
    RETURNING m.*;
$function$
;
-- update_rule: partial update of one rule; every NULL argument leaves the
-- corresponding column unchanged. Returns the updated row (NULL if p_id is unknown).
CREATE OR REPLACE FUNCTION dataflow.update_rule(p_id integer, p_name text DEFAULT NULL::text, p_field text DEFAULT NULL::text, p_pattern text DEFAULT NULL::text, p_output_field text DEFAULT NULL::text, p_function_type text DEFAULT NULL::text, p_flags text DEFAULT NULL::text, p_replace_value text DEFAULT NULL::text, p_enabled boolean DEFAULT NULL::boolean, p_retain boolean DEFAULT NULL::boolean, p_sequence integer DEFAULT NULL::integer)
 RETURNS dataflow.rules
 LANGUAGE sql
AS $function$
    UPDATE dataflow.rules AS r
    SET name          = COALESCE(p_name,          r.name),
        field         = COALESCE(p_field,         r.field),
        pattern       = COALESCE(p_pattern,       r.pattern),
        output_field  = COALESCE(p_output_field,  r.output_field),
        function_type = COALESCE(p_function_type, r.function_type),
        flags         = COALESCE(p_flags,         r.flags),
        replace_value = COALESCE(p_replace_value, r.replace_value),
        enabled       = COALESCE(p_enabled,       r.enabled),
        retain        = COALESCE(p_retain,        r.retain),
        sequence      = COALESCE(p_sequence,      r.sequence)
    WHERE r.id = p_id
    RETURNING r.*;
$function$
;
-- update_source: partial update of one source; a NULL argument keeps the stored
-- value. updated_at is always refreshed. Returns the updated row (NULL if unknown).
CREATE OR REPLACE FUNCTION dataflow.update_source(p_name text, p_constraint_fields text[] DEFAULT NULL::text[], p_config jsonb DEFAULT NULL::jsonb, p_global_picklist boolean DEFAULT NULL::boolean)
 RETURNS dataflow.sources
 LANGUAGE sql
AS $function$
    UPDATE dataflow.sources AS s
    SET constraint_fields = COALESCE(p_constraint_fields, s.constraint_fields),
        config            = COALESCE(p_config,            s.config),
        global_picklist   = COALESCE(p_global_picklist,   s.global_picklist),
        updated_at        = CURRENT_TIMESTAMP
    WHERE s.name = p_name
    RETURNING s.*;
$function$
;
-- update_source(text, text[], jsonb) must not coexist with the 4-argument
-- dataflow.update_source(text, text[], jsonb, boolean DEFAULT NULL). Every call
-- with 1-3 arguments matches both with the same effective argument list (same
-- schema, one via a default), and PostgreSQL then rejects the call with
-- "function dataflow.update_source(...) is not unique". The 4-argument version
-- performs the same update when p_global_picklist is omitted, because
-- COALESCE(NULL, global_picklist) keeps the stored value. So the 3-argument
-- overload is removed rather than (re)created.
DROP FUNCTION IF EXISTS dataflow.update_source(text, text[], jsonb);
-- update_stack: partial update of one stack; every NULL argument keeps the
-- stored value. Returns the updated row (NULL if p_name is unknown).
CREATE OR REPLACE FUNCTION dataflow.update_stack(p_name text, p_label text DEFAULT NULL::text, p_fields jsonb DEFAULT NULL::jsonb, p_amount_field text DEFAULT NULL::text, p_date_field text DEFAULT NULL::text, p_balance_offset numeric DEFAULT NULL::numeric)
 RETURNS dataflow.stacks
 LANGUAGE sql
AS $function$
    UPDATE dataflow.stacks AS st
    SET label          = COALESCE(p_label,          st.label),
        fields         = COALESCE(p_fields,         st.fields),
        amount_field   = COALESCE(p_amount_field,   st.amount_field),
        date_field     = COALESCE(p_date_field,     st.date_field),
        balance_offset = COALESCE(p_balance_offset, st.balance_offset)
    WHERE st.name = p_name
    RETURNING st.*;
$function$
;
-- upsert_mapping: insert a mapping, or replace the output of the existing
-- mapping with the same (source_name, rule_name, input_value). Returns the row.
CREATE OR REPLACE FUNCTION dataflow.upsert_mapping(p_source_name text, p_rule_name text, p_input_value jsonb, p_output jsonb)
 RETURNS dataflow.mappings
 LANGUAGE sql
AS $function$
    INSERT INTO dataflow.mappings AS m (source_name, rule_name, input_value, output)
    VALUES (p_source_name, p_rule_name, p_input_value, p_output)
    ON CONFLICT (source_name, rule_name, input_value)
        DO UPDATE SET output = EXCLUDED.output
    RETURNING m.*;
$function$
;
-- upsert_stack_source: attach a source to a stack, or update the settings of an
-- existing attachment. A new attachment gets seq = current max seq + 1 (i.e. it
-- goes last); an existing attachment keeps its seq. Returns the row.
CREATE OR REPLACE FUNCTION dataflow.upsert_stack_source(p_stack_name text, p_source_name text, p_field_map jsonb DEFAULT '{}'::jsonb, p_amount_sign integer DEFAULT 1, p_balance_offset numeric DEFAULT 0, p_amount_field text DEFAULT NULL::text, p_date_field text DEFAULT NULL::text)
 RETURNS dataflow.stack_sources
 LANGUAGE sql
AS $function$
    INSERT INTO dataflow.stack_sources AS ss
        (stack_name, source_name, field_map, amount_sign, balance_offset, amount_field, date_field, seq)
    VALUES (
        p_stack_name,
        p_source_name,
        p_field_map,
        p_amount_sign,
        p_balance_offset,
        p_amount_field,
        p_date_field,
        (SELECT COALESCE(MAX(cur.seq), 0) + 1
           FROM dataflow.stack_sources AS cur
          WHERE cur.stack_name = p_stack_name)
    )
    ON CONFLICT (stack_name, source_name) DO UPDATE
    SET field_map      = EXCLUDED.field_map,
        amount_sign    = EXCLUDED.amount_sign,
        balance_offset = EXCLUDED.balance_offset,
        amount_field   = EXCLUDED.amount_field,
        date_field     = EXCLUDED.date_field
    RETURNING ss.*;
$function$
;
------------------------------------------------------
-- Summary
------------------------------------------------------
-- All dataflow functions are defined above.
-- Deploy with: psql -d dataflow -f database/functions.sql
------------------------------------------------------