dataflow/database/queries/sources.sql
Paul Trowbridge 291c665ed1 Consolidate all SQL into database/queries/, switch to literal SQL in routes
- Add database/queries/{sources,rules,mappings,records}.sql — one file per
  route, all business logic in PostgreSQL functions
- Replace parameterized queries in all four route files with lit()/jsonLit()
  literal interpolation for debuggability
- Add api/lib/sql.js with lit(), jsonLit(), arr() helpers
- Fix get_view_data to use json_agg (preserves column order) with subquery
  (guarantees sort order is respected before aggregation)
- Fix jsonLit() for JSONB params so plain strings become valid JSON
- Update manage.py option 3 to deploy database/queries/ instead of functions.sql
- Add SPEC.md covering architecture, philosophy, and manage.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-05 22:36:53 -04:00

323 lines
13 KiB
PL/PgSQL

--
-- Sources queries
-- All SQL for api/routes/sources.js
--
-- Deployed as one script (see manage.py option 3). The search_path below makes
-- every unqualified CREATE FUNCTION in this file land in the dataflow schema.
SET search_path TO dataflow, public;
-- ── CRUD ─────────────────────────────────────────────────────────────────────
-- All configured sources, alphabetical by name.
CREATE OR REPLACE FUNCTION list_sources()
RETURNS SETOF dataflow.sources AS $$
SELECT s.*
FROM dataflow.sources AS s
ORDER BY s.name;
$$ LANGUAGE sql STABLE;
-- Single source looked up by its unique name; yields an all-NULL row when absent.
CREATE OR REPLACE FUNCTION get_source(p_name TEXT)
RETURNS dataflow.sources AS $$
SELECT s.*
FROM dataflow.sources AS s
WHERE s.name = p_name;
$$ LANGUAGE sql STABLE;
-- Create a source and hand back the inserted row.
-- p_config defaults to an empty JSONB object when the caller omits it.
CREATE OR REPLACE FUNCTION create_source(p_name TEXT, p_dedup_fields TEXT[], p_config JSONB DEFAULT '{}')
RETURNS dataflow.sources AS $$
INSERT INTO dataflow.sources (name, dedup_fields, config)
SELECT p_name, p_dedup_fields, p_config
RETURNING *;
$$ LANGUAGE sql;
-- Patch-style update: a NULL argument leaves the corresponding column untouched
-- (COALESCE falls back to the current value). Always bumps updated_at.
CREATE OR REPLACE FUNCTION update_source(p_name TEXT, p_dedup_fields TEXT[] DEFAULT NULL, p_config JSONB DEFAULT NULL)
RETURNS dataflow.sources AS $$
UPDATE dataflow.sources AS s
SET
    dedup_fields = COALESCE(p_dedup_fields, s.dedup_fields),
    config       = COALESCE(p_config, s.config),
    updated_at   = CURRENT_TIMESTAMP
WHERE s.name = p_name
RETURNING s.*;
$$ LANGUAGE sql;
-- Remove a source by name; returns the deleted name, or NULL when nothing matched.
CREATE OR REPLACE FUNCTION delete_source(p_name TEXT)
RETURNS TEXT AS $$
DELETE FROM dataflow.sources AS s
WHERE s.name = p_name
RETURNING s.name;
$$ LANGUAGE sql;
-- ── Import log ────────────────────────────────────────────────────────────────
-- Import history for one source, most recent first.
CREATE OR REPLACE FUNCTION get_import_log(p_source_name TEXT)
RETURNS SETOF dataflow.import_log AS $$
SELECT il.*
FROM dataflow.import_log AS il
WHERE il.source_name = p_source_name
ORDER BY il.imported_at DESC;
$$ LANGUAGE sql STABLE;
-- ── Stats ─────────────────────────────────────────────────────────────────────
-- Record counts for one source: total rows, rows already transformed,
-- and rows still awaiting transformation (transformed IS NULL).
CREATE OR REPLACE FUNCTION get_source_stats(p_source_name TEXT)
RETURNS TABLE (total_records BIGINT, transformed_records BIGINT, pending_records BIGINT) AS $$
SELECT
    COUNT(*)                                          AS total_records,
    COUNT(*) FILTER (WHERE r.transformed IS NOT NULL) AS transformed_records,
    COUNT(*) FILTER (WHERE r.transformed IS NULL)     AS pending_records
FROM dataflow.records AS r
WHERE r.source_name = p_source_name;
$$ LANGUAGE sql STABLE;
-- ── Fields ────────────────────────────────────────────────────────────────────
-- Every field name known for a source, with the distinct origins declaring it:
--   'schema'       — declared in sources.config->'fields'
--   'raw'          — appears as a top-level key in an imported record
--   'rule: <name>' — produced by a transformation rule's output_field
--   'mapping'      — appears as a key of a mapping's output object
CREATE OR REPLACE FUNCTION get_source_fields(p_source_name TEXT)
RETURNS TABLE (key TEXT, origins TEXT[]) AS $$
SELECT k.key, array_agg(DISTINCT k.origin ORDER BY k.origin) AS origins
FROM (
    -- Declared schema fields
    SELECT f->>'name' AS key, 'schema' AS origin
    FROM dataflow.sources AS s
    CROSS JOIN LATERAL jsonb_array_elements(s.config->'fields') AS f
    WHERE s.name = p_source_name AND s.config ? 'fields'
    UNION ALL
    -- Keys observed in raw imported data
    SELECT jsonb_object_keys(r.data) AS key, 'raw' AS origin
    FROM dataflow.records AS r
    WHERE r.source_name = p_source_name
    UNION ALL
    -- Fields emitted by transformation rules
    SELECT ru.output_field AS key, 'rule: ' || ru.name AS origin
    FROM dataflow.rules AS ru
    WHERE ru.source_name = p_source_name
    UNION ALL
    -- Fields emitted by value mappings
    SELECT jsonb_object_keys(m.output) AS key, 'mapping' AS origin
    FROM dataflow.mappings AS m
    WHERE m.source_name = p_source_name
) AS k
GROUP BY k.key
ORDER BY k.key;
$$ LANGUAGE sql STABLE;
-- ── View data (dynamic sort via EXECUTE) ──────────────────────────────────────
-- Paged, optionally sorted rows from the generated view dfv.<source_name>,
-- returned as JSON: { "exists": bool, "rows": [...] }.
--   p_sort_col is applied only when it names a real column of the view
--   (whitelisted against information_schema), so the quote_ident/%I
--   interpolation below never sees an arbitrary identifier.
--   p_sort_dir: anything other than 'desc' (case-insensitive) sorts ASC.
CREATE OR REPLACE FUNCTION get_view_data(
p_source_name TEXT,
p_limit INT DEFAULT 100,
p_offset INT DEFAULT 0,
p_sort_col TEXT DEFAULT NULL,
p_sort_dir TEXT DEFAULT 'asc'
)
RETURNS JSON AS $$
DECLARE
v_exists BOOLEAN;
v_order TEXT := '';
v_rows JSON;
-- Fix: an explicitly-passed NULL limit/offset used to render as an empty
-- string via format('%s', NULL), producing invalid SQL ("LIMIT  OFFSET 0").
-- NULL falls back to the declared defaults; negatives are clamped to 0.
v_limit INT := GREATEST(COALESCE(p_limit, 100), 0);
v_offset INT := GREATEST(COALESCE(p_offset, 0), 0);
BEGIN
SELECT EXISTS (
SELECT 1 FROM information_schema.views
WHERE table_schema = 'dfv' AND table_name = p_source_name
) INTO v_exists;
IF NOT v_exists THEN
RETURN json_build_object('exists', FALSE, 'rows', '[]'::json);
END IF;
-- Whitelist the sort column; an unknown column silently means "no ordering".
IF p_sort_col IS NOT NULL AND EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = 'dfv'
AND table_name = p_source_name
AND column_name = p_sort_col
) THEN
v_order := ' ORDER BY ' || quote_ident(p_sort_col)
|| CASE WHEN lower(p_sort_dir) = 'desc' THEN ' DESC' ELSE ' ASC' END
|| ' NULLS LAST';
END IF;
-- Subquery applies ORDER BY + LIMIT first, then json_agg collects in that order.
-- json_agg on the outer query preserves column order (json not jsonb).
EXECUTE format(
'SELECT COALESCE(json_agg(row_to_json(t)), ''[]''::json) FROM (SELECT * FROM dfv.%I%s LIMIT %s OFFSET %s) t',
p_source_name, v_order, v_limit, v_offset
) INTO v_rows;
RETURN json_build_object('exists', TRUE, 'rows', v_rows);
END;
$$ LANGUAGE plpgsql STABLE;
-- ── Import (deduplication) ────────────────────────────────────────────────────
-- Bulk-import a JSONB array of records for one source, skipping duplicates.
-- p_data must be a JSONB array; each element becomes one dataflow.records row
-- keyed by generate_dedup_key(element, source.dedup_fields).
-- Returns {success, imported, duplicates, log_id}; always writes one
-- import_log audit row on the success path.
CREATE OR REPLACE FUNCTION import_records(p_source_name TEXT, p_data JSONB)
RETURNS JSON AS $$
DECLARE
v_dedup_fields TEXT[];
v_record JSONB;
v_dedup_key TEXT;
v_inserted INTEGER := 0;
v_duplicates INTEGER := 0;
v_log_id INTEGER;
BEGIN
-- Missing source leaves v_dedup_fields NULL (SELECT INTO found no row).
-- NOTE(review): a source whose dedup_fields column is itself NULL would hit
-- the same branch — assumes that column is NOT NULL; confirm in the schema.
SELECT dedup_fields INTO v_dedup_fields
FROM dataflow.sources WHERE name = p_source_name;
IF v_dedup_fields IS NULL THEN
RETURN json_build_object('success', false, 'error', 'Source not found: ' || p_source_name);
END IF;
FOR v_record IN SELECT * FROM jsonb_array_elements(p_data) LOOP
v_dedup_key := dataflow.generate_dedup_key(v_record, v_dedup_fields);
-- Per-row subtransaction: the inner BEGIN/EXCEPTION block rolls back only
-- this one INSERT on a duplicate key, so the rest of the batch still lands.
BEGIN
INSERT INTO dataflow.records (source_name, data, dedup_key)
VALUES (p_source_name, v_record, v_dedup_key);
v_inserted := v_inserted + 1;
EXCEPTION WHEN unique_violation THEN
v_duplicates := v_duplicates + 1;
END;
END LOOP;
-- One audit row per import call, recording both outcomes.
INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate)
VALUES (p_source_name, v_inserted, v_duplicates)
RETURNING id INTO v_log_id;
RETURN json_build_object('success', true, 'imported', v_inserted, 'duplicates', v_duplicates, 'log_id', v_log_id);
END;
$$ LANGUAGE plpgsql;
-- ── Transformations ───────────────────────────────────────────────────────────
-- NULL-safe merge of two JSONB objects: keys of b overwrite keys of a,
-- and a NULL operand is treated as the empty object.
CREATE OR REPLACE FUNCTION dataflow.jsonb_merge(a JSONB, b JSONB)
RETURNS JSONB AS $$
SELECT COALESCE(a, '{}'::jsonb) || COALESCE(b, '{}'::jsonb)
$$ LANGUAGE sql IMMUTABLE;
-- Aggregate form of jsonb_merge: folds a set of JSONB objects into a single
-- object, with later inputs overwriting earlier inputs' keys. initcond '{}'
-- makes the result '{}' (not NULL) over an empty input set.
-- Drop-then-create keeps this deployment script re-runnable.
DROP AGGREGATE IF EXISTS dataflow.jsonb_concat_obj(JSONB);
CREATE AGGREGATE dataflow.jsonb_concat_obj(JSONB) (
sfunc = dataflow.jsonb_merge,
stype = JSONB,
initcond = '{}'
);
-- Signature changed (p_overwrite added); drop the old 2-arg overload first.
DROP FUNCTION IF EXISTS apply_transformations(TEXT, INTEGER[]);
-- Apply every enabled rule of a source to its records in one set-based pass.
--   p_record_ids: restrict to these record ids (NULL = all records).
--   p_overwrite:  FALSE = only records with transformed IS NULL;
--                 TRUE  = reprocess already-transformed records too.
-- Writes records.transformed = data || merged rule output and stamps
-- transformed_at; returns {"success": true, "transformed": <count>}.
CREATE OR REPLACE FUNCTION apply_transformations(
p_source_name TEXT,
p_record_ids INTEGER[] DEFAULT NULL,
p_overwrite BOOLEAN DEFAULT FALSE
) RETURNS JSON AS $$
WITH
-- Records eligible for this run.
qualifying AS (
SELECT id, data FROM dataflow.records
WHERE source_name = p_source_name
AND (p_overwrite OR transformed IS NULL)
AND (p_record_ids IS NULL OR id = ANY(p_record_ids))
),
-- One row per (record, enabled rule, regexp result). The ON conditions gate
-- which lateral applies: 'extract' rules use regexp_matches (WITH ORDINALITY
-- numbers multiple matches; presumably via a 'g' flag — depends on r.flags),
-- 'replace' rules use regexp_replace (always a single result, rn = 1).
-- A single-element match array is unwrapped to a scalar here.
rx AS (
SELECT
q.id,
r.name AS rule_name,
r.sequence,
r.output_field,
r.retain,
r.function_type,
COALESCE(mt.rn, rp.rn, 1) AS result_number,
CASE WHEN array_length(mt.mt, 1) = 1 THEN to_jsonb(mt.mt[1]) ELSE to_jsonb(mt.mt) END AS match_val,
to_jsonb(rp.rp) AS replace_val
FROM dataflow.rules r
INNER JOIN qualifying q ON q.data ? r.field
LEFT JOIN LATERAL regexp_matches(q.data ->> r.field, r.pattern, r.flags)
WITH ORDINALITY AS mt(mt, rn) ON r.function_type = 'extract'
LEFT JOIN LATERAL regexp_replace(q.data ->> r.field, r.pattern, r.replace_value, r.flags)
WITH ORDINALITY AS rp(rp, rn) ON r.function_type = 'replace'
WHERE r.source_name = p_source_name AND r.enabled = true
),
-- Collapse multiple matches per (record, rule): a lone result stays scalar
-- (jsonb_agg(...) -> 0 unwraps the singleton), several become a JSON array.
agg_matches AS (
SELECT
id, rule_name, sequence, output_field, retain, function_type,
CASE function_type
WHEN 'replace' THEN jsonb_agg(replace_val) -> 0
ELSE
CASE WHEN max(result_number) = 1
THEN jsonb_agg(match_val ORDER BY result_number) -> 0
ELSE jsonb_agg(match_val ORDER BY result_number)
END
END AS extracted
FROM rx
GROUP BY id, rule_name, sequence, output_field, retain, function_type
),
-- Attach the mapping (if any) whose input_value equals the extracted value.
linked AS (
SELECT
a.id, a.sequence, a.output_field, a.retain, a.extracted, m.output AS mapped
FROM agg_matches a
LEFT JOIN dataflow.mappings m ON
m.source_name = p_source_name
AND m.rule_name = a.rule_name
AND m.input_value = a.extracted
WHERE a.extracted IS NOT NULL
),
-- A mapped rule contributes the mapping's output object (plus the raw
-- extraction under output_field when retain is set); otherwise just the
-- extraction under output_field.
rule_output AS (
SELECT
id, sequence,
CASE
WHEN mapped IS NOT NULL THEN
mapped || CASE WHEN retain THEN jsonb_build_object(output_field, extracted) ELSE '{}'::jsonb END
ELSE jsonb_build_object(output_field, extracted)
END AS output
FROM linked
),
-- Merge all rules' outputs per record in rule sequence order
-- (later sequences overwrite earlier ones on key collisions).
record_additions AS (
SELECT id, dataflow.jsonb_concat_obj(output ORDER BY sequence) AS additions
FROM rule_output GROUP BY id
),
-- LEFT JOIN + COALESCE: records with no rule output still get
-- transformed = data, marking them processed.
updated AS (
UPDATE dataflow.records rec
SET transformed = rec.data || COALESCE(ra.additions, '{}'::jsonb),
transformed_at = CURRENT_TIMESTAMP
FROM qualifying q
LEFT JOIN record_additions ra ON ra.id = q.id
WHERE rec.id = q.id
RETURNING rec.id
)
SELECT json_build_object('success', true, 'transformed', count(*)) FROM updated
$$ LANGUAGE sql;
-- Convenience wrapper: re-run every rule over ALL records of a source,
-- overwriting previously transformed output (p_overwrite = TRUE).
CREATE OR REPLACE FUNCTION reprocess_records(p_source_name TEXT)
RETURNS JSON AS $$
SELECT * FROM dataflow.apply_transformations(p_source_name, NULL, TRUE)
$$ LANGUAGE sql;
-- ── View generation ───────────────────────────────────────────────────────────
-- Build (or rebuild) the per-source reporting view dfv.<source_name> from the
-- field list in sources.config->'fields'. Each field becomes one column:
--   * "expression" fields: every {ref} placeholder is rewritten to
--     (transformed->>'ref')::numeric, then the expression is inlined;
--   * typed fields: transformed->>name cast to date / numeric as declared;
--   * anything else: plain text transformed->>name.
-- Returns {success, view, sql} on success, {success:false, error} otherwise.
-- NOTE(review): expression text from config is interpolated into the view SQL
-- unescaped (only {refs} are rewritten) — config authors are trusted by
-- design; any untrusted write path to sources.config would be an injection
-- vector here.
CREATE OR REPLACE FUNCTION generate_source_view(p_source_name TEXT)
RETURNS JSON AS $$
DECLARE
v_config JSONB;
v_field JSONB;
v_cols TEXT := '';
v_sql TEXT;
v_view TEXT;
BEGIN
SELECT config INTO v_config FROM dataflow.sources WHERE name = p_source_name;
-- Fix: a nonexistent source used to be reported as "No schema fields
-- defined". Distinguish it, matching import_records' error wording.
IF NOT FOUND THEN
RETURN json_build_object('success', false, 'error', 'Source not found: ' || p_source_name);
END IF;
IF v_config IS NULL OR NOT (v_config ? 'fields') OR jsonb_array_length(v_config->'fields') = 0 THEN
RETURN json_build_object('success', false, 'error', 'No schema fields defined for this source');
END IF;
FOR v_field IN SELECT * FROM jsonb_array_elements(v_config->'fields') LOOP
IF v_cols != '' THEN v_cols := v_cols || ', '; END IF;
IF v_field->>'expression' IS NOT NULL THEN
-- Computed column: substitute each {field} reference, then inline verbatim.
DECLARE
v_expr TEXT := v_field->>'expression';
v_ref TEXT;
BEGIN
WHILE v_expr ~ '\{[^}]+\}' LOOP
v_ref := substring(v_expr FROM '\{([^}]+)\}');
v_expr := replace(v_expr, '{' || v_ref || '}', format('(transformed->>%L)::numeric', v_ref));
END LOOP;
v_cols := v_cols || format('%s AS %I', v_expr, v_field->>'name');
END;
ELSE
-- Plain column: cast per declared type; %L/%I quoting keeps this path safe.
CASE v_field->>'type'
WHEN 'date' THEN v_cols := v_cols || format('(transformed->>%L)::date AS %I', v_field->>'name', v_field->>'name');
WHEN 'numeric' THEN v_cols := v_cols || format('(transformed->>%L)::numeric AS %I', v_field->>'name', v_field->>'name');
ELSE v_cols := v_cols || format('transformed->>%L AS %I', v_field->>'name', v_field->>'name');
END CASE;
END IF;
END LOOP;
CREATE SCHEMA IF NOT EXISTS dfv;
v_view := 'dfv.' || quote_ident(p_source_name);
-- DROP first: CREATE OR REPLACE VIEW cannot change the column list.
EXECUTE format('DROP VIEW IF EXISTS %s', v_view);
v_sql := format(
'CREATE VIEW %s AS SELECT %s FROM dataflow.records WHERE source_name = %L AND transformed IS NOT NULL',
v_view, v_cols, p_source_name
);
EXECUTE v_sql;
RETURN json_build_object('success', true, 'view', v_view, 'sql', v_sql);
END;
$$ LANGUAGE plpgsql;