- transformed now stores only rule additions (not merged data+overrides) - View dynamically computes data || transformed || overrides at query time - New DB functions: set/clear/bulk_set_record_overrides - Records panel now includes source-wide override keys so party/reason etc. appear even on records that don't have them set yet Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1516 lines
54 KiB
PL/PgSQL
1516 lines
54 KiB
PL/PgSQL
--
|
|
-- Dataflow Functions
|
|
-- Simple, clear functions for import and transformation
|
|
--
|
|
|
|
SET search_path TO dataflow, public;
|
|
|
|
------------------------------------------------------
-- Function: import_records
-- Import a JSONB array of records for a source, skipping every record
-- whose constraint key is already stored for that source.
--
-- Parameters:
--   p_source_name  name of a row in dataflow.sources
--   p_data         JSONB array, one element per incoming record
--
-- Returns JSON:
--   {success: false, error}                        source lookup yielded no
--                                                  constraint fields, or p_data
--                                                  is not a JSON array
--   {success: true, imported, duplicates, log_id}  otherwise
--
-- Notes:
--   * The constraint key of a record is a JSONB object built from the
--     source's constraint_fields (values taken as text).
--   * Deduplication is only against rows already in dataflow.records;
--     records sharing a key inside the same batch are all inserted.
--   * One dataflow.import_log row is written per call, and every inserted
--     record points at it through import_id.
------------------------------------------------------
CREATE OR REPLACE FUNCTION import_records(
    p_source_name TEXT,
    p_data JSONB  -- Array of records
) RETURNS JSON AS $$
DECLARE
    v_constraint_fields TEXT[];
    v_inserted INTEGER;
    v_duplicates INTEGER;
    v_log_id INTEGER;
BEGIN
    SELECT constraint_fields INTO v_constraint_fields
    FROM dataflow.sources
    WHERE name = p_source_name;

    IF v_constraint_fields IS NULL THEN
        RETURN json_build_object(
            'success', false,
            'error', 'Source not found: ' || p_source_name
        );
    END IF;

    -- jsonb_array_elements raises on objects/scalars; report that in the same
    -- structured shape as the other failures. A NULL p_data is left alone
    -- (jsonb_typeof(NULL) is NULL) and is processed as an empty batch.
    IF jsonb_typeof(p_data) <> 'array' THEN
        RETURN json_build_object(
            'success', false,
            'error', 'p_data must be a JSON array of records'
        );
    END IF;

    WITH
    -- All incoming records with their constraint keys
    pending AS (
        SELECT
            rec.value AS data,
            rec.ordinality AS seq,
            (SELECT jsonb_object_agg(f, rec.value->>f)
             FROM unnest(v_constraint_fields) AS f) AS constraint_key
        FROM jsonb_array_elements(p_data) WITH ORDINALITY AS rec
    ),
    -- Keys already in the database (excluded)
    existing AS (
        SELECT DISTINCT r.constraint_key
        FROM dataflow.records r
        INNER JOIN pending p ON p.constraint_key = r.constraint_key
        WHERE r.source_name = p_source_name
    ),
    -- Rows whose constraint key is not yet in the database
    new_records AS (
        SELECT p.data, p.constraint_key, p.seq
        FROM pending p
        WHERE NOT EXISTS (SELECT 1 FROM existing e WHERE e.constraint_key = p.constraint_key)
    ),
    -- Write the log entry (counts plus the inserted / excluded key lists)
    log_entry AS (
        INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate, info)
        VALUES (
            p_source_name,
            (SELECT count(*) FROM new_records),
            (SELECT count(*) FROM pending) - (SELECT count(*) FROM new_records),
            jsonb_build_object(
                'total', jsonb_array_length(p_data),
                'inserted_keys', (SELECT jsonb_agg(constraint_key ORDER BY constraint_key) FROM new_records),
                'excluded_keys', (SELECT jsonb_agg(constraint_key) FROM existing)
            )
        )
        RETURNING id, records_imported, records_duplicate
    ),
    -- Insert new records in their original array order, tagged with the log id.
    -- This CTE is never read below; data-modifying CTEs still run to completion.
    inserted AS (
        INSERT INTO dataflow.records (source_name, data, constraint_key, import_id)
        SELECT p_source_name, nr.data, nr.constraint_key, (SELECT id FROM log_entry)
        FROM new_records nr
        ORDER BY nr.seq
        RETURNING id
    )
    SELECT le.id, le.records_imported, le.records_duplicate
    INTO v_log_id, v_inserted, v_duplicates
    FROM log_entry le;

    RETURN json_build_object(
        'success', true,
        'imported', v_inserted,
        'duplicates', v_duplicates,
        'log_id', v_log_id
    );
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION import_records IS 'Import records with automatic deduplication';
|
|
|
|
------------------------------------------------------
-- Function: get_import_log
-- Return import history for a source, newest first.
--
-- Parameters:
--   p_source_name  source whose dataflow.import_log rows are returned
--
-- Returns one row per import batch. Rows are ordered by imported_at
-- descending; id descending breaks ties so the order is deterministic
-- when two batches share a timestamp.
-- Read-only, hence STABLE (matching the other getters in this file).
------------------------------------------------------
CREATE OR REPLACE FUNCTION get_import_log(p_source_name TEXT)
RETURNS TABLE (
    id INTEGER,
    source_name TEXT,
    records_imported INTEGER,
    records_duplicate INTEGER,
    imported_at TIMESTAMPTZ,
    info JSONB
) AS $$
    SELECT
        il.id,
        il.source_name,
        il.records_imported,
        il.records_duplicate,
        il.imported_at,
        il.info
    FROM dataflow.import_log AS il
    WHERE il.source_name = p_source_name
    ORDER BY il.imported_at DESC, il.id DESC;
$$ LANGUAGE sql STABLE;

COMMENT ON FUNCTION get_import_log IS 'Return import history for a source, newest first, including inserted/excluded key lists';
|
|
|
|
------------------------------------------------------
-- Function: get_all_import_logs
-- Return import history across all sources, newest first.
--
-- Returns every dataflow.import_log row. Rows are ordered by imported_at
-- descending; id descending breaks ties so the order is deterministic
-- when two batches share a timestamp.
-- Read-only, hence STABLE (matching the other getters in this file).
------------------------------------------------------
CREATE OR REPLACE FUNCTION get_all_import_logs()
RETURNS TABLE (
    id INTEGER,
    source_name TEXT,
    records_imported INTEGER,
    records_duplicate INTEGER,
    imported_at TIMESTAMPTZ,
    info JSONB
) AS $$
    SELECT
        il.id,
        il.source_name,
        il.records_imported,
        il.records_duplicate,
        il.imported_at,
        il.info
    FROM dataflow.import_log AS il
    ORDER BY il.imported_at DESC, il.id DESC;
$$ LANGUAGE sql STABLE;

COMMENT ON FUNCTION get_all_import_logs IS 'Return import history across all sources, newest first';
|
|
|
|
------------------------------------------------------
-- Function: delete_import
-- Remove one import batch: its log entry and the records tied to it.
--
-- Parameters:
--   p_log_id  id of the dataflow.import_log row to remove
--
-- Returns JSON:
--   {success: false, error}                     no log entry with that id
--   {success: true, records_deleted, log_id}    otherwise
------------------------------------------------------
CREATE OR REPLACE FUNCTION delete_import(p_log_id INTEGER)
RETURNS JSON
LANGUAGE plpgsql
AS $$
DECLARE
    v_record_total INTEGER;
BEGIN
    -- Bail out early when the batch does not exist
    PERFORM 1 FROM dataflow.import_log AS il WHERE il.id = p_log_id;
    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Import log entry not found');
    END IF;

    -- Count first: the records are gone once the log row is deleted
    v_record_total := (
        SELECT count(*)
        FROM dataflow.records AS r
        WHERE r.import_id = p_log_id
    );

    -- Cascade handles deleting records via FK ON DELETE CASCADE
    DELETE FROM dataflow.import_log AS il
    WHERE il.id = p_log_id;

    RETURN json_build_object(
        'success', true,
        'records_deleted', v_record_total,
        'log_id', p_log_id
    );
END;
$$;

COMMENT ON FUNCTION delete_import IS 'Delete all records belonging to an import batch and remove the log entry';
|
|
|
|
------------------------------------------------------
-- Aggregate: jsonb_concat_obj
-- Folds JSONB objects from many rows into one object; when the same key
-- appears more than once, the value from the later row wins.
-- Usage: jsonb_concat_obj(col ORDER BY sequence)
--
-- dataflow.jsonb_merge is the state-transition function: it concatenates
-- two JSONB values with ||, treating a NULL on either side as '{}'.
------------------------------------------------------
CREATE OR REPLACE FUNCTION dataflow.jsonb_merge(a JSONB, b JSONB)
RETURNS JSONB
LANGUAGE sql
IMMUTABLE
AS $$
    SELECT COALESCE(a, '{}'::jsonb) || COALESCE(b, '{}'::jsonb)
$$;

-- Recreate the aggregate from scratch so the definition below always applies
DROP AGGREGATE IF EXISTS dataflow.jsonb_concat_obj(JSONB);
CREATE AGGREGATE dataflow.jsonb_concat_obj(JSONB) (
    SFUNC    = dataflow.jsonb_merge,
    STYPE    = JSONB,
    INITCOND = '{}'
);
|
|
|
|
------------------------------------------------------
-- Function: apply_transformations
-- Apply all enabled transformation rules of a source to its records
-- (single set-based statement).
--
-- Parameters:
--   p_source_name  source whose records and rules are processed
--   p_record_ids   optional list of record ids; NULL = all eligible records
--   p_overwrite    FALSE = only records with transformed IS NULL,
--                  TRUE  = recompute every selected record
--
-- Effect: for each selected record, `transformed` is set to the merged
-- output of all rules that produced a value (or '{}' when none did) and
-- `transformed_at` is stamped. Only rule additions are stored; the raw
-- `data` is not copied into `transformed`.
--
-- Returns JSON {success: true, transformed: <number of records updated>}.
------------------------------------------------------
DROP FUNCTION IF EXISTS apply_transformations(TEXT, INTEGER[]);
CREATE OR REPLACE FUNCTION apply_transformations(
    p_source_name TEXT,
    p_record_ids INTEGER[] DEFAULT NULL,  -- NULL = all eligible records
    p_overwrite BOOLEAN DEFAULT FALSE     -- FALSE = skip already-transformed, TRUE = overwrite all
) RETURNS JSON AS $$
WITH
-- All records to process
qualifying AS (
    SELECT id, data
    FROM dataflow.records
    WHERE source_name = p_source_name
      AND (p_overwrite OR transformed IS NULL)
      AND (p_record_ids IS NULL OR id = ANY(p_record_ids))
),
-- Mirror TPS rx: fan out one row per regex match, drive from rules → records.
-- Both LATERAL joins are LEFT joins, so a rule that does not match still
-- yields one row whose match_val / replace_val is SQL NULL.
rx AS (
    SELECT
        q.id,
        r.name AS rule_name,
        r.sequence,
        r.output_field,
        r.retain,
        r.function_type,
        COALESCE(mt.rn, rp.rn, 1) AS result_number,
        -- extract: a single capture group is unwrapped to a scalar,
        -- several groups stay an array (mirrors TPS)
        CASE WHEN array_length(mt.mt, 1) = 1 THEN to_jsonb(mt.mt[1]) ELSE to_jsonb(mt.mt) END AS match_val,
        to_jsonb(rp.rp) AS replace_val
    FROM dataflow.rules r
    INNER JOIN qualifying q ON q.data ? r.field
    LEFT JOIN LATERAL regexp_matches(q.data ->> r.field, r.pattern, r.flags)
        WITH ORDINALITY AS mt(mt, rn) ON r.function_type = 'extract'
    LEFT JOIN LATERAL regexp_replace(q.data ->> r.field, r.pattern, r.replace_value, r.flags)
        WITH ORDINALITY AS rp(rp, rn) ON r.function_type = 'replace'
    WHERE r.source_name = p_source_name
      AND r.enabled = true
),
-- Aggregate match rows back into one value per (record, rule) — mirrors TPS agg_to_target_items
agg_matches AS (
    SELECT
        id,
        rule_name,
        sequence,
        output_field,
        retain,
        function_type,
        CASE function_type
            WHEN 'replace' THEN jsonb_agg(replace_val) -> 0
            ELSE
                CASE WHEN max(result_number) = 1
                    THEN jsonb_agg(match_val ORDER BY result_number) -> 0
                    ELSE jsonb_agg(match_val ORDER BY result_number)
                END
        END AS extracted
    FROM rx
    GROUP BY id, rule_name, sequence, output_field, retain, function_type
),
-- Join with mappings to find mapped output — mirrors TPS link_map
linked AS (
    SELECT
        a.id,
        a.sequence,
        a.output_field,
        a.retain,
        a.extracted,
        m.output AS mapped
    FROM agg_matches a
    LEFT JOIN dataflow.mappings m ON
        m.source_name = p_source_name
        AND m.rule_name = a.rule_name
        AND m.input_value = a.extracted
    -- jsonb_agg keeps SQL NULLs as JSON null, so a rule that produced nothing
    -- arrives here as 'null'::jsonb rather than SQL NULL. Both forms must be
    -- dropped, otherwise {output_field: null} is written for every non-match.
    WHERE a.extracted IS NOT NULL
      AND jsonb_typeof(a.extracted) <> 'null'
),
-- Build per-rule output JSONB:
--   mapped → use mapping output; also write output_field if retain = true
--   no map → write extracted value to output_field
rule_output AS (
    SELECT
        id,
        sequence,
        CASE
            WHEN mapped IS NOT NULL THEN
                mapped ||
                CASE WHEN retain
                    THEN jsonb_build_object(output_field, extracted)
                    ELSE '{}'::jsonb
                END
            ELSE
                jsonb_build_object(output_field, extracted)
        END AS output
    FROM linked
),
-- Merge all rule outputs per record in sequence order — mirrors TPS agg_to_id
record_additions AS (
    SELECT
        id,
        dataflow.jsonb_concat_obj(output ORDER BY sequence) AS additions
    FROM rule_output
    GROUP BY id
),
-- Update all qualifying records; records with no rule output get transformed = '{}'
updated AS (
    UPDATE dataflow.records rec
    SET transformed = COALESCE(ra.additions, '{}'::jsonb),
        transformed_at = CURRENT_TIMESTAMP
    FROM qualifying q
    LEFT JOIN record_additions ra ON ra.id = q.id
    WHERE rec.id = q.id
    RETURNING rec.id
)
SELECT json_build_object('success', true, 'transformed', count(*))
FROM updated
$$ LANGUAGE sql;

COMMENT ON FUNCTION apply_transformations IS 'Apply transformation rules and mappings to records (set-based CTE)';
|
|
|
|
------------------------------------------------------
-- Function: get_all_values
-- Every value found under a rule's output_field in transformed records
-- (mapped and unmapped alike), with how many records carry it, up to five
-- sample raw records, and the matching mapping when one exists.
--
-- Parameters:
--   p_source_name  source to inspect
--   p_rule_name    optional rule filter; NULL = all rules of the source
--
-- Rows are returned with the most frequent values first.
------------------------------------------------------
DROP FUNCTION IF EXISTS get_all_values(TEXT, TEXT);
CREATE FUNCTION get_all_values(
    p_source_name TEXT,
    p_rule_name TEXT DEFAULT NULL
) RETURNS TABLE (
    rule_name TEXT,
    output_field TEXT,
    source_field TEXT,
    extracted_value JSONB,
    record_count BIGINT,
    sample JSONB,
    mapping_id INTEGER,
    output JSONB,
    is_mapped BOOLEAN
)
LANGUAGE plpgsql
AS $$
BEGIN
    RETURN QUERY
    WITH
    -- One row per (rule, record) where the record has the rule's source field
    -- and its transformed payload carries the rule's output field
    rule_hits AS (
        SELECT
            rl.name                           AS hit_rule,
            rl.output_field                   AS hit_output_field,
            rl.field                          AS hit_source_field,
            rc.transformed -> rl.output_field AS hit_value,
            rc.data                           AS hit_record,
            -- rank records inside each (rule, value) group so samples are the lowest ids
            row_number() OVER (
                PARTITION BY rl.name, rc.transformed -> rl.output_field
                ORDER BY rc.id
            )                                 AS hit_rank
        FROM dataflow.rules AS rl
        INNER JOIN dataflow.records AS rc
            ON rc.source_name = rl.source_name
        WHERE rl.source_name = p_source_name
          AND (p_rule_name IS NULL OR rl.name = p_rule_name)
          AND rc.transformed IS NOT NULL
          AND rc.transformed ? rl.output_field
          AND rc.data ? rl.field
    ),
    -- Collapse to one row per (rule, value) with a count and at most 5 samples
    value_totals AS (
        SELECT
            h.hit_rule,
            h.hit_output_field,
            h.hit_source_field,
            h.hit_value,
            count(*) AS hit_count,
            jsonb_agg(h.hit_record ORDER BY h.hit_rank)
                FILTER (WHERE h.hit_rank <= 5) AS hit_samples
        FROM rule_hits AS h
        GROUP BY h.hit_rule, h.hit_output_field, h.hit_source_field, h.hit_value
    )
    SELECT
        t.hit_rule,
        t.hit_output_field,
        t.hit_source_field,
        t.hit_value,
        t.hit_count,
        t.hit_samples,
        mp.id AS mapping_id,
        mp.output,
        (mp.id IS NOT NULL) AS is_mapped
    FROM value_totals AS t
    LEFT JOIN dataflow.mappings AS mp
        ON  mp.source_name = p_source_name
        AND mp.rule_name   = t.hit_rule
        AND mp.input_value = t.hit_value
    ORDER BY t.hit_count DESC;
END;
$$;

COMMENT ON FUNCTION get_all_values IS 'All extracted values with record counts and mapping output (single query for All tab)';
|
|
|
|
------------------------------------------------------
-- Function: get_unmapped_values
-- Values found under a rule's output_field in transformed records for which
-- no row exists in dataflow.mappings, with a record count and up to five
-- sample raw records each.
--
-- Parameters:
--   p_source_name  source to inspect
--   p_rule_name    optional rule filter; NULL = all rules of the source
--
-- Rows are returned with the most frequent values first.
------------------------------------------------------
DROP FUNCTION IF EXISTS get_unmapped_values(TEXT, TEXT);
CREATE FUNCTION get_unmapped_values(
    p_source_name TEXT,
    p_rule_name TEXT DEFAULT NULL
) RETURNS TABLE (
    rule_name TEXT,
    output_field TEXT,
    source_field TEXT,
    extracted_value JSONB,
    record_count BIGINT,
    sample JSONB
)
LANGUAGE plpgsql
AS $$
BEGIN
    RETURN QUERY
    WITH
    -- One row per (rule, record) where the record has the rule's source field
    -- and its transformed payload carries the rule's output field
    rule_hits AS (
        SELECT
            rl.name                           AS hit_rule,
            rl.output_field                   AS hit_output_field,
            rl.field                          AS hit_source_field,
            rc.transformed -> rl.output_field AS hit_value,
            rc.data                           AS hit_record,
            -- rank is computed before the mapping filter below is applied
            row_number() OVER (
                PARTITION BY rl.name, rc.transformed -> rl.output_field
                ORDER BY rc.id
            )                                 AS hit_rank
        FROM dataflow.rules AS rl
        INNER JOIN dataflow.records AS rc
            ON rc.source_name = rl.source_name
        WHERE rl.source_name = p_source_name
          AND (p_rule_name IS NULL OR rl.name = p_rule_name)
          AND rc.transformed IS NOT NULL
          AND rc.transformed ? rl.output_field
          AND rc.data ? rl.field
    )
    SELECT
        h.hit_rule,
        h.hit_output_field,
        h.hit_source_field,
        h.hit_value,
        count(*) AS record_count,
        jsonb_agg(h.hit_record ORDER BY h.hit_rank)
            FILTER (WHERE h.hit_rank <= 5) AS sample
    FROM rule_hits AS h
    -- keep only values that have no mapping for this source and rule
    WHERE NOT EXISTS (
        SELECT 1
        FROM dataflow.mappings AS mp
        WHERE mp.source_name = p_source_name
          AND mp.rule_name   = h.hit_rule
          AND mp.input_value = h.hit_value
    )
    GROUP BY h.hit_rule, h.hit_output_field, h.hit_source_field, h.hit_value
    ORDER BY count(*) DESC;
END;
$$;

COMMENT ON FUNCTION get_unmapped_values IS 'Find extracted values that need mappings defined';
|
|
|
|
------------------------------------------------------
-- Function: reprocess_records
-- Recompute transformations for every record of a source.
--
-- Thin wrapper: delegates to dataflow.apply_transformations with no record
-- filter and overwrite switched on, and returns that function's JSON result.
------------------------------------------------------
CREATE OR REPLACE FUNCTION reprocess_records(p_source_name TEXT)
RETURNS JSON
LANGUAGE sql
AS $$
    -- Overwrite all records directly — no clear step, mirrors TPS srce_map_overwrite
    SELECT dataflow.apply_transformations(
        p_source_name,
        p_record_ids => NULL,
        p_overwrite  => TRUE
    )
$$;

COMMENT ON FUNCTION reprocess_records IS 'Clear and reapply all transformations for a source';
|
|
|
|
------------------------------------------------------
-- Function: generate_source_view
-- Build a typed flat view dfv.<source name> from the source's config.fields.
--
-- Parameters:
--   p_source_name  source whose view is (re)created
--
-- Each element of config.fields becomes one view column:
--   * with an "expression": every {key} placeholder is replaced by
--     (r->>'key')::numeric and the resulting expression is used as-is
--     (the field's declared "type" is not applied to expressions);
--   * type 'date'    → (r->>name)::date
--   * type 'numeric' → (r->>name)::numeric
--   * anything else  → r->>name (text)
-- where r = data || transformed || overrides, computed at query time.
-- Only records with transformed IS NOT NULL appear in the view.
--
-- The existing view is dropped WITH CASCADE before being recreated.
--
-- Returns JSON:
--   {success: false, error}          source has no usable field list
--   {success: true, view, sql}       otherwise
--
-- NOTE(review): "expression" text is spliced into the view definition
-- verbatim, so source config must only be editable by trusted users.
------------------------------------------------------
CREATE OR REPLACE FUNCTION generate_source_view(p_source_name TEXT)
RETURNS JSON AS $$
DECLARE
    v_config JSONB;
    v_fields JSONB;
    v_field JSONB;
    v_cols TEXT := '';
    v_sql TEXT;
    v_view TEXT;
    v_expr TEXT;
    v_ref TEXT;
BEGIN
    SELECT config INTO v_config
    FROM dataflow.sources
    WHERE name = p_source_name;

    -- NULL when the source is missing, has no config, or has no 'fields' key
    v_fields := v_config->'fields';

    -- Checked in separate steps so jsonb_array_length is only ever called on
    -- an array (it raises on any other JSON type).
    IF v_fields IS NULL OR jsonb_typeof(v_fields) <> 'array' THEN
        RETURN json_build_object('success', false, 'error', 'No schema fields defined for this source');
    END IF;

    IF jsonb_array_length(v_fields) = 0 THEN
        RETURN json_build_object('success', false, 'error', 'No schema fields defined for this source');
    END IF;

    FOR v_field IN SELECT * FROM jsonb_array_elements(v_fields)
    LOOP
        IF v_cols != '' THEN v_cols := v_cols || ', '; END IF;

        v_expr := v_field->>'expression';

        IF v_expr IS NOT NULL THEN
            -- Computed column: swap each {key} for a numeric read of that key.
            -- Every pass removes one '}' from v_expr, so the loop terminates.
            WHILE v_expr ~ '\{[^}]+\}' LOOP
                v_ref := substring(v_expr FROM '\{([^}]+)\}');
                v_expr := replace(v_expr, '{' || v_ref || '}',
                                  format('(r->>%L)::numeric', v_ref));
            END LOOP;
            v_cols := v_cols || format('%s AS %I', v_expr, v_field->>'name');
        ELSE
            -- Plain column: cast according to the declared type
            CASE v_field->>'type'
                WHEN 'date' THEN
                    v_cols := v_cols || format('(r->>%L)::date AS %I',
                                               v_field->>'name', v_field->>'name');
                WHEN 'numeric' THEN
                    v_cols := v_cols || format('(r->>%L)::numeric AS %I',
                                               v_field->>'name', v_field->>'name');
                ELSE
                    v_cols := v_cols || format('r->>%L AS %I',
                                               v_field->>'name', v_field->>'name');
            END CASE;
        END IF;
    END LOOP;

    CREATE SCHEMA IF NOT EXISTS dfv;

    v_view := 'dfv.' || quote_ident(p_source_name);

    EXECUTE format('DROP VIEW IF EXISTS %s CASCADE', v_view);

    v_sql := format(
        'CREATE VIEW %s AS SELECT id, %s FROM (SELECT id, data || COALESCE(transformed, ''{}''::jsonb) || COALESCE(overrides, ''{}''::jsonb) AS r FROM dataflow.records WHERE source_name = %L AND transformed IS NOT NULL) rec',
        v_view, v_cols, p_source_name
    );

    EXECUTE v_sql;

    RETURN json_build_object('success', true, 'view', v_view, 'sql', v_sql);
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION generate_source_view IS 'Generate a typed flat view in dfv schema from source config.fields';
|
|
|
|
------------------------------------------------------
-- Function: set_record_overrides
-- Replace the override values of a single record.
--
-- Parameters:
--   p_id         id of the dataflow.records row
--   p_overrides  new overrides object; an empty object is stored as NULL
--
-- Returns the updated row as JSON (no row when the id does not exist).
------------------------------------------------------
DROP FUNCTION IF EXISTS set_record_overrides(INTEGER, JSONB);
CREATE OR REPLACE FUNCTION set_record_overrides(p_id INTEGER, p_overrides JSONB)
RETURNS JSON
LANGUAGE sql
AS $$
    WITH saved AS (
        UPDATE dataflow.records AS rec
        -- NULLIF turns '{}' into NULL and passes everything else through
        SET overrides = NULLIF(p_overrides, '{}'::jsonb)
        WHERE rec.id = p_id
        RETURNING rec.*
    )
    SELECT row_to_json(saved)
    FROM saved;
$$;
|
|
|
|
------------------------------------------------------
-- Function: clear_record_overrides
-- Drop every override on a single record by resetting `overrides` to NULL.
--
-- Parameters:
--   p_id  id of the dataflow.records row
--
-- Returns the updated row as JSON (no row when the id does not exist).
------------------------------------------------------
DROP FUNCTION IF EXISTS clear_record_overrides(INTEGER);
CREATE OR REPLACE FUNCTION clear_record_overrides(p_id INTEGER)
RETURNS JSON
LANGUAGE sql
AS $$
    WITH cleared AS (
        UPDATE dataflow.records AS rec
        SET overrides = NULL
        WHERE rec.id = p_id
        RETURNING rec.*
    )
    SELECT row_to_json(cleared)
    FROM cleared;
$$;
|
|
|
|
------------------------------------------------------
-- Function: bulk_set_record_overrides
-- Merge override values into several records of one source.
--
-- Parameters:
--   p_source_name  only records of this source are touched
--   p_ids          ids of the records to update
--   p_overrides    keys/values merged on top of each record's existing
--                  overrides (incoming keys win)
--
-- A NULL p_overrides is treated as "nothing to merge": without the COALESCE,
-- `existing || NULL` evaluates to NULL and would wipe every targeted
-- record's overrides. An empty merge result is stored as NULL, matching
-- how set_record_overrides stores an empty object.
--
-- Returns JSON {updated: <number of records updated>}.
------------------------------------------------------
DROP FUNCTION IF EXISTS bulk_set_record_overrides(TEXT, INTEGER[], JSONB);
CREATE OR REPLACE FUNCTION bulk_set_record_overrides(p_source_name TEXT, p_ids INTEGER[], p_overrides JSONB)
RETURNS JSON AS $$
    WITH updated AS (
        UPDATE dataflow.records AS rec
        SET overrides = NULLIF(
                COALESCE(rec.overrides, '{}'::jsonb) || COALESCE(p_overrides, '{}'::jsonb),
                '{}'::jsonb
            )
        WHERE rec.id = ANY(p_ids)
          AND rec.source_name = p_source_name
        RETURNING rec.id
    )
    SELECT json_build_object('updated', count(*)) FROM updated;
$$ LANGUAGE sql;
|
|
|
|
-- Compare a known account balance with the sum the source view produces.
--
-- Parameters:
--   p_stack_name / p_source_name  identify the dataflow.stack_sources row
--   p_as_of_date                  only rows dated on or before this day are
--                                 summed; NULL = sum every row
--   p_known_balance               balance to reconcile against
--
-- Sums <amount_field> * <amount_sign> over dfv.<source name> and returns JSON
--   {success: true, source, as_of_date, known_balance, computed_sum,
--    suggested_offset = known_balance - computed_sum}
-- or {success: false, error} when the source is not in the stack, its
-- amount/date fields are unset, or the source view does not exist.
CREATE OR REPLACE FUNCTION dataflow.calibrate_balance(p_stack_name text, p_source_name text, p_as_of_date date, p_known_balance numeric)
RETURNS json
LANGUAGE plpgsql
STABLE
AS $function$
DECLARE
    v_link  dataflow.stack_sources%ROWTYPE;
    v_total NUMERIC;
    v_query TEXT;
BEGIN
    SELECT ss.* INTO v_link
    FROM dataflow.stack_sources AS ss
    WHERE ss.stack_name = p_stack_name
      AND ss.source_name = p_source_name;

    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Source not in stack');
    END IF;

    -- Both fields are needed to build the query below
    IF NOT (v_link.amount_field IS NOT NULL AND v_link.date_field IS NOT NULL) THEN
        RETURN json_build_object('success', false, 'error', 'Set amount and date fields on this source first');
    END IF;

    BEGIN
        -- Pick the unfiltered or date-filtered form of the sum
        v_query := CASE
            WHEN p_as_of_date IS NULL THEN
                format(
                    'SELECT COALESCE(SUM(%I * %s), 0) FROM dfv.%I',
                    v_link.amount_field, v_link.amount_sign, p_source_name
                )
            ELSE
                format(
                    'SELECT COALESCE(SUM(%I * %s), 0) FROM dfv.%I WHERE %I <= %L::date',
                    v_link.amount_field, v_link.amount_sign, p_source_name, v_link.date_field, p_as_of_date
                )
        END;

        EXECUTE v_query INTO v_total;
    EXCEPTION
        -- The view is created by generate_source_view; report its absence cleanly
        WHEN undefined_table THEN
            RETURN json_build_object('success', false, 'error', 'Source view not found — generate the source view first');
    END;

    RETURN json_build_object(
        'success', true,
        'source', p_source_name,
        'as_of_date', p_as_of_date,
        'known_balance', p_known_balance,
        'computed_sum', v_total,
        'suggested_offset', p_known_balance - v_total
    );
END;
$function$
;
|
|
-- Insert one mapping (input value → output object) for a rule of a source
-- and return the stored dataflow.mappings row.
CREATE OR REPLACE FUNCTION dataflow.create_mapping(p_source_name text, p_rule_name text, p_input_value jsonb, p_output jsonb)
RETURNS dataflow.mappings
LANGUAGE sql
AS $function$
    INSERT INTO dataflow.mappings AS m (
        source_name,
        rule_name,
        input_value,
        output
    )
    SELECT
        p_source_name,
        p_rule_name,
        p_input_value,
        p_output
    RETURNING m.*;
$function$
;
|
|
-- Insert one transformation rule for a source and return the stored
-- dataflow.rules row. Optional parameters default to: function_type
-- 'extract', empty flags, empty replace_value, enabled, retain off,
-- sequence 0.
CREATE OR REPLACE FUNCTION dataflow.create_rule(p_source_name text, p_name text, p_field text, p_pattern text, p_output_field text, p_function_type text DEFAULT 'extract'::text, p_flags text DEFAULT ''::text, p_replace_value text DEFAULT ''::text, p_enabled boolean DEFAULT true, p_retain boolean DEFAULT false, p_sequence integer DEFAULT 0)
RETURNS dataflow.rules
LANGUAGE sql
AS $function$
    INSERT INTO dataflow.rules AS rl (
        source_name,
        name,
        field,
        pattern,
        output_field,
        function_type,
        flags,
        replace_value,
        enabled,
        retain,
        sequence
    )
    SELECT
        p_source_name,
        p_name,
        p_field,
        p_pattern,
        p_output_field,
        p_function_type,
        p_flags,
        p_replace_value,
        p_enabled,
        p_retain,
        p_sequence
    RETURNING rl.*;
$function$
;
|
|
-- Insert one source (name, constraint fields, config, global_picklist flag)
-- and return the stored dataflow.sources row. config defaults to '{}' and
-- global_picklist to true.
CREATE OR REPLACE FUNCTION dataflow.create_source(p_name text, p_constraint_fields text[], p_config jsonb DEFAULT '{}'::jsonb, p_global_picklist boolean DEFAULT true)
RETURNS dataflow.sources
LANGUAGE sql
AS $function$
    INSERT INTO dataflow.sources AS s (
        name,
        constraint_fields,
        config,
        global_picklist
    )
    SELECT
        p_name,
        p_constraint_fields,
        p_config,
        p_global_picklist
    RETURNING s.*;
$function$
;
|
|
-- Legacy overload dataflow.create_source(text, text[], jsonb) is removed.
--
-- Why: it coexisted with create_source(text, text[], jsonb DEFAULT,
-- boolean DEFAULT). Both share the same non-defaulted parameter types, so
-- PostgreSQL cannot choose between them when a call supplies two or three
-- arguments and fails with "function ... is not unique". With this overload
-- gone, those calls resolve to the four-parameter version, whose defaults
-- (config '{}', global_picklist true) fill the omitted arguments.
--
-- NOTE(review): the removed overload left global_picklist to the column
-- default; confirm that default is also true.
DROP FUNCTION IF EXISTS dataflow.create_source(text, text[], jsonb);
|
|
-- Insert one stack and return the stored dataflow.stacks row. Optional
-- parameters default to: no label, empty field list, no amount/date field,
-- balance offset 0.
CREATE OR REPLACE FUNCTION dataflow.create_stack(p_name text, p_label text DEFAULT NULL::text, p_fields jsonb DEFAULT '[]'::jsonb, p_amount_field text DEFAULT NULL::text, p_date_field text DEFAULT NULL::text, p_balance_offset numeric DEFAULT 0)
RETURNS dataflow.stacks
LANGUAGE sql
AS $function$
    INSERT INTO dataflow.stacks AS st (
        name,
        label,
        fields,
        amount_field,
        date_field,
        balance_offset
    )
    SELECT
        p_name,
        p_label,
        p_fields,
        p_amount_field,
        p_date_field,
        p_balance_offset
    RETURNING st.*;
$function$
;
|
|
-- Delete one mapping by id; returns the deleted id (no row when nothing matched).
CREATE OR REPLACE FUNCTION dataflow.delete_mapping(p_id integer)
RETURNS TABLE(id integer)
LANGUAGE sql
AS $function$
    DELETE FROM dataflow.mappings AS m
    WHERE m.id = p_id
    RETURNING m.id;
$function$
;
|
|
-- Delete one pivot layout by id; returns the deleted id (no row when nothing matched).
CREATE OR REPLACE FUNCTION dataflow.delete_pivot_layout(p_id integer)
RETURNS TABLE(id integer)
LANGUAGE sql
AS $function$
    DELETE FROM dataflow.pivot_layouts AS pl
    WHERE pl.id = p_id
    RETURNING pl.id;
$function$
;
|
|
-- Delete one record by id; returns the deleted id (no row when nothing matched).
CREATE OR REPLACE FUNCTION dataflow.delete_record(p_id bigint)
RETURNS TABLE(id bigint)
LANGUAGE sql
AS $function$
    DELETE FROM dataflow.records AS rec
    WHERE rec.id = p_id
    RETURNING rec.id;
$function$
;
|
|
-- Delete one rule by id; returns the deleted rule's id and name
-- (no row when nothing matched).
CREATE OR REPLACE FUNCTION dataflow.delete_rule(p_id integer)
RETURNS TABLE(id integer, name text)
LANGUAGE sql
AS $function$
    DELETE FROM dataflow.rules AS rl
    WHERE rl.id = p_id
    RETURNING rl.id, rl.name;
$function$
;
|
|
-- Delete one source by name; returns the deleted name (NULL when nothing matched).
CREATE OR REPLACE FUNCTION dataflow.delete_source(p_name text)
RETURNS text
LANGUAGE sql
AS $function$
    DELETE FROM dataflow.sources AS s
    WHERE s.name = p_name
    RETURNING s.name;
$function$
;
|
|
-- Delete every record of a source; returns how many rows were removed.
CREATE OR REPLACE FUNCTION dataflow.delete_source_records(p_source_name text)
RETURNS TABLE(deleted_count bigint)
LANGUAGE sql
AS $function$
    WITH removed AS (
        DELETE FROM dataflow.records AS rec
        WHERE rec.source_name = p_source_name
        RETURNING rec.id
    )
    SELECT count(*) AS deleted_count
    FROM removed;
$function$
;
|
|
-- Delete one stack by name; returns the deleted name (no row when nothing matched).
CREATE OR REPLACE FUNCTION dataflow.delete_stack(p_name text)
RETURNS TABLE(name text)
LANGUAGE sql
AS $function$
    DELETE FROM dataflow.stacks AS st
    WHERE st.name = p_name
    RETURNING st.name;
$function$
;
|
|
-- Build the view dfv.<stack name> that stacks (UNION ALL) the source views
-- of every source in the stack onto the stack's canonical field list.
--
-- Parameters:
--   p_stack_name  stack to build
--   p_dry_run     TRUE = only compose and return the SQL, change nothing
--
-- Per source, one CTE selects from dfv.<source>:
--   * the stack's amount field ← source amount_field * amount_sign
--     (typed NULL when the source has no amount_field);
--   * the stack's date field   ← source date_field (NULL::date when unset);
--   * every other field        ← field_map entry or the same name, when
--     dfv.<source> has such a column, otherwise NULL::text.
-- The amount/date handling only applies when the stack defines both an
-- amount field and a date field; in that case the view also gets one
-- running "<source>_balance" column per source and a "net_balance" column,
-- each shifted by the configured balance offsets.
--
-- Unless dry-running, the old view is dropped WITH CASCADE, the new one is
-- created, and any other stack whose view no longer exists in dfv gets
-- view_generated_at reset to NULL.
--
-- Returns JSON {success: true, view, sql, cascade_stale} or
-- {success: false, error} when the stack is missing or has no sources.
CREATE OR REPLACE FUNCTION dataflow.generate_stack_view(p_stack_name text, p_dry_run boolean DEFAULT false)
RETURNS json
LANGUAGE plpgsql
AS $function$
DECLARE
    v_stk            dataflow.stacks%ROWTYPE;
    v_member         dataflow.stack_sources%ROWTYPE;
    v_fdef           JSONB;
    v_fname          TEXT;
    v_mapped_name    TEXT;
    v_projection     TEXT;
    v_cte_defs       TEXT[] := '{}';
    v_cte_refs       TEXT[] := '{}';
    v_bal_exprs      TEXT[] := '{}';
    v_canonical      TEXT;
    v_offset_sum     NUMERIC := 0;
    v_tracks_balance BOOLEAN;
    v_view_name      TEXT;
    v_view_sql       TEXT;
    v_stale_stacks   TEXT[];
BEGIN
    SELECT st.* INTO v_stk
    FROM dataflow.stacks AS st
    WHERE st.name = p_stack_name;

    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Stack not found');
    END IF;

    v_tracks_balance := v_stk.amount_field IS NOT NULL AND v_stk.date_field IS NOT NULL;

    -- Build one CTE per source querying dfv.{source} directly
    FOR v_member IN
        SELECT ss.*
        FROM dataflow.stack_sources AS ss
        WHERE ss.stack_name = p_stack_name
        ORDER BY ss.seq, ss.id
    LOOP
        v_projection := format('SELECT %L AS _source, id AS _id', v_member.source_name);

        FOR v_fdef IN SELECT * FROM jsonb_array_elements(v_stk.fields)
        LOOP
            v_fname := v_fdef->>'name';

            CASE
                WHEN v_tracks_balance AND v_fname = v_stk.amount_field THEN
                    -- Use per-source amount_field with sign applied
                    IF v_member.amount_field IS NOT NULL THEN
                        v_projection := v_projection
                            || format(', %I * %s AS %I', v_member.amount_field, v_member.amount_sign, v_fname);
                    ELSE
                        v_projection := v_projection
                            || format(', NULL::%s AS %I', v_fdef->>'type', v_fname);
                    END IF;

                WHEN v_tracks_balance AND v_fname = v_stk.date_field THEN
                    -- Use per-source date_field
                    IF v_member.date_field IS NOT NULL THEN
                        v_projection := v_projection
                            || format(', %I AS %I', v_member.date_field, v_fname);
                    ELSE
                        v_projection := v_projection
                            || format(', NULL::date AS %I', v_fname);
                    END IF;

                ELSE
                    -- Other canonical fields: use field_map or same name, NULL if column doesn't exist
                    v_mapped_name := COALESCE(v_member.field_map->>v_fname, v_fname);

                    IF EXISTS (
                        SELECT 1
                        FROM information_schema.columns AS c
                        WHERE c.table_schema = 'dfv'
                          AND c.table_name = v_member.source_name
                          AND c.column_name = v_mapped_name
                    ) THEN
                        v_projection := v_projection || format(', %I AS %I', v_mapped_name, v_fname);
                    ELSE
                        v_projection := v_projection || format(', NULL::text AS %I', v_fname);
                    END IF;
            END CASE;
        END LOOP;

        v_projection := v_projection || format(' FROM dfv.%I', v_member.source_name);

        v_cte_defs := array_append(v_cte_defs, format('%I AS (%s)', v_member.source_name, v_projection));
        v_cte_refs := array_append(v_cte_refs, quote_ident(v_member.source_name));

        -- Accumulate carried-forward source balance column and total offset
        IF v_tracks_balance THEN
            v_bal_exprs := array_append(
                v_bal_exprs,
                format(
                    'SUM(CASE WHEN _source = %L THEN %I END) OVER (ORDER BY %I ASC, _id ASC) + %s AS %I',
                    v_member.source_name, v_stk.amount_field, v_stk.date_field,
                    v_member.balance_offset, v_member.source_name || '_balance'
                )
            );
            v_offset_sum := v_offset_sum + v_member.balance_offset;
        END IF;
    END LOOP;

    -- No CTE was built, i.e. the loop above never ran
    IF cardinality(v_cte_defs) = 0 THEN
        RETURN json_build_object('success', false, 'error', 'Stack has no sources');
    END IF;

    v_view_name := 'dfv.' || quote_ident(p_stack_name);

    SELECT string_agg(quote_ident(f->>'name'), ', ')
    INTO v_canonical
    FROM jsonb_array_elements(v_stk.fields) f;

    IF NOT v_tracks_balance THEN
        v_view_sql := format(
            'CREATE VIEW %s AS '
            'WITH %s, _stacked AS (SELECT * FROM %s) '
            'SELECT _source, _id, %s FROM _stacked',
            v_view_name,
            array_to_string(v_cte_defs, ', '),
            array_to_string(v_cte_refs, ' UNION ALL SELECT * FROM '),
            v_canonical
        );
    ELSE
        v_view_sql := format(
            'CREATE VIEW %s AS '
            'WITH %s, _stacked AS (SELECT * FROM %s) '
            'SELECT _source, _id, %s, '
            '%s, '
            'SUM(%I) OVER (ORDER BY %I ASC, _id ASC) + %s AS net_balance '
            'FROM _stacked ORDER BY %I DESC, _id DESC',
            v_view_name,
            array_to_string(v_cte_defs, ', '),
            array_to_string(v_cte_refs, ' UNION ALL SELECT * FROM '),
            v_canonical,
            array_to_string(v_bal_exprs, ', '),
            v_stk.amount_field,
            v_stk.date_field,
            v_offset_sum,
            v_stk.date_field
        );
    END IF;

    IF NOT p_dry_run THEN
        CREATE SCHEMA IF NOT EXISTS dfv;
        EXECUTE format('DROP VIEW IF EXISTS %s CASCADE', v_view_name);
        EXECUTE v_view_sql;

        -- Detect stacks whose views were dropped by CASCADE and mark them stale
        SELECT array_agg(other.name) INTO v_stale_stacks
        FROM dataflow.stacks AS other
        WHERE other.name != p_stack_name
          AND other.view_generated_at IS NOT NULL
          AND NOT EXISTS (
              SELECT 1
              FROM pg_views AS pv
              WHERE pv.schemaname = 'dfv'
                AND pv.viewname = other.name
          );

        UPDATE dataflow.stacks AS st
        SET view_generated_at = NULL
        WHERE st.name = ANY(v_stale_stacks);
    END IF;

    RETURN json_build_object(
        'success', true,
        'view', v_view_name,
        'sql', v_view_sql,
        'cascade_stale', COALESCE(to_json(v_stale_stacks), '[]'::json)
    );
END;
$function$
;
|
|
-- Distinct (key, value) pairs found in the output objects of all mappings
-- that belong to sources with global_picklist switched on. NULL and empty
-- values are left out. Sorted by key, then value.
CREATE OR REPLACE FUNCTION dataflow.get_global_output_values()
RETURNS TABLE(col text, val text)
LANGUAGE sql
STABLE
AS $function$
    SELECT DISTINCT
        kv.key   AS col,
        kv.value AS val
    FROM dataflow.sources AS s
    INNER JOIN dataflow.mappings AS m
        ON m.source_name = s.name
    CROSS JOIN LATERAL jsonb_each_text(m.output) AS kv(key, value)
    WHERE s.global_picklist
      -- a NULL value makes this comparison NULL, so NULLs are filtered too
      AND kv.value <> ''
    ORDER BY kv.key, kv.value;
$function$
;
|
|
-- Fetch one mapping row by id.
CREATE OR REPLACE FUNCTION dataflow.get_mapping(p_id integer)
RETURNS dataflow.mappings
LANGUAGE sql
STABLE
AS $function$
    SELECT m.*
    FROM dataflow.mappings AS m
    WHERE m.id = p_id;
$function$
;
|
|
-- For every mapping of a source (optionally one rule only), count the
-- records whose transformed payload holds the mapping's input value under
-- the rule's output_field. Mappings with no such record are reported with
-- a count of 0.
CREATE OR REPLACE FUNCTION dataflow.get_mapping_counts(p_source_name text, p_rule_name text DEFAULT NULL::text)
RETURNS TABLE(rule_name text, input_value jsonb, record_count bigint)
LANGUAGE sql
STABLE
AS $function$
    SELECT
        mp.rule_name,
        mp.input_value,
        count(rc.id) AS record_count
    FROM dataflow.mappings AS mp
    -- the rule supplies the output_field to look under
    INNER JOIN dataflow.rules AS rl
        ON  rl.source_name = mp.source_name
        AND rl.name = mp.rule_name
    -- LEFT JOIN keeps mappings that no record uses
    LEFT JOIN dataflow.records AS rc
        ON  rc.source_name = mp.source_name
        AND rc.transformed ? rl.output_field
        AND rc.transformed -> rl.output_field = mp.input_value
    WHERE mp.source_name = p_source_name
      AND (p_rule_name IS NULL OR mp.rule_name = p_rule_name)
    GROUP BY mp.rule_name, mp.input_value;
$function$
;
|
|
-- List every mapping, across all sources, whose output object holds the
-- text value p_val under key p_col. Sorted by source, rule, then the text
-- form of the input value.
CREATE OR REPLACE FUNCTION dataflow.get_mappings_by_output_field(p_col text, p_val text)
RETURNS TABLE(id integer, source_name text, rule_name text, input_value jsonb, output jsonb)
LANGUAGE sql
STABLE
AS $function$
    SELECT
        mp.id,
        mp.source_name,
        mp.rule_name,
        mp.input_value,
        mp.output
    FROM dataflow.mappings AS mp
    WHERE (mp.output ->> p_col) = p_val
    ORDER BY
        mp.source_name,
        mp.rule_name,
        mp.input_value::text;
$function$
;
|
|
-- get_record
-- Return the dataflow.records row whose id equals p_id.
CREATE OR REPLACE FUNCTION dataflow.get_record(p_id bigint)
    RETURNS dataflow.records
    LANGUAGE sql
    STABLE
AS $function$
    SELECT rec.*
    FROM dataflow.records AS rec
    WHERE rec.id = p_id;
$function$
;
|
|
-- get_rule
-- Return the dataflow.rules row whose id equals p_id.
CREATE OR REPLACE FUNCTION dataflow.get_rule(p_id integer)
    RETURNS dataflow.rules
    LANGUAGE sql
    STABLE
AS $function$
    SELECT rl.*
    FROM dataflow.rules AS rl
    WHERE rl.id = p_id;
$function$
;
|
|
-- get_source
-- Return the dataflow.sources row whose name equals p_name.
CREATE OR REPLACE FUNCTION dataflow.get_source(p_name text)
    RETURNS dataflow.sources
    LANGUAGE sql
    STABLE
AS $function$
    SELECT src.*
    FROM dataflow.sources AS src
    WHERE src.name = p_name;
$function$
;
|
|
-- get_source_fields
-- Every field key known for a source, with the sorted, de-duplicated list of
-- places it comes from:
--   'schema'       names listed in sources.config->'fields'
--   'raw'          top-level keys of records.data
--   'rule: <name>' output_field of a rule
--   'mapping'      top-level keys of mappings.output
CREATE OR REPLACE FUNCTION dataflow.get_source_fields(p_source_name text)
    RETURNS TABLE(key text, origins text[])
    LANGUAGE sql
    STABLE
AS $function$
    WITH field_origins AS (
        -- declared schema fields
        SELECT fld ->> 'name' AS key, 'schema' AS origin
        FROM dataflow.sources AS src
        CROSS JOIN LATERAL jsonb_array_elements(src.config -> 'fields') AS fld
        WHERE src.name = p_source_name
          AND src.config ? 'fields'

        UNION ALL

        -- keys present in imported raw data
        SELECT raw_key.key, 'raw'
        FROM dataflow.records AS rec
        CROSS JOIN LATERAL jsonb_object_keys(rec.data) AS raw_key(key)
        WHERE rec.source_name = p_source_name

        UNION ALL

        -- fields produced by rules
        SELECT rl.output_field, 'rule: ' || rl.name
        FROM dataflow.rules AS rl
        WHERE rl.source_name = p_source_name

        UNION ALL

        -- keys written by mappings
        SELECT map_key.key, 'mapping'
        FROM dataflow.mappings AS map
        CROSS JOIN LATERAL jsonb_object_keys(map.output) AS map_key(key)
        WHERE map.source_name = p_source_name
    )
    SELECT
        fo.key,
        array_agg(DISTINCT fo.origin ORDER BY fo.origin) AS origins
    FROM field_origins AS fo
    GROUP BY fo.key
    ORDER BY fo.key;
$function$
;
|
|
-- get_source_stats
-- Record counts for one source: all rows, rows with a non-NULL transformed
-- value, and rows whose transformed value is still NULL.
CREATE OR REPLACE FUNCTION dataflow.get_source_stats(p_source_name text)
    RETURNS TABLE(total_records bigint, transformed_records bigint, pending_records bigint)
    LANGUAGE sql
    STABLE
AS $function$
    SELECT
        COUNT(*)                          AS total_records,
        -- COUNT(col) counts only non-NULL values
        COUNT(rec.transformed)            AS transformed_records,
        COUNT(*) - COUNT(rec.transformed) AS pending_records
    FROM dataflow.records AS rec
    WHERE rec.source_name = p_source_name;
$function$
;
|
|
-- get_stack
-- Return one stack plus a JSON array describing its stack_sources rows,
-- ordered by (seq, id). A stack with no sources gets an empty array.
CREATE OR REPLACE FUNCTION dataflow.get_stack(p_name text)
    RETURNS TABLE(name text, label text, fields jsonb, amount_field text, date_field text, balance_offset numeric, created_at timestamp with time zone, sources jsonb)
    LANGUAGE sql
    STABLE
AS $function$
    WITH members AS (
        -- one JSON object per stack_sources row
        SELECT
            ss.id,
            ss.seq,
            ss.stack_name,
            jsonb_build_object(
                'id',             ss.id,
                'source_name',    ss.source_name,
                'field_map',      ss.field_map,
                'amount_field',   ss.amount_field,
                'amount_sign',    ss.amount_sign,
                'date_field',     ss.date_field,
                'balance_offset', ss.balance_offset,
                'seq',            ss.seq
            ) AS item
        FROM dataflow.stack_sources AS ss
    )
    SELECT
        st.name,
        st.label,
        st.fields,
        st.amount_field,
        st.date_field,
        st.balance_offset,
        st.created_at,
        COALESCE(
            -- FILTER drops the all-NULL row the LEFT JOIN yields for an empty stack
            jsonb_agg(mem.item ORDER BY mem.seq, mem.id) FILTER (WHERE mem.id IS NOT NULL),
            '[]'
        )
    FROM dataflow.stacks AS st
    LEFT JOIN members AS mem
        ON mem.stack_name = st.name
    WHERE st.name = p_name
    GROUP BY
        st.name, st.label, st.fields,
        st.amount_field, st.date_field, st.balance_offset,
        st.created_at;
$function$
;
|
|
-- get_stack_balance
-- Read net_balance from the newest row of view dfv.<p_stack_name>, where
-- "newest" means ordered by the stack's date_field DESC, then _id DESC.
-- Returns {"success": true, "balance": ...} or {"success": false, "error": ...}
-- when the stack is unknown, its amount/date fields are unset, or the view
-- does not exist.
CREATE OR REPLACE FUNCTION dataflow.get_stack_balance(p_stack_name text)
    RETURNS json
    LANGUAGE plpgsql
    STABLE
AS $function$
DECLARE
    v_cfg    dataflow.stacks%ROWTYPE;
    v_latest NUMERIC;
BEGIN
    SELECT * INTO v_cfg
    FROM dataflow.stacks
    WHERE name = p_stack_name;

    IF NOT FOUND THEN
        RETURN json_build_object('success', false, 'error', 'Stack not found');
    END IF;

    IF NOT (v_cfg.amount_field IS NOT NULL AND v_cfg.date_field IS NOT NULL) THEN
        RETURN json_build_object('success', false, 'error', 'amount_field and date_field must be set');
    END IF;

    -- Sub-block so a missing view becomes a JSON error instead of an exception
    BEGIN
        EXECUTE format(
            'SELECT net_balance FROM dfv.%I ORDER BY %I DESC, _id DESC LIMIT 1',
            p_stack_name, v_cfg.date_field
        ) INTO v_latest;
    EXCEPTION WHEN undefined_table THEN
        RETURN json_build_object('success', false, 'error', 'View not generated yet — click Generate first');
    END;

    RETURN json_build_object('success', true, 'balance', v_latest);
END;
$function$
;
|
|
-- get_status
-- Report which sources and stacks have view_generated_at IS NULL.
-- Result: {"stale_sources": [...], "stale_stacks": [...]}, each entry being
-- {"name", "view_generated_at"}, sorted by name; empty arrays when none.
CREATE OR REPLACE FUNCTION dataflow.get_status()
    RETURNS json
    LANGUAGE plpgsql
    STABLE
AS $function$
BEGIN
    RETURN json_build_object(
        'stale_sources', (
            SELECT COALESCE(
                json_agg(
                    json_build_object('name', src.name, 'view_generated_at', src.view_generated_at)
                    ORDER BY src.name
                ),
                '[]'::json
            )
            FROM dataflow.sources AS src
            WHERE src.view_generated_at IS NULL
        ),
        'stale_stacks', (
            SELECT COALESCE(
                json_agg(
                    json_build_object('name', st.name, 'view_generated_at', st.view_generated_at)
                    ORDER BY st.name
                ),
                '[]'::json
            )
            FROM dataflow.stacks AS st
            WHERE st.view_generated_at IS NULL
        )
    );
END;
$function$
;
|
|
-- get_view_data
-- Page through the generated view dfv.<p_source_name>.
--   p_limit / p_offset  page window; an explicit NULL falls back to the
--                       declared defaults (100 / 0)
--   p_sort_col/_dir     optional sort; ignored unless p_sort_col is a column
--                       of the view; any direction other than 'desc' sorts ASC
--   p_filters           JSON array of {"col", "pattern"}; each entry adds a
--                       case-insensitive regex match (~*) on col::text.
--                       Entries with an empty pattern or an unknown column
--                       are skipped.
-- Returns {"exists": false, "rows": []} when the view is missing, otherwise
-- {"exists": true, "rows": [...]}.
CREATE OR REPLACE FUNCTION dataflow.get_view_data(p_source_name text, p_limit integer DEFAULT 100, p_offset integer DEFAULT 0, p_sort_col text DEFAULT NULL::text, p_sort_dir text DEFAULT 'asc'::text, p_filters jsonb DEFAULT NULL::jsonb)
    RETURNS json
    LANGUAGE plpgsql
    STABLE
AS $function$
DECLARE
    v_exists  BOOLEAN;
    v_where   TEXT := '';
    v_order   TEXT := '';
    v_rows    JSON;
    v_filter  JSONB;
    v_col     TEXT;
    v_pattern TEXT;
BEGIN
    SELECT EXISTS (
        SELECT 1 FROM information_schema.views
        WHERE table_schema = 'dfv' AND table_name = p_source_name
    ) INTO v_exists;

    IF NOT v_exists THEN
        RETURN json_build_object('exists', FALSE, 'rows', '[]'::json);
    END IF;

    -- Build WHERE from filters (validate each column exists in the view)
    IF p_filters IS NOT NULL THEN
        FOR v_filter IN SELECT value FROM jsonb_array_elements(p_filters) LOOP
            v_col     := v_filter->>'col';
            v_pattern := v_filter->>'pattern';
            IF v_pattern IS NOT NULL AND v_pattern <> '' AND EXISTS (
                SELECT 1 FROM information_schema.columns
                WHERE table_schema = 'dfv'
                  AND table_name   = p_source_name
                  AND column_name  = v_col
            ) THEN
                -- identifier and literal are both quoted before interpolation
                v_where := v_where ||
                    CASE WHEN v_where = '' THEN ' WHERE ' ELSE ' AND ' END ||
                    quote_ident(v_col) || '::text ~* ' || quote_literal(v_pattern);
            END IF;
        END LOOP;
    END IF;

    -- Sort only by a column that really exists in the view
    IF p_sort_col IS NOT NULL AND EXISTS (
        SELECT 1 FROM information_schema.columns
        WHERE table_schema = 'dfv'
          AND table_name   = p_source_name
          AND column_name  = p_sort_col
    ) THEN
        v_order := ' ORDER BY ' || quote_ident(p_sort_col)
                || CASE WHEN lower(p_sort_dir) = 'desc' THEN ' DESC' ELSE ' ASC' END
                || ' NULLS LAST';
    END IF;

    -- LIMIT/OFFSET are bound as parameters rather than interpolated with %s:
    -- a NULL argument used to render as an empty string and produce a syntax
    -- error in the generated statement.
    EXECUTE format(
        'SELECT COALESCE(json_agg(row_to_json(t)), ''[]''::json) FROM (SELECT * FROM dfv.%I%s%s LIMIT $1 OFFSET $2) t',
        p_source_name, v_where, v_order
    ) INTO v_rows
    USING COALESCE(p_limit, 100), COALESCE(p_offset, 0);

    RETURN json_build_object('exists', TRUE, 'rows', v_rows);
END;
$function$
;
|
|
-- list_mappings
-- All mappings of a source, optionally limited to one rule, ordered by
-- rule_name and the text form of input_value.
CREATE OR REPLACE FUNCTION dataflow.list_mappings(p_source_name text, p_rule_name text DEFAULT NULL::text)
    RETURNS SETOF dataflow.mappings
    LANGUAGE sql
    STABLE
AS $function$
    SELECT map.*
    FROM dataflow.mappings AS map
    WHERE map.source_name = p_source_name
      -- a NULL p_rule_name means "every rule"
      AND (p_rule_name IS NULL OR map.rule_name = p_rule_name)
    ORDER BY
        map.rule_name,
        map.input_value::text;
$function$
;
|
|
-- list_pivot_layouts
-- Saved pivot layouts of a source, ordered by layout_name.
-- Marked STABLE (read-only) to match the other list_*/get_* readers in this
-- file; the original was created with the default VOLATILE.
CREATE OR REPLACE FUNCTION dataflow.list_pivot_layouts(p_source_name text)
    RETURNS TABLE(id integer, source_name text, layout_name text, config jsonb, created_at timestamp with time zone)
    LANGUAGE sql
    STABLE
AS $function$
    SELECT
        pl.id,
        pl.source_name,
        pl.layout_name,
        pl.config,
        pl.created_at
    FROM dataflow.pivot_layouts AS pl
    WHERE pl.source_name = p_source_name
    ORDER BY pl.layout_name;
$function$
;
|
|
-- list_records
-- Page through a source's records, newest id first. With
-- p_transformed_only = true, rows whose transformed value is NULL are left out.
CREATE OR REPLACE FUNCTION dataflow.list_records(p_source_name text, p_limit integer DEFAULT 100, p_offset integer DEFAULT 0, p_transformed_only boolean DEFAULT false)
    RETURNS SETOF dataflow.records
    LANGUAGE sql
    STABLE
AS $function$
    SELECT rec.*
    FROM dataflow.records AS rec
    WHERE rec.source_name = p_source_name
      -- De Morgan form of: NOT p_transformed_only OR transformed IS NOT NULL
      AND NOT (p_transformed_only AND rec.transformed IS NULL)
    ORDER BY rec.id DESC
    LIMIT p_limit
    OFFSET p_offset;
$function$
;
|
|
-- list_rules
-- All rules of a source, ordered by sequence and then name.
CREATE OR REPLACE FUNCTION dataflow.list_rules(p_source_name text)
    RETURNS SETOF dataflow.rules
    LANGUAGE sql
    STABLE
AS $function$
    SELECT rl.*
    FROM dataflow.rules AS rl
    WHERE rl.source_name = p_source_name
    ORDER BY
        rl.sequence,
        rl.name;
$function$
;
|
|
-- list_sources
-- Every row of dataflow.sources, ordered by name.
CREATE OR REPLACE FUNCTION dataflow.list_sources()
    RETURNS SETOF dataflow.sources
    LANGUAGE sql
    STABLE
AS $function$
    SELECT src.*
    FROM dataflow.sources AS src
    ORDER BY src.name;
$function$
;
|
|
-- list_stacks
-- Every stack with the number of stack_sources rows attached to it
-- (0 when it has none), ordered by stack name.
CREATE OR REPLACE FUNCTION dataflow.list_stacks()
    RETURNS TABLE(name text, label text, fields jsonb, amount_field text, date_field text, balance_offset numeric, source_count bigint, created_at timestamp with time zone)
    LANGUAGE sql
    STABLE
AS $function$
    SELECT
        st.name,
        st.label,
        st.fields,
        st.amount_field,
        st.date_field,
        st.balance_offset,
        -- counts only joined rows, so a stack without sources yields 0
        count(member.id) AS source_count,
        st.created_at
    FROM dataflow.stacks AS st
    LEFT JOIN dataflow.stack_sources AS member
        ON member.stack_name = st.name
    GROUP BY
        st.name, st.label, st.fields,
        st.amount_field, st.date_field, st.balance_offset,
        st.created_at
    ORDER BY st.name;
$function$
;
|
|
-- preview_rule
-- Dry-run a regex rule against the newest p_limit records of p_source that
-- contain p_field, without storing anything.
--   p_function_type = 'replace' : extracted_value is regexp_replace(...) as a
--                                 JSON string
--   anything else               : extracted_value is NULL for no match, the
--                                 single match when there is one, otherwise a
--                                 JSON array of matches; a match with one
--                                 capture group is a string, with several
--                                 groups an array of strings
-- NULL p_flags / p_replace_value are treated as '' (their declared defaults).
-- Without that, the strict regexp functions return NULL or no rows and every
-- previewed record would look like a non-match.
CREATE OR REPLACE FUNCTION dataflow.preview_rule(p_source text, p_field text, p_pattern text, p_flags text DEFAULT ''::text, p_function_type text DEFAULT 'extract'::text, p_replace_value text DEFAULT ''::text, p_limit integer DEFAULT 20)
    RETURNS TABLE(id integer, raw_value text, extracted_value jsonb)
    LANGUAGE plpgsql
    STABLE
AS $function$
-- Field is resolved from data first, then transformed (supports chained rules whose
-- input field was produced by an earlier-sequence rule rather than the raw import).
DECLARE
    v_flags   TEXT := COALESCE(p_flags, '');
    v_replace TEXT := COALESCE(p_replace_value, '');
BEGIN
    IF p_function_type = 'replace' THEN
        RETURN QUERY
        SELECT
            r.id,
            COALESCE(r.data ->> p_field, r.transformed ->> p_field),
            to_jsonb(regexp_replace(
                COALESCE(r.data ->> p_field, r.transformed ->> p_field),
                p_pattern, v_replace, v_flags
            ))
        FROM dataflow.records r
        WHERE r.source_name = p_source
          AND (r.data ? p_field OR r.transformed ? p_field)
        ORDER BY r.id DESC LIMIT p_limit;
    ELSE
        RETURN QUERY
        SELECT
            r.id,
            COALESCE(r.data ->> p_field, r.transformed ->> p_field),
            CASE
                WHEN agg.match_count = 0 THEN NULL
                WHEN agg.match_count = 1 THEN agg.matches -> 0
                ELSE agg.matches
            END
        FROM dataflow.records r
        -- the aggregate always yields exactly one row per record, even with
        -- zero regex matches
        CROSS JOIN LATERAL (
            SELECT
                jsonb_agg(
                    CASE WHEN array_length(mt, 1) = 1 THEN to_jsonb(mt[1]) ELSE to_jsonb(mt) END
                    ORDER BY rn
                ) AS matches,
                count(*)::int AS match_count
            FROM regexp_matches(
                COALESCE(r.data ->> p_field, r.transformed ->> p_field),
                p_pattern, v_flags
            )
            WITH ORDINALITY AS m(mt, rn)
        ) agg
        WHERE r.source_name = p_source
          AND (r.data ? p_field OR r.transformed ? p_field)
        ORDER BY r.id DESC LIMIT p_limit;
    END IF;
END;
$function$
;
|
|
-- remap_output_field
-- In every mapping whose output[p_col] equals p_from_val, set output[p_col]
-- to p_to_val. Returns the number of mappings changed.
-- Raises null_value_not_allowed when p_to_val is NULL: to_jsonb(NULL) is SQL
-- NULL and jsonb_set is strict, so the UPDATE would otherwise replace the
-- whole output object of every matching mapping with NULL.
CREATE OR REPLACE FUNCTION dataflow.remap_output_field(p_col text, p_from_val text, p_to_val text)
    RETURNS integer
    LANGUAGE plpgsql
AS $function$
DECLARE
    updated_count INTEGER;
BEGIN
    IF p_to_val IS NULL THEN
        RAISE EXCEPTION 'remap_output_field: p_to_val must not be NULL'
            USING ERRCODE = 'null_value_not_allowed';
    END IF;

    UPDATE dataflow.mappings
    SET output = jsonb_set(output, ARRAY[p_col], to_jsonb(p_to_val))
    WHERE output->>(p_col) = p_from_val;

    GET DIAGNOSTICS updated_count = ROW_COUNT;
    RETURN updated_count;
END;
$function$
;
|
|
-- remove_stack_source
-- Detach p_source_name from p_stack_name. Returns the deleted source_name,
-- or no row when the pair did not exist.
CREATE OR REPLACE FUNCTION dataflow.remove_stack_source(p_stack_name text, p_source_name text)
    RETURNS TABLE(source_name text)
    LANGUAGE sql
AS $function$
    DELETE FROM dataflow.stack_sources AS ss
    WHERE ss.source_name = p_source_name
      AND ss.stack_name = p_stack_name
    RETURNING ss.source_name;
$function$
;
|
|
-- reorder_stack_sources
-- Set seq of each listed source in p_stack_name to its 1-based position in
-- p_source_names. Sources not listed keep their seq; names that are not in
-- the stack are ignored. A NULL or empty array is a no-op.
CREATE OR REPLACE FUNCTION dataflow.reorder_stack_sources(p_stack_name text, p_source_names text[])
    RETURNS void
    LANGUAGE plpgsql
AS $function$
BEGIN
    -- array_length() is NULL for NULL/empty arrays and a NULL loop bound
    -- raises "upper bound of FOR loop cannot be null", hence the COALESCE.
    FOR i IN 1..COALESCE(array_length(p_source_names, 1), 0) LOOP
        UPDATE dataflow.stack_sources
        SET seq = i
        WHERE stack_name = p_stack_name AND source_name = p_source_names[i];
    END LOOP;
END;
$function$
;
|
|
-- save_pivot_layout
-- Create a pivot layout, or overwrite config of the existing layout with the
-- same (source_name, layout_name). Returns the stored row.
CREATE OR REPLACE FUNCTION dataflow.save_pivot_layout(p_source_name text, p_layout_name text, p_config jsonb)
    RETURNS TABLE(id integer, source_name text, layout_name text, config jsonb, created_at timestamp with time zone)
    LANGUAGE sql
AS $function$
    INSERT INTO dataflow.pivot_layouts AS pl (source_name, layout_name, config)
    VALUES (p_source_name, p_layout_name, p_config)
    ON CONFLICT (source_name, layout_name)
        DO UPDATE SET config = EXCLUDED.config
    RETURNING
        pl.id,
        pl.source_name,
        pl.layout_name,
        pl.config,
        pl.created_at;
$function$
;
|
|
-- search_mapping_outputs
-- Case-insensitive substring search over all mapping output values.
-- Returns each matching (col, val) pair with the number of mappings using it.
-- NOTE(review): p_search is placed inside an ILIKE pattern unescaped, so '%'
-- and '_' in it act as wildcards; confirm that is intended.
CREATE OR REPLACE FUNCTION dataflow.search_mapping_outputs(p_search text)
    RETURNS TABLE(col text, val text, mapping_count bigint)
    LANGUAGE sql
    STABLE
AS $function$
    SELECT
        kv.key   AS col,
        kv.value AS val,
        COUNT(*) AS mapping_count
    FROM dataflow.mappings AS map
    CROSS JOIN LATERAL jsonb_each_text(map.output) AS kv(key, value)
    WHERE NULLIF(kv.value, '') IS NOT NULL
      AND kv.value ILIKE '%' || p_search || '%'
    GROUP BY kv.key, kv.value
    ORDER BY kv.key, kv.value;
$function$
;
|
|
-- search_records
-- Newest-first records of a source whose data or transformed JSON contains
-- p_query (jsonb containment, @>), capped at p_limit rows.
CREATE OR REPLACE FUNCTION dataflow.search_records(p_source_name text, p_query jsonb, p_limit integer DEFAULT 100)
    RETURNS SETOF dataflow.records
    LANGUAGE sql
    STABLE
AS $function$
    SELECT rec.*
    FROM dataflow.records AS rec
    WHERE rec.source_name = p_source_name
      AND (
            rec.data @> p_query
         OR rec.transformed @> p_query
      )
    ORDER BY rec.id DESC
    LIMIT p_limit;
$function$
;
|
|
-- source_config_changed
-- Trigger function: when a row's config differs between OLD and NEW, clear
-- NEW.view_generated_at. Always returns NEW.
CREATE OR REPLACE FUNCTION dataflow.source_config_changed()
    RETURNS trigger
    LANGUAGE plpgsql
AS $function$
BEGIN
    -- unchanged config: pass the row through untouched
    IF NEW.config IS NOT DISTINCT FROM OLD.config THEN
        RETURN NEW;
    END IF;

    NEW.view_generated_at := NULL;
    RETURN NEW;
END;
$function$
;
|
|
-- stack_sources_changed
-- Trigger function for dataflow.stack_sources: clears view_generated_at on
-- the owning stack(s) so their view is reported as stale.
--   INSERT : marks NEW.stack_name
--   DELETE : marks OLD.stack_name
--   UPDATE : skipped when none of the compared columns changed; otherwise
--            marks both the old and the new stack
-- The original UPDATE check did not compare stack_name/source_name, so an
-- update that only moved a row to another stack (or swapped its source)
-- returned early; when it did proceed, only the new stack was marked.
-- Always returns NULL.
CREATE OR REPLACE FUNCTION dataflow.stack_sources_changed()
    RETURNS trigger
    LANGUAGE plpgsql
AS $function$
BEGIN
    IF TG_OP = 'UPDATE' THEN
        IF NEW.stack_name     IS NOT DISTINCT FROM OLD.stack_name     AND
           NEW.source_name    IS NOT DISTINCT FROM OLD.source_name    AND
           NEW.field_map      IS NOT DISTINCT FROM OLD.field_map      AND
           NEW.amount_sign    IS NOT DISTINCT FROM OLD.amount_sign    AND
           NEW.balance_offset IS NOT DISTINCT FROM OLD.balance_offset AND
           NEW.amount_field   IS NOT DISTINCT FROM OLD.amount_field   AND
           NEW.date_field     IS NOT DISTINCT FROM OLD.date_field     AND
           NEW.seq            IS NOT DISTINCT FROM OLD.seq THEN
            RETURN NULL;
        END IF;

        UPDATE dataflow.stacks SET view_generated_at = NULL
        WHERE name IN (NEW.stack_name, OLD.stack_name);
    ELSIF TG_OP = 'DELETE' THEN
        UPDATE dataflow.stacks SET view_generated_at = NULL
        WHERE name = OLD.stack_name;
    ELSE
        UPDATE dataflow.stacks SET view_generated_at = NULL
        WHERE name = NEW.stack_name;
    END IF;

    RETURN NULL;
END;
$function$
;
|
|
-- test_rule
-- Run a stored rule's pattern against the newest p_limit records of its
-- source that have the rule's field in data, without storing anything.
-- Returns one row: a summary of the rule and a JSON array of
-- {id, raw_value, extracted_value}; returns no row when the rule id is unknown.
-- extracted_value is NULL for no match, the single match when there is one,
-- otherwise an array of matches; a match with one capture group is a string,
-- with several groups an array of strings.
--
-- Matches are collected with jsonb_agg. The previous version aggregated them
-- into a text[] and then subscripted it: matches[1] on a text[] has type
-- text, so array_length(matches[1], 1) and m[1] over unnest() do not
-- type-check and the function failed on its first call.
CREATE OR REPLACE FUNCTION dataflow.test_rule(p_rule_id integer, p_limit integer DEFAULT 20)
    RETURNS TABLE(rule jsonb, results jsonb)
    LANGUAGE plpgsql
    STABLE
AS $function$
DECLARE
    v_rule    dataflow.rules%ROWTYPE;
    v_results JSONB;
BEGIN
    SELECT * INTO v_rule FROM dataflow.rules WHERE id = p_rule_id;
    IF NOT FOUND THEN RETURN; END IF;

    SELECT jsonb_agg(row_to_json(t)) INTO v_results FROM (
        SELECT
            r.id,
            r.data ->> v_rule.field AS raw_value,
            CASE
                WHEN agg.match_count = 0 THEN NULL
                WHEN agg.match_count = 1 THEN agg.matches -> 0
                ELSE agg.matches
            END AS extracted_value
        FROM dataflow.records r
        -- the aggregate always yields exactly one row per record, even with
        -- zero regex matches
        CROSS JOIN LATERAL (
            SELECT
                jsonb_agg(
                    -- single capture group: plain string; several: array
                    CASE WHEN array_length(mt, 1) = 1 THEN to_jsonb(mt[1]) ELSE to_jsonb(mt) END
                    ORDER BY rn
                ) AS matches,
                count(*)::int AS match_count
            FROM regexp_matches(r.data ->> v_rule.field, v_rule.pattern, COALESCE(v_rule.flags, ''))
                 WITH ORDINALITY AS m(mt, rn)
        ) agg
        WHERE r.source_name = v_rule.source_name AND r.data ? v_rule.field
        ORDER BY r.id DESC LIMIT p_limit
    ) t;

    RETURN QUERY SELECT
        jsonb_build_object(
            'id',           v_rule.id,    'name',    v_rule.name,
            'field',        v_rule.field, 'pattern', v_rule.pattern,
            'output_field', v_rule.output_field
        ),
        COALESCE(v_results, '[]'::jsonb);
END;
$function$
;
|
|
-- update_mapping
-- Patch a mapping by id. A NULL argument leaves that column unchanged.
-- Returns the updated row.
CREATE OR REPLACE FUNCTION dataflow.update_mapping(p_id integer, p_input_value jsonb DEFAULT NULL::jsonb, p_output jsonb DEFAULT NULL::jsonb)
    RETURNS dataflow.mappings
    LANGUAGE sql
AS $function$
    UPDATE dataflow.mappings AS map
    SET input_value = COALESCE(p_input_value, map.input_value),
        output      = COALESCE(p_output, map.output)
    WHERE map.id = p_id
    RETURNING map.*;
$function$
;
|
|
-- update_rule
-- Patch a rule by id. A NULL argument leaves that column unchanged, so a
-- column cannot be reset to NULL through this function.
-- Returns the updated row.
CREATE OR REPLACE FUNCTION dataflow.update_rule(p_id integer, p_name text DEFAULT NULL::text, p_field text DEFAULT NULL::text, p_pattern text DEFAULT NULL::text, p_output_field text DEFAULT NULL::text, p_function_type text DEFAULT NULL::text, p_flags text DEFAULT NULL::text, p_replace_value text DEFAULT NULL::text, p_enabled boolean DEFAULT NULL::boolean, p_retain boolean DEFAULT NULL::boolean, p_sequence integer DEFAULT NULL::integer)
    RETURNS dataflow.rules
    LANGUAGE sql
AS $function$
    UPDATE dataflow.rules AS rl
    SET name          = COALESCE(p_name,          rl.name),
        field         = COALESCE(p_field,         rl.field),
        pattern       = COALESCE(p_pattern,       rl.pattern),
        output_field  = COALESCE(p_output_field,  rl.output_field),
        function_type = COALESCE(p_function_type, rl.function_type),
        flags         = COALESCE(p_flags,         rl.flags),
        replace_value = COALESCE(p_replace_value, rl.replace_value),
        enabled       = COALESCE(p_enabled,       rl.enabled),
        retain        = COALESCE(p_retain,        rl.retain),
        sequence      = COALESCE(p_sequence,      rl.sequence)
    WHERE rl.id = p_id
    RETURNING rl.*;
$function$
;
|
|
-- update_source (with global_picklist)
-- Patch a source by name. A NULL argument leaves that column unchanged;
-- updated_at is always set to the current timestamp.
-- Returns the updated row.
-- NOTE(review): this file also defines update_source(text, text[], jsonb)
-- with defaults; calls that pass three or fewer arguments look ambiguous
-- between the two overloads. Confirm and consider dropping the older one.
CREATE OR REPLACE FUNCTION dataflow.update_source(p_name text, p_constraint_fields text[] DEFAULT NULL::text[], p_config jsonb DEFAULT NULL::jsonb, p_global_picklist boolean DEFAULT NULL::boolean)
    RETURNS dataflow.sources
    LANGUAGE sql
AS $function$
    UPDATE dataflow.sources AS src
    SET constraint_fields = COALESCE(p_constraint_fields, src.constraint_fields),
        config            = COALESCE(p_config,            src.config),
        global_picklist   = COALESCE(p_global_picklist,   src.global_picklist),
        updated_at        = CURRENT_TIMESTAMP
    WHERE src.name = p_name
    RETURNING src.*;
$function$
;
|
|
-- update_source (three-argument overload, no global_picklist)
-- Patch a source by name. A NULL argument leaves that column unchanged;
-- updated_at is always set to the current timestamp.
-- Returns the updated row.
-- NOTE(review): overlaps with the four-argument update_source, whose extra
-- parameter has a default. PostgreSQL reports calls with three or fewer
-- arguments as "not unique" when two such overloads share a schema; confirm
-- and consider removing this overload with DROP FUNCTION.
CREATE OR REPLACE FUNCTION dataflow.update_source(p_name text, p_constraint_fields text[] DEFAULT NULL::text[], p_config jsonb DEFAULT NULL::jsonb)
    RETURNS dataflow.sources
    LANGUAGE sql
AS $function$
    UPDATE dataflow.sources AS src
    SET constraint_fields = COALESCE(p_constraint_fields, src.constraint_fields),
        config            = COALESCE(p_config,            src.config),
        updated_at        = CURRENT_TIMESTAMP
    WHERE src.name = p_name
    RETURNING src.*;
$function$
;
|
|
-- update_stack
-- Patch a stack by name. A NULL argument leaves that column unchanged.
-- Returns the updated row.
CREATE OR REPLACE FUNCTION dataflow.update_stack(p_name text, p_label text DEFAULT NULL::text, p_fields jsonb DEFAULT NULL::jsonb, p_amount_field text DEFAULT NULL::text, p_date_field text DEFAULT NULL::text, p_balance_offset numeric DEFAULT NULL::numeric)
    RETURNS dataflow.stacks
    LANGUAGE sql
AS $function$
    UPDATE dataflow.stacks AS st
    SET label          = COALESCE(p_label,          st.label),
        fields         = COALESCE(p_fields,         st.fields),
        amount_field   = COALESCE(p_amount_field,   st.amount_field),
        date_field     = COALESCE(p_date_field,     st.date_field),
        balance_offset = COALESCE(p_balance_offset, st.balance_offset)
    WHERE st.name = p_name
    RETURNING st.*;
$function$
;
|
|
-- upsert_mapping
-- Insert a mapping, or replace the output of the existing mapping with the
-- same (source_name, rule_name, input_value). Returns the stored row.
CREATE OR REPLACE FUNCTION dataflow.upsert_mapping(p_source_name text, p_rule_name text, p_input_value jsonb, p_output jsonb)
    RETURNS dataflow.mappings
    LANGUAGE sql
AS $function$
    INSERT INTO dataflow.mappings AS map (
        source_name,
        rule_name,
        input_value,
        output
    )
    VALUES (
        p_source_name,
        p_rule_name,
        p_input_value,
        p_output
    )
    ON CONFLICT (source_name, rule_name, input_value)
        DO UPDATE SET output = EXCLUDED.output
    RETURNING map.*;
$function$
;
|
|
-- upsert_stack_source
-- Attach a source to a stack, or update the settings of an existing
-- (stack_name, source_name) pair. A new row gets seq = current max seq of
-- the stack + 1 (1 for the first source); an existing row keeps its seq.
-- Returns the stored row.
CREATE OR REPLACE FUNCTION dataflow.upsert_stack_source(p_stack_name text, p_source_name text, p_field_map jsonb DEFAULT '{}'::jsonb, p_amount_sign integer DEFAULT 1, p_balance_offset numeric DEFAULT 0, p_amount_field text DEFAULT NULL::text, p_date_field text DEFAULT NULL::text)
    RETURNS dataflow.stack_sources
    LANGUAGE sql
AS $function$
    INSERT INTO dataflow.stack_sources AS tgt (
        stack_name, source_name, field_map,
        amount_sign, balance_offset,
        amount_field, date_field,
        seq
    )
    -- the aggregate has no GROUP BY, so this SELECT always yields one row
    SELECT
        p_stack_name, p_source_name, p_field_map,
        p_amount_sign, p_balance_offset,
        p_amount_field, p_date_field,
        COALESCE(MAX(cur.seq), 0) + 1
    FROM dataflow.stack_sources AS cur
    WHERE cur.stack_name = p_stack_name
    ON CONFLICT (stack_name, source_name) DO UPDATE
        SET field_map      = EXCLUDED.field_map,
            amount_sign    = EXCLUDED.amount_sign,
            balance_offset = EXCLUDED.balance_offset,
            amount_field   = EXCLUDED.amount_field,
            date_field     = EXCLUDED.date_field
    RETURNING tgt.*;
$function$
;
|
|
|
|
------------------------------------------------------
|
|
-- Summary
|
|
------------------------------------------------------
|
|
-- All dataflow functions are defined above.
|
|
-- Deploy with: psql -d dataflow -f database/functions.sql
|
|
------------------------------------------------------
|