dataflow/database/queries/mappings.sql
Paul Trowbridge d495ef2fc5 Records filters, global picklist, autocomplete, and rule reprocess
- Records tab: regex filter bar (postgres ~*), add/remove filters, debounced,
  ANDed together; get_view_data gains p_filters JSONB param
- Global picklist: sources.global_picklist flag (default true) controls whether
  a source's mapped output values feed the cross-source autocomplete suggestion pool;
  toggle on Sources page; get_global_output_values() SQL function
- Mappings: replace native datalist with custom AutocompleteInput component —
  Alt+Down opens, Tab cycles, Enter selects, arrow keys navigate, Escape closes
- Rules: auto-reprocess source records when a rule is created or updated
- preview_rule: fix BIGINT/INT return type mismatch
- Stale get_import_log removed from sources.sql
- TSV export: fetch with auth headers instead of plain <a href> (fixes 401)
- + column button: more visible styling

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-14 16:28:26 -04:00

224 lines
7.8 KiB
PL/PgSQL

--
-- Mappings queries
-- All SQL for api/routes/mappings.js
--
-- Conventions in this file:
--   * Functions live in the dataflow schema (via search_path below).
--   * Read-only functions are LANGUAGE sql STABLE; writers are plain sql.
--   * p_rule_name DEFAULT NULL means "all rules for the source".
--
-- Ensure objects created below land in the dataflow schema.
SET search_path TO dataflow, public;
-- ── CRUD ─────────────────────────────────────────────────────────────────────
-- List all mappings for a source, optionally narrowed to one rule.
-- Returns whole mapping rows, ordered for stable UI display.
CREATE OR REPLACE FUNCTION list_mappings(p_source_name TEXT, p_rule_name TEXT DEFAULT NULL)
RETURNS SETOF dataflow.mappings AS $$
SELECT m.*
FROM dataflow.mappings AS m
WHERE m.source_name = p_source_name
  AND (p_rule_name IS NULL OR m.rule_name = p_rule_name)
-- input_value is JSONB; cast to text so ORDER BY has a total ordering.
ORDER BY m.rule_name, m.input_value::text;
$$ LANGUAGE sql STABLE;
-- Fetch a single mapping by primary key.
-- A composite-type SQL function yields a row of NULLs when no row matches.
CREATE OR REPLACE FUNCTION get_mapping(p_id INT)
RETURNS dataflow.mappings AS $$
SELECT m.*
FROM dataflow.mappings AS m
WHERE m.id = p_id;
$$ LANGUAGE sql STABLE;
-- Insert a new mapping and return the created row.
-- NOTE(review): presumably errors on a duplicate (source_name, rule_name,
-- input_value) — see the ON CONFLICT target in upsert_mapping; use
-- upsert_mapping when create-or-replace semantics are wanted.
CREATE OR REPLACE FUNCTION create_mapping(
    p_source_name TEXT,
    p_rule_name   TEXT,
    p_input_value JSONB,
    p_output      JSONB
)
RETURNS dataflow.mappings AS $$
INSERT INTO dataflow.mappings (source_name, rule_name, input_value, output)
SELECT p_source_name, p_rule_name, p_input_value, p_output
RETURNING *;
$$ LANGUAGE sql;
-- Create a mapping, or overwrite its output if one already exists for the
-- same (source_name, rule_name, input_value). Returns the resulting row.
CREATE OR REPLACE FUNCTION upsert_mapping(
    p_source_name TEXT,
    p_rule_name   TEXT,
    p_input_value JSONB,
    p_output      JSONB
)
RETURNS dataflow.mappings AS $$
INSERT INTO dataflow.mappings AS m (source_name, rule_name, input_value, output)
VALUES (p_source_name, p_rule_name, p_input_value, p_output)
ON CONFLICT (source_name, rule_name, input_value)
    DO UPDATE SET output = EXCLUDED.output
RETURNING m.*;
$$ LANGUAGE sql;
-- Partially update a mapping by id and return the updated row.
-- A NULL argument means "leave that field unchanged"; as a consequence this
-- function cannot set input_value or output to NULL.
CREATE OR REPLACE FUNCTION update_mapping(
    p_id          INT,
    p_input_value JSONB DEFAULT NULL,
    p_output      JSONB DEFAULT NULL
)
RETURNS dataflow.mappings AS $$
UPDATE dataflow.mappings AS m
SET input_value = COALESCE(p_input_value, m.input_value),
    output      = COALESCE(p_output, m.output)
WHERE m.id = p_id
RETURNING m.*;
$$ LANGUAGE sql;
-- Delete a mapping by id; returns the deleted id (zero rows if none matched).
-- The table alias disambiguates the column `id` from the identically named
-- OUT column of RETURNS TABLE.
CREATE OR REPLACE FUNCTION delete_mapping(p_id INT)
RETURNS TABLE (id INT) AS $$
DELETE FROM dataflow.mappings AS m
WHERE m.id = p_id
RETURNING m.id;
$$ LANGUAGE sql;
-- ── Counts ────────────────────────────────────────────────────────────────────
-- For each mapping of a source (optionally one rule), count how many records
-- currently carry that mapping's input_value in the rule's output field.
-- LEFT JOIN keeps mappings with zero matching records (count = 0).
CREATE OR REPLACE FUNCTION get_mapping_counts(p_source_name TEXT, p_rule_name TEXT DEFAULT NULL)
RETURNS TABLE (rule_name TEXT, input_value JSONB, record_count BIGINT) AS $$
SELECT
    map.rule_name,
    map.input_value,
    COUNT(rcd.id) AS record_count
FROM dataflow.mappings map
-- Resolve each mapping's rule to learn which transformed field it targets.
INNER JOIN dataflow.rules rl
    ON rl.source_name = map.source_name
   AND rl.name = map.rule_name
LEFT JOIN dataflow.records rcd
    ON rcd.source_name = map.source_name
   AND rcd.transformed ? rl.output_field
   AND rcd.transformed -> rl.output_field = map.input_value
WHERE map.source_name = p_source_name
  AND (p_rule_name IS NULL OR map.rule_name = p_rule_name)
GROUP BY map.rule_name, map.input_value;
$$ LANGUAGE sql STABLE;
-- ── All values (mapped + unmapped) ───────────────────────────────────────────
-- Drop first: CREATE OR REPLACE cannot change a function's result type, and
-- this signature's return columns have changed over time.
DROP FUNCTION IF EXISTS get_all_values(TEXT, TEXT);
-- One row per distinct (rule, extracted value) for a source, whether or not a
-- mapping exists for it. Mapped rows carry mapping_id/output; unmapped rows
-- carry NULLs and is_mapped = false. Each row includes up to 5 sample records.
CREATE FUNCTION get_all_values(
    p_source_name TEXT,
    p_rule_name TEXT DEFAULT NULL
) RETURNS TABLE (
    rule_name TEXT,
    output_field TEXT,
    source_field TEXT,
    extracted_value JSONB,
    record_count BIGINT,
    sample JSONB,
    mapping_id INTEGER,
    output JSONB,
    is_mapped BOOLEAN
) AS $$
BEGIN
RETURN QUERY
WITH vals AS (
    -- Every (rule, record) pairing for the source where the rule actually
    -- produced a value; rn orders records within each (rule, value) group so
    -- the sample below is deterministic.
    SELECT
        rl.name AS rule_name,
        rl.output_field,
        rl.field AS source_field,
        rec.transformed -> rl.output_field AS extracted_value,
        rec.data AS record_data,
        row_number() OVER (
            PARTITION BY rl.name, rec.transformed -> rl.output_field
            ORDER BY rec.id
        ) AS rn
    FROM dataflow.records rec
    INNER JOIN dataflow.rules rl
        ON rl.source_name = rec.source_name
    WHERE rec.source_name = p_source_name
      AND rec.transformed IS NOT NULL
      AND rec.transformed ? rl.output_field
      AND (p_rule_name IS NULL OR rl.name = p_rule_name)
      AND rec.data ? rl.field
),
grouped AS (
    -- Collapse to one row per distinct extracted value, keeping the first
    -- five full source records as a JSONB sample.
    SELECT
        v.rule_name,
        v.output_field,
        v.source_field,
        v.extracted_value,
        count(*) AS record_count,
        jsonb_agg(v.record_data ORDER BY v.rn) FILTER (WHERE v.rn <= 5) AS sample
    FROM vals v
    GROUP BY v.rule_name, v.output_field, v.source_field, v.extracted_value
)
SELECT
    g.rule_name,
    g.output_field,
    g.source_field,
    g.extracted_value,
    g.record_count,
    g.sample,
    m.id AS mapping_id,
    m.output,
    (m.id IS NOT NULL) AS is_mapped
FROM grouped g
LEFT JOIN dataflow.mappings m
    ON m.source_name = p_source_name
   AND m.rule_name = g.rule_name
   AND m.input_value = g.extracted_value
ORDER BY g.record_count DESC;
END;
$$ LANGUAGE plpgsql;
-- ── Unmapped values ───────────────────────────────────────────────────────────
-- Drop first: CREATE OR REPLACE cannot change a function's result type, and
-- this signature's return columns have changed over time.
DROP FUNCTION IF EXISTS get_unmapped_values(TEXT, TEXT);
-- Like get_all_values, but restricted to extracted values that have no
-- mapping yet — the work queue for the mapping UI.
CREATE FUNCTION get_unmapped_values(
    p_source_name TEXT,
    p_rule_name TEXT DEFAULT NULL
) RETURNS TABLE (
    rule_name TEXT,
    output_field TEXT,
    source_field TEXT,
    extracted_value JSONB,
    record_count BIGINT,
    sample JSONB
) AS $$
BEGIN
RETURN QUERY
WITH candidate AS (
    -- Every (rule, record) pairing for the source where the rule produced a
    -- value; rn makes the per-value sample deterministic.
    SELECT
        rl.name AS rule_name,
        rl.output_field,
        rl.field AS source_field,
        rec.transformed -> rl.output_field AS extracted_value,
        rec.data AS record_data,
        row_number() OVER (
            PARTITION BY rl.name, rec.transformed -> rl.output_field
            ORDER BY rec.id
        ) AS rn
    FROM dataflow.records rec
    INNER JOIN dataflow.rules rl
        ON rl.source_name = rec.source_name
    WHERE rec.source_name = p_source_name
      AND rec.transformed IS NOT NULL
      AND rec.transformed ? rl.output_field
      AND (p_rule_name IS NULL OR rl.name = p_rule_name)
      AND rec.data ? rl.field
)
SELECT
    c.rule_name,
    c.output_field,
    c.source_field,
    c.extracted_value,
    count(*) AS record_count,
    jsonb_agg(c.record_data ORDER BY c.rn) FILTER (WHERE c.rn <= 5) AS sample
FROM candidate c
-- NOT EXISTS (rather than NOT IN) is NULL-safe and stops at the first match.
WHERE NOT EXISTS (
    SELECT 1
    FROM dataflow.mappings m
    WHERE m.source_name = p_source_name
      AND m.rule_name = c.rule_name
      AND m.input_value = c.extracted_value
)
GROUP BY c.rule_name, c.output_field, c.source_field, c.extracted_value
ORDER BY count(*) DESC;
END;
$$ LANGUAGE plpgsql;
-- ── Global picklist ───────────────────────────────────────────────────────────
-- Distinct (output column, value) pairs drawn from the mapping outputs of
-- every source with global_picklist enabled; feeds the cross-source
-- autocomplete suggestion pool. Empty strings and JSON nulls are excluded
-- (jsonb_each_text yields SQL NULL for a JSON null).
CREATE OR REPLACE FUNCTION get_global_output_values()
RETURNS TABLE (col TEXT, val TEXT) AS $$
SELECT DISTINCT kv.key AS col, kv.value AS val
FROM dataflow.sources s
INNER JOIN dataflow.mappings m
    ON m.source_name = s.name
-- LATERAL explodes each mapping's output object into key/value text pairs.
CROSS JOIN LATERAL jsonb_each_text(m.output) AS kv(key, value)
WHERE s.global_picklist
  AND kv.value IS NOT NULL
  AND kv.value <> ''
ORDER BY kv.key, kv.value;
$$ LANGUAGE sql STABLE;