-- Sources queries
-- All SQL for api/routes/sources.js

SET search_path TO dataflow, public;

-- ── CRUD ─────────────────────────────────────────────────────────────────────

-- List every source, alphabetically by name.
CREATE OR REPLACE FUNCTION list_sources()
RETURNS SETOF dataflow.sources AS $$
    SELECT * FROM dataflow.sources ORDER BY name;
$$ LANGUAGE sql STABLE;

-- Fetch one source by name (returns a NULL row when absent).
CREATE OR REPLACE FUNCTION get_source(p_name TEXT)
RETURNS dataflow.sources AS $$
    SELECT * FROM dataflow.sources WHERE name = p_name;
$$ LANGUAGE sql STABLE;

-- Create a source; config defaults to an empty JSONB object.
CREATE OR REPLACE FUNCTION create_source(
    p_name TEXT,
    p_dedup_fields TEXT[],
    p_config JSONB DEFAULT '{}'
) RETURNS dataflow.sources AS $$
    INSERT INTO dataflow.sources (name, dedup_fields, config)
    VALUES (p_name, p_dedup_fields, p_config)
    RETURNING *;
$$ LANGUAGE sql;

-- Partial update: a NULL argument leaves the existing column value untouched.
CREATE OR REPLACE FUNCTION update_source(
    p_name TEXT,
    p_dedup_fields TEXT[] DEFAULT NULL,
    p_config JSONB DEFAULT NULL
) RETURNS dataflow.sources AS $$
    UPDATE dataflow.sources
    SET dedup_fields = COALESCE(p_dedup_fields, dedup_fields),
        config       = COALESCE(p_config, config),
        updated_at   = CURRENT_TIMESTAMP
    WHERE name = p_name
    RETURNING *;
$$ LANGUAGE sql;

-- Delete a source; returns the deleted name, or NULL when nothing matched.
CREATE OR REPLACE FUNCTION delete_source(p_name TEXT)
RETURNS TEXT AS $$
    DELETE FROM dataflow.sources WHERE name = p_name RETURNING name;
$$ LANGUAGE sql;

-- ── Import log ────────────────────────────────────────────────────────────────

-- Import history for one source, newest first.
CREATE OR REPLACE FUNCTION get_import_log(p_source_name TEXT)
RETURNS SETOF dataflow.import_log AS $$
    SELECT * FROM dataflow.import_log
    WHERE source_name = p_source_name
    ORDER BY imported_at DESC;
$$ LANGUAGE sql STABLE;

-- ── Stats ─────────────────────────────────────────────────────────────────────

-- Record counts for one source: total, already-transformed, still-pending.
CREATE OR REPLACE FUNCTION get_source_stats(p_source_name TEXT)
RETURNS TABLE (total_records BIGINT, transformed_records BIGINT, pending_records BIGINT) AS $$
    SELECT COUNT(*)                                         AS total_records,
           COUNT(*) FILTER (WHERE transformed IS NOT NULL)  AS transformed_records,
           COUNT(*) FILTER (WHERE transformed IS NULL)      AS pending_records
    FROM dataflow.records
    WHERE source_name = p_source_name;
$$ LANGUAGE sql STABLE;

-- ── Fields ────────────────────────────────────────────────────────────────────

-- Every field name known for a source, with the list of places it comes from:
-- the declared schema, raw record data, rule outputs, and mapping outputs.
CREATE OR REPLACE FUNCTION get_source_fields(p_source_name TEXT)
RETURNS TABLE (key TEXT, origins TEXT[]) AS $$
    SELECT key, array_agg(DISTINCT origin ORDER BY origin) AS origins
    FROM (
        -- Fields declared in the source's schema config.
        SELECT f->>'name' AS key, 'schema' AS origin
        FROM dataflow.sources
        CROSS JOIN LATERAL jsonb_array_elements(config->'fields') f
        WHERE name = p_source_name AND config ? 'fields'
        UNION ALL
        -- Keys present in raw imported records.
        SELECT jsonb_object_keys(data) AS key, 'raw' AS origin
        FROM dataflow.records
        WHERE source_name = p_source_name
        UNION ALL
        -- Output fields produced by transformation rules.
        SELECT output_field AS key, 'rule: ' || name AS origin
        FROM dataflow.rules
        WHERE source_name = p_source_name
        UNION ALL
        -- Keys produced by value mappings.
        SELECT jsonb_object_keys(output) AS key, 'mapping' AS origin
        FROM dataflow.mappings
        WHERE source_name = p_source_name
    ) keys
    GROUP BY key
    ORDER BY key;
$$ LANGUAGE sql STABLE;

-- ── View data (dynamic sort via EXECUTE) ──────────────────────────────────────

-- Page through a generated dfv.<source> view as JSON rows.
-- Returns {exists: false, rows: []} when the view has not been generated yet.
CREATE OR REPLACE FUNCTION get_view_data(
    p_source_name TEXT,
    p_limit INT DEFAULT 100,
    p_offset INT DEFAULT 0,
    p_sort_col TEXT DEFAULT NULL,
    p_sort_dir TEXT DEFAULT 'asc'
) RETURNS JSON AS $$
DECLARE
    v_exists BOOLEAN;
    v_order TEXT := '';
    v_rows JSON;
BEGIN
    SELECT EXISTS (
        SELECT 1 FROM information_schema.views
        WHERE table_schema = 'dfv' AND table_name = p_source_name
    ) INTO v_exists;

    IF NOT v_exists THEN
        RETURN json_build_object('exists', FALSE, 'rows', '[]'::json);
    END IF;

    -- The sort column is whitelisted against the view's real columns and then
    -- quoted with quote_ident, so no caller-supplied identifier reaches the
    -- dynamic SQL unescaped. Any p_sort_dir other than 'desc' falls back to ASC.
    IF p_sort_col IS NOT NULL AND EXISTS (
        SELECT 1 FROM information_schema.columns
        WHERE table_schema = 'dfv'
          AND table_name = p_source_name
          AND column_name = p_sort_col
    ) THEN
        v_order := ' ORDER BY ' || quote_ident(p_sort_col)
                || CASE WHEN lower(p_sort_dir) = 'desc' THEN ' DESC' ELSE ' ASC' END
                || ' NULLS LAST';
    END IF;

    -- Subquery applies ORDER BY + LIMIT first, then json_agg collects in that
    -- order. json_agg on the outer query preserves column order (json not jsonb).
    -- p_limit / p_offset are INT parameters, so %s interpolation is safe.
    EXECUTE format(
        'SELECT COALESCE(json_agg(row_to_json(t)), ''[]''::json)
         FROM (SELECT * FROM dfv.%I%s LIMIT %s OFFSET %s) t',
        p_source_name, v_order, p_limit, p_offset
    ) INTO v_rows;

    RETURN json_build_object('exists', TRUE, 'rows', v_rows);
END;
$$ LANGUAGE plpgsql STABLE;

-- ── Import (deduplication) ────────────────────────────────────────────────────

-- Bulk-import a JSONB array of records for a source, skipping rows whose
-- dedup key already exists. Returns {success, imported, duplicates, log_id}.
CREATE OR REPLACE FUNCTION import_records(p_source_name TEXT, p_data JSONB)
RETURNS JSON AS $$
DECLARE
    v_dedup_fields TEXT[];
    v_total        INTEGER;
    v_inserted     INTEGER := 0;
    v_log_id       INTEGER;
BEGIN
    SELECT dedup_fields INTO v_dedup_fields
    FROM dataflow.sources
    WHERE name = p_source_name;

    -- FOUND distinguishes a missing source from an existing source whose
    -- dedup_fields is NULL; the previous IS NULL check reported
    -- "Source not found" for both cases.
    IF NOT FOUND THEN
        RETURN json_build_object('success', false,
                                 'error', 'Source not found: ' || p_source_name);
    END IF;
    IF v_dedup_fields IS NULL THEN
        RETURN json_build_object('success', false,
                                 'error', 'No dedup fields configured for source: ' || p_source_name);
    END IF;

    v_total := jsonb_array_length(p_data);

    -- Set-based insert. ON CONFLICT DO NOTHING skips any row violating a
    -- unique constraint (including duplicate dedup keys within this same
    -- batch), matching the old per-row EXCEPTION WHEN unique_violation loop
    -- without a subtransaction per record.
    INSERT INTO dataflow.records (source_name, data, dedup_key)
    SELECT p_source_name,
           rec,
           dataflow.generate_dedup_key(rec, v_dedup_fields)
    FROM jsonb_array_elements(p_data) AS rec
    ON CONFLICT DO NOTHING;
    GET DIAGNOSTICS v_inserted = ROW_COUNT;

    INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate)
    VALUES (p_source_name, v_inserted, v_total - v_inserted)
    RETURNING id INTO v_log_id;

    RETURN json_build_object('success', true,
                             'imported', v_inserted,
                             'duplicates', v_total - v_inserted,
                             'log_id', v_log_id);
END;
$$ LANGUAGE plpgsql;

-- ── Transformations ───────────────────────────────────────────────────────────

-- NULL-safe JSONB object merge; right-hand keys win on collision.
CREATE OR REPLACE FUNCTION dataflow.jsonb_merge(a JSONB, b JSONB)
RETURNS JSONB AS $$
    SELECT COALESCE(a, '{}') || COALESCE(b, '{}')
$$ LANGUAGE sql IMMUTABLE;

-- Aggregate form of jsonb_merge: folds a set of objects into one, in the
-- ORDER BY given at the call site (later rows override earlier keys).
DROP AGGREGATE IF EXISTS dataflow.jsonb_concat_obj(JSONB);
CREATE AGGREGATE dataflow.jsonb_concat_obj(JSONB) (
    sfunc = dataflow.jsonb_merge,
    stype = JSONB,
    initcond = '{}'
);

-- Signature changed (p_overwrite added), so the old definition must be dropped.
DROP FUNCTION IF EXISTS apply_transformations(TEXT, INTEGER[]);

-- Run all enabled rules for a source against its records and write the result
-- into records.transformed. p_record_ids limits the run to specific records;
-- p_overwrite reprocesses records that were already transformed.
CREATE OR REPLACE FUNCTION apply_transformations(
    p_source_name TEXT,
    p_record_ids INTEGER[] DEFAULT NULL,
    p_overwrite BOOLEAN DEFAULT FALSE
) RETURNS JSON AS $$
-- Records eligible for this run.
WITH qualifying AS (
    SELECT id, data
    FROM dataflow.records
    WHERE source_name = p_source_name
      AND (p_overwrite OR transformed IS NULL)
      AND (p_record_ids IS NULL OR id = ANY(p_record_ids))
),
-- Cross every enabled rule with every qualifying record that has the rule's
-- input field; run the regex (extract via regexp_matches, replace via
-- regexp_replace) laterally, keeping the ordinality of multiple matches.
rx AS (
    SELECT q.id,
           r.name AS rule_name,
           r.sequence,
           r.output_field,
           r.retain,
           r.function_type,
           COALESCE(mt.rn, rp.rn, 1) AS result_number,
           -- Single-group matches collapse to a scalar; multi-group stay arrays.
           CASE WHEN array_length(mt.mt, 1) = 1
                THEN to_jsonb(mt.mt[1])
                ELSE to_jsonb(mt.mt)
           END AS match_val,
           to_jsonb(rp.rp) AS replace_val
    FROM dataflow.rules r
    INNER JOIN qualifying q ON q.data ? r.field
    LEFT JOIN LATERAL regexp_matches(q.data ->> r.field, r.pattern, r.flags)
        WITH ORDINALITY AS mt(mt, rn) ON r.function_type = 'extract'
    LEFT JOIN LATERAL regexp_replace(q.data ->> r.field, r.pattern, r.replace_value, r.flags)
        WITH ORDINALITY AS rp(rp, rn) ON r.function_type = 'replace'
    WHERE r.source_name = p_source_name
      AND r.enabled = true
),
-- Collapse per-match rows back to one value per (record, rule): a scalar when
-- there was a single match, a JSON array when the regex matched repeatedly.
agg_matches AS (
    SELECT id, rule_name, sequence, output_field, retain, function_type,
           CASE function_type
               WHEN 'replace' THEN jsonb_agg(replace_val) -> 0
               ELSE CASE WHEN max(result_number) = 1
                         THEN jsonb_agg(match_val ORDER BY result_number) -> 0
                         ELSE jsonb_agg(match_val ORDER BY result_number)
                    END
           END AS extracted
    FROM rx
    GROUP BY id, rule_name, sequence, output_field, retain, function_type
),
-- Attach an optional mapping whose input_value equals the extracted value.
linked AS (
    SELECT a.id, a.sequence, a.output_field, a.retain, a.extracted,
           m.output AS mapped
    FROM agg_matches a
    LEFT JOIN dataflow.mappings m
        ON m.source_name = p_source_name
       AND m.rule_name = a.rule_name
       AND m.input_value = a.extracted
    WHERE a.extracted IS NOT NULL
),
-- A mapped rule contributes the mapping output (plus the raw extraction when
-- retain is set); an unmapped rule contributes {output_field: extracted}.
rule_output AS (
    SELECT id, sequence,
           CASE WHEN mapped IS NOT NULL
                THEN mapped || CASE WHEN retain
                                    THEN jsonb_build_object(output_field, extracted)
                                    ELSE '{}'::jsonb
                               END
                ELSE jsonb_build_object(output_field, extracted)
           END AS output
    FROM linked
),
-- Merge all rule outputs per record in rule-sequence order (later rules win).
record_additions AS (
    SELECT id, dataflow.jsonb_concat_obj(output ORDER BY sequence) AS additions
    FROM rule_output
    GROUP BY id
),
-- LEFT JOIN so records with no matching rules are still marked transformed.
updated AS (
    UPDATE dataflow.records rec
    SET transformed = rec.data || COALESCE(ra.additions, '{}'::jsonb),
        transformed_at = CURRENT_TIMESTAMP
    FROM qualifying q
    LEFT JOIN record_additions ra ON ra.id = q.id
    WHERE rec.id = q.id
    RETURNING rec.id
)
SELECT json_build_object('success', true, 'transformed', count(*))
FROM updated
$$ LANGUAGE sql;

-- Re-run transformations for every record of a source, overwriting results.
CREATE OR REPLACE FUNCTION reprocess_records(p_source_name TEXT)
RETURNS JSON AS $$
    SELECT dataflow.apply_transformations(p_source_name, NULL, TRUE)
$$ LANGUAGE sql;

-- ── View generation ───────────────────────────────────────────────────────────

-- (Re)create the dfv.<source> view that projects typed columns out of
-- records.transformed, driven by the source's config->'fields' schema.
CREATE OR REPLACE FUNCTION generate_source_view(p_source_name TEXT)
RETURNS JSON AS $$
DECLARE
    v_config JSONB;
    v_field JSONB;
    v_cols TEXT := '';
    v_sql TEXT;
    v_view TEXT;
BEGIN
    SELECT config INTO v_config FROM dataflow.sources WHERE name = p_source_name;

    IF v_config IS NULL OR NOT (v_config ? 'fields')
       OR jsonb_array_length(v_config->'fields') = 0 THEN
        RETURN json_build_object('success', false,
                                 'error', 'No schema fields defined for this source');
    END IF;

    FOR v_field IN SELECT * FROM jsonb_array_elements(v_config->'fields') LOOP
        IF v_cols != '' THEN
            v_cols := v_cols || ', ';
        END IF;

        IF v_field->>'expression' IS NOT NULL THEN
            -- Computed column: replace each {field} reference with a numeric
            -- cast of the transformed JSON value.
            -- NOTE(review): the remaining expression text is interpolated into
            -- the view DDL unescaped — safe only while source config is
            -- trusted admin input.
            DECLARE
                v_expr TEXT := v_field->>'expression';
                v_ref TEXT;
            BEGIN
                WHILE v_expr ~ '\{[^}]+\}' LOOP
                    v_ref := substring(v_expr FROM '\{([^}]+)\}');
                    v_expr := replace(v_expr, '{' || v_ref || '}',
                                      format('(transformed->>%L)::numeric', v_ref));
                END LOOP;
                v_cols := v_cols || format('%s AS %I', v_expr, v_field->>'name');
            END;
        ELSE
            -- Plain column: cast per declared type, defaulting to text.
            CASE v_field->>'type'
                WHEN 'date' THEN
                    v_cols := v_cols || format('(transformed->>%L)::date AS %I',
                                               v_field->>'name', v_field->>'name');
                WHEN 'numeric' THEN
                    v_cols := v_cols || format('(transformed->>%L)::numeric AS %I',
                                               v_field->>'name', v_field->>'name');
                ELSE
                    v_cols := v_cols || format('transformed->>%L AS %I',
                                               v_field->>'name', v_field->>'name');
            END CASE;
        END IF;
    END LOOP;

    CREATE SCHEMA IF NOT EXISTS dfv;

    v_view := 'dfv.' || quote_ident(p_source_name);
    EXECUTE format('DROP VIEW IF EXISTS %s', v_view);

    v_sql := format(
        'CREATE VIEW %s AS SELECT %s FROM dataflow.records WHERE source_name = %L AND transformed IS NOT NULL',
        v_view, v_cols, p_source_name
    );
    EXECUTE v_sql;

    RETURN json_build_object('success', true, 'view', v_view, 'sql', v_sql);
END;
$$ LANGUAGE plpgsql;