dataflow/database/queries/mappings.sql
Paul Trowbridge dc32060c42 Add global Remap page for bulk output value replacement
- SQL: search_mapping_outputs(search) — distinct (col, val, count) groups
         get_mappings_by_output_field(col, val) — individual mappings
         remap_output_field(col, from, to) — bulk UPDATE via jsonb_set
- API: GET /mappings/outputs?search=, GET /mappings/outputs/:col/:val,
       POST /mappings/remap-field
- UI: Remap page — search output values, click to select, edit the
  replacement value, see all affected mappings, apply globally
- Nav: Remap added between Mappings and Records

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-15 20:22:52 -04:00

262 lines
9.4 KiB
PL/PgSQL

--
-- Mappings queries
-- All SQL for api/routes/mappings.js
--
-- Resolve unqualified names against dataflow first, then public. The CREATE FUNCTION
-- statements below do not schema-qualify the function name, so they rely on this
-- path to decide which schema the functions are created in.
SET search_path = dataflow, public;
-- ── CRUD ─────────────────────────────────────────────────────────────────────
-- Lists the mappings that belong to one source, optionally narrowed to one rule.
--   p_source_name  source whose mappings are returned
--   p_rule_name    rule to filter on; NULL (the default) keeps every rule
-- Rows are ordered by rule name, then by the text rendering of input_value.
CREATE OR REPLACE FUNCTION list_mappings(p_source_name TEXT, p_rule_name TEXT DEFAULT NULL)
RETURNS SETOF dataflow.mappings AS $$
    SELECT m.*
    FROM dataflow.mappings AS m
    WHERE m.source_name = p_source_name
      AND (m.rule_name = p_rule_name OR p_rule_name IS NULL)
    ORDER BY
        m.rule_name,
        CAST(m.input_value AS TEXT);
$$ LANGUAGE sql STABLE;
-- Fetches the mapping row whose id equals p_id.
-- The function returns a single composite value, so when no row matches the
-- result is NULL rather than an empty set.
CREATE OR REPLACE FUNCTION get_mapping(p_id INT)
RETURNS dataflow.mappings AS $$
    SELECT m.*
    FROM dataflow.mappings AS m
    WHERE m.id = p_id;
$$ LANGUAGE sql STABLE;
-- Inserts one mapping and returns the stored row (including generated columns).
--   p_source_name  source the mapping belongs to
--   p_rule_name    rule the mapping belongs to
--   p_input_value  JSONB value the mapping matches on
--   p_output       JSONB document the mapping produces
-- There is no conflict handling here: any constraint violation raised by the
-- INSERT propagates to the caller.
CREATE OR REPLACE FUNCTION create_mapping(
    p_source_name TEXT,
    p_rule_name   TEXT,
    p_input_value JSONB,
    p_output      JSONB
)
RETURNS dataflow.mappings AS $$
    INSERT INTO dataflow.mappings AS m (
        source_name,
        rule_name,
        input_value,
        output
    )
    SELECT
        p_source_name,
        p_rule_name,
        p_input_value,
        p_output
    RETURNING m.*;
$$ LANGUAGE sql;
-- Inserts a mapping, or, when one already exists for the same
-- (source_name, rule_name, input_value), overwrites only its output.
-- Returns the row as stored after the insert or update.
--   p_source_name  source the mapping belongs to
--   p_rule_name    rule the mapping belongs to
--   p_input_value  JSONB value the mapping matches on
--   p_output       JSONB document the mapping produces
CREATE OR REPLACE FUNCTION upsert_mapping(
    p_source_name TEXT,
    p_rule_name   TEXT,
    p_input_value JSONB,
    p_output      JSONB
)
RETURNS dataflow.mappings AS $$
    INSERT INTO dataflow.mappings AS m (source_name, rule_name, input_value, output)
    VALUES (
        p_source_name,
        p_rule_name,
        p_input_value,
        p_output
    )
    -- The conflict target must be backed by a unique index or constraint on these columns.
    ON CONFLICT (source_name, rule_name, input_value) DO UPDATE
        SET output = EXCLUDED.output
    RETURNING m.*;
$$ LANGUAGE sql;
-- Partially updates the mapping with id p_id and returns the updated row.
--   p_input_value  new input value; NULL (the default) keeps the current one
--   p_output       new output document; NULL (the default) keeps the current one
-- Because SQL NULL means "leave unchanged", this function cannot store SQL NULL
-- in either column. When no row has the given id the result is NULL.
CREATE OR REPLACE FUNCTION update_mapping(
    p_id          INT,
    p_input_value JSONB DEFAULT NULL,
    p_output      JSONB DEFAULT NULL
)
RETURNS dataflow.mappings AS $$
    UPDATE dataflow.mappings AS m
    SET
        input_value = CASE WHEN p_input_value IS NULL THEN m.input_value ELSE p_input_value END,
        output      = CASE WHEN p_output IS NULL THEN m.output ELSE p_output END
    WHERE m.id = p_id
    RETURNING m.*;
$$ LANGUAGE sql;
-- Deletes the mapping with id p_id.
-- Returns the deleted id, or no rows at all when nothing matched, so callers can
-- tell "deleted" apart from "not found".
CREATE OR REPLACE FUNCTION delete_mapping(p_id INT)
RETURNS TABLE (id INT) AS $$
    -- Column references are qualified so they cannot be confused with the
    -- output column that shares the name "id".
    DELETE FROM dataflow.mappings AS m
    WHERE m.id = p_id
    RETURNING m.id;
$$ LANGUAGE sql;
-- ── Counts ────────────────────────────────────────────────────────────────────
-- Counts, per mapping, how many records of the source carry that mapping's
-- input_value under the rule's output_field in their transformed document.
--   p_source_name  source to report on
--   p_rule_name    rule to filter on; NULL (the default) keeps every rule
-- Mappings with no matching records are reported with record_count = 0 (LEFT JOIN).
-- Mappings whose rule has no row in dataflow.rules are omitted (INNER JOIN).
CREATE OR REPLACE FUNCTION get_mapping_counts(p_source_name TEXT, p_rule_name TEXT DEFAULT NULL)
RETURNS TABLE (rule_name TEXT, input_value JSONB, record_count BIGINT) AS $$
    SELECT
        m.rule_name,
        m.input_value,
        count(rec.id) AS record_count
    FROM dataflow.mappings AS m
    INNER JOIN dataflow.rules AS r
        ON  r.source_name = m.source_name
        AND r.name = m.rule_name
    LEFT JOIN dataflow.records AS rec
        ON  rec.source_name = m.source_name
        AND rec.transformed ? r.output_field
        AND (rec.transformed -> r.output_field) = m.input_value
    WHERE m.source_name = p_source_name
      AND (m.rule_name = p_rule_name OR p_rule_name IS NULL)
    GROUP BY
        m.rule_name,
        m.input_value;
$$ LANGUAGE sql STABLE;
-- ── All values (mapped + unmapped) ───────────────────────────────────────────
-- Lists every distinct value extracted by the rules of a source, mapped or not.
--
-- For each rule of p_source_name, the source's records are grouped by the value
-- stored in records.transformed under the rule's output_field. A record takes
-- part only when transformed is non-NULL and contains that key, and its data
-- document contains the rule's source field.
--
--   p_source_name  source to inspect
--   p_rule_name    rule to filter on; NULL (the default) keeps every rule
--
-- Returns one row per (rule, extracted value):
--   record_count       number of participating records
--   sample             JSONB array of up to 5 records.data documents (lowest record ids)
--   mapping_id/output  the matching mapping, NULL when there is none
--   is_mapped          TRUE when a mapping exists for the value
-- Rows are ordered by record_count descending; ties are broken by rule name and
-- the text form of the value so repeated calls return a stable order.
--
-- DROP + CREATE (rather than CREATE OR REPLACE) lets the result columns change
-- between deployments.
DROP FUNCTION IF EXISTS get_all_values(TEXT, TEXT);
CREATE FUNCTION get_all_values(
    p_source_name TEXT,
    p_rule_name   TEXT DEFAULT NULL
) RETURNS TABLE (
    rule_name       TEXT,
    output_field    TEXT,
    source_field    TEXT,
    extracted_value JSONB,
    record_count    BIGINT,
    sample          JSONB,
    mapping_id      INTEGER,
    output          JSONB,
    is_mapped       BOOLEAN
) AS $$
BEGIN
    -- Every column reference is table-qualified: the RETURNS TABLE column names are
    -- PL/pgSQL variables here, so a bare name such as rule_name would be ambiguous.
    RETURN QUERY
    WITH extracted AS (
        -- One row per (record, rule) pair that produced a value.
        SELECT
            r.name                             AS rule_name,
            r.output_field,
            r.field                            AS source_field,
            rec.transformed -> r.output_field  AS extracted_value,
            rec.data                           AS record_data,
            row_number() OVER (
                PARTITION BY r.name, rec.transformed -> r.output_field
                ORDER BY rec.id
            )                                  AS rn
        FROM dataflow.records AS rec
        INNER JOIN dataflow.rules AS r
            ON r.source_name = rec.source_name
        WHERE rec.source_name = p_source_name
          AND rec.transformed IS NOT NULL
          AND rec.transformed ? r.output_field
          AND rec.data ? r.field
          AND (p_rule_name IS NULL OR r.name = p_rule_name)
    ),
    aggregated AS (
        -- Collapse to one row per distinct value, keeping the first 5 records as a sample.
        SELECT
            e.rule_name,
            e.output_field,
            e.source_field,
            e.extracted_value,
            count(*) AS record_count,
            jsonb_agg(e.record_data ORDER BY e.rn) FILTER (WHERE e.rn <= 5) AS sample
        FROM extracted AS e
        GROUP BY e.rule_name, e.output_field, e.source_field, e.extracted_value
    )
    SELECT
        a.rule_name,
        a.output_field,
        a.source_field,
        a.extracted_value,
        a.record_count,
        a.sample,
        m.id AS mapping_id,
        m.output,
        (m.id IS NOT NULL) AS is_mapped
    FROM aggregated AS a
    LEFT JOIN dataflow.mappings AS m
        ON  m.source_name = p_source_name
        AND m.rule_name = a.rule_name
        AND m.input_value = a.extracted_value
    ORDER BY
        a.record_count DESC,
        a.rule_name,
        a.extracted_value::text;
END;
-- Read-only, so declared STABLE like the other query functions in this file.
$$ LANGUAGE plpgsql STABLE;
-- ── Unmapped values ───────────────────────────────────────────────────────────
-- Lists the distinct extracted values of a source that have no mapping yet.
--
-- For each rule of p_source_name, the source's records are grouped by the value
-- stored in records.transformed under the rule's output_field. A record takes
-- part only when transformed is non-NULL and contains that key, and its data
-- document contains the rule's source field. Values that already have a row in
-- dataflow.mappings for the same source, rule and input_value are left out.
--
--   p_source_name  source to inspect
--   p_rule_name    rule to filter on; NULL (the default) keeps every rule
--
-- Returns one row per (rule, unmapped value):
--   record_count   number of participating records
--   sample         JSONB array of up to 5 records.data documents (lowest record ids)
-- Rows are ordered by record_count descending; ties are broken by rule name and
-- the text form of the value so repeated calls return a stable order.
--
-- DROP + CREATE (rather than CREATE OR REPLACE) lets the result columns change
-- between deployments.
DROP FUNCTION IF EXISTS get_unmapped_values(TEXT, TEXT);
CREATE FUNCTION get_unmapped_values(
    p_source_name TEXT,
    p_rule_name   TEXT DEFAULT NULL
) RETURNS TABLE (
    rule_name       TEXT,
    output_field    TEXT,
    source_field    TEXT,
    extracted_value JSONB,
    record_count    BIGINT,
    sample          JSONB
) AS $$
BEGIN
    -- Every column reference is table-qualified: the RETURNS TABLE column names are
    -- PL/pgSQL variables here, so a bare name such as rule_name would be ambiguous.
    RETURN QUERY
    WITH extracted AS (
        -- One row per (record, rule) pair that produced a value.
        SELECT
            r.name                             AS rule_name,
            r.output_field,
            r.field                            AS source_field,
            rec.transformed -> r.output_field  AS extracted_value,
            rec.data                           AS record_data,
            row_number() OVER (
                PARTITION BY r.name, rec.transformed -> r.output_field
                ORDER BY rec.id
            )                                  AS rn
        FROM dataflow.records AS rec
        INNER JOIN dataflow.rules AS r
            ON r.source_name = rec.source_name
        WHERE rec.source_name = p_source_name
          AND rec.transformed IS NOT NULL
          AND rec.transformed ? r.output_field
          AND rec.data ? r.field
          AND (p_rule_name IS NULL OR r.name = p_rule_name)
    )
    SELECT
        e.rule_name,
        e.output_field,
        e.source_field,
        e.extracted_value,
        count(*) AS record_count,
        jsonb_agg(e.record_data ORDER BY e.rn) FILTER (WHERE e.rn <= 5) AS sample
    FROM extracted AS e
    -- The filter removes whole (rule, value) groups, so the row numbers computed
    -- above still start at 1 within every group that survives.
    WHERE NOT EXISTS (
        SELECT 1
        FROM dataflow.mappings AS m
        WHERE m.source_name = p_source_name
          AND m.rule_name = e.rule_name
          AND m.input_value = e.extracted_value
    )
    GROUP BY e.rule_name, e.output_field, e.source_field, e.extracted_value
    ORDER BY
        count(*) DESC,
        e.rule_name,
        e.extracted_value::text;
END;
-- Read-only, so declared STABLE like the other query functions in this file.
$$ LANGUAGE plpgsql STABLE;
-- ── Global picklist ───────────────────────────────────────────────────────────
-- Builds the global picklist: the distinct (key, value) pairs found in the output
-- documents of mappings whose source has global_picklist set.
-- Values are compared as text (jsonb_each_text); NULL and empty-string values are
-- skipped. Rows are ordered by key, then value.
CREATE OR REPLACE FUNCTION get_global_output_values()
RETURNS TABLE (col TEXT, val TEXT) AS $$
    SELECT
        kv.key   AS col,
        kv.value AS val
    FROM dataflow.sources AS s
    INNER JOIN dataflow.mappings AS m
        ON m.source_name = s.name
    CROSS JOIN LATERAL jsonb_each_text(m.output) AS kv(key, value)
    WHERE s.global_picklist IS TRUE
      -- NULLIF folds '' into NULL, so one test rejects both empty and NULL values.
      AND NULLIF(kv.value, '') IS NOT NULL
    GROUP BY kv.key, kv.value
    ORDER BY kv.key, kv.value;
$$ LANGUAGE sql STABLE;
-- ── Remap output field values ─────────────────────────────────────────────────
-- Search for distinct (field, value) pairs across all mapping outputs.
--   p_search  text to look for; matched case-insensitively as a literal substring
--             of the value. An empty string matches every non-empty value; NULL
--             yields no rows (the pattern itself becomes NULL).
-- Returns each matching (col, val) pair with the number of mappings that carry it,
-- ordered by col, then val. Empty-string values are never returned.
CREATE OR REPLACE FUNCTION search_mapping_outputs(p_search TEXT)
RETURNS TABLE (col TEXT, val TEXT, mapping_count BIGINT) AS $$
    SELECT
        e.key    AS col,
        e.value  AS val,
        count(*) AS mapping_count
    FROM dataflow.mappings AS m
    CROSS JOIN LATERAL jsonb_each_text(m.output) AS e(key, value)
    -- '<>' already rejects NULL values, so no separate IS NOT NULL test is needed.
    WHERE e.value <> ''
      -- Escape the LIKE metacharacters in the user's text so that '%' and '_' are
      -- matched literally instead of acting as wildcards. '!' is used as the escape
      -- character (and is itself doubled first) to stay independent of how
      -- backslashes are treated in string literals.
      AND e.value ILIKE (
              '%'
              || replace(replace(replace(p_search, '!', '!!'), '%', '!%'), '_', '!_')
              || '%'
          ) ESCAPE '!'
    GROUP BY e.key, e.value
    ORDER BY e.key, e.value;
$$ LANGUAGE sql STABLE;
-- Get the individual mappings whose output document holds p_val under key p_col.
--   p_col  top-level key to look up in mappings.output
--   p_val  value to match; compared against the text form of the stored value (->>)
-- Rows are ordered by source, rule, then the text rendering of input_value.
-- A NULL p_col or p_val matches nothing, because the comparison evaluates to NULL.
CREATE OR REPLACE FUNCTION get_mappings_by_output_field(p_col TEXT, p_val TEXT)
RETURNS TABLE (id INT, source_name TEXT, rule_name TEXT, input_value JSONB, output JSONB) AS $$
    SELECT
        m.id,
        m.source_name,
        m.rule_name,
        m.input_value,
        m.output
    FROM dataflow.mappings AS m
    WHERE (m.output ->> p_col) = p_val
    ORDER BY
        m.source_name,
        m.rule_name,
        CAST(m.input_value AS TEXT);
$$ LANGUAGE sql STABLE;
-- Replace a specific field value across all matching mappings.
--   p_col       top-level key in mappings.output to rewrite
--   p_from_val  current value to look for; compared against the text form of the
--               stored value (->>)
--   p_to_val    replacement, stored as a JSON string; must not be NULL
-- Returns the number of mappings updated. A NULL p_col or p_from_val matches
-- nothing and returns 0.
-- Raises null_value_not_allowed when p_to_val is NULL.
CREATE OR REPLACE FUNCTION remap_output_field(p_col TEXT, p_from_val TEXT, p_to_val TEXT)
RETURNS INTEGER AS $$
DECLARE
    updated_count INTEGER;
BEGIN
    -- to_jsonb(NULL) is SQL NULL and jsonb_set returns NULL when any argument is
    -- NULL, so without this guard a NULL replacement would overwrite the entire
    -- output document of every matching mapping with NULL.
    IF p_to_val IS NULL THEN
        RAISE EXCEPTION 'remap_output_field: replacement value (p_to_val) must not be NULL'
            USING ERRCODE = 'null_value_not_allowed';
    END IF;

    UPDATE dataflow.mappings AS m
    SET output = jsonb_set(m.output, ARRAY[p_col], to_jsonb(p_to_val))
    WHERE (m.output ->> p_col) = p_from_val;

    GET DIAGNOSTICS updated_count = ROW_COUNT;
    RETURN updated_count;
END;
$$ LANGUAGE plpgsql;