dataflow/database/migrate_tps.sql
Paul Trowbridge f59908aaa3 Add retain flag to rules for preserving extracted values alongside mappings
Mirrors TPS's retain: y behaviour — when a mapping is applied, the extracted
value is also written to output_field so both the raw extraction and the
mapped result are available in transformed data.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-04 20:48:52 -04:00

122 lines
5.0 KiB
SQL

--
-- TPS → Dataflow Migration
--
-- Migrates sources, rules, mappings, and records from the TPS system.
-- Run against the dataflow database:
-- PGPASSWORD=dataflow psql -U dataflow -d dataflow -h localhost -f database/migrate_tps.sql
--
-- Existing rows are skipped (ON CONFLICT DO NOTHING), so the script is safe to re-run.
-- NOTE: anything already configured in dataflow (e.g. the dcard source) will NOT be overwritten.
--
-- Abort at the first failing statement so later sections never run against a
-- partially migrated state (psql otherwise carries on after an error).
\set ON_ERROR_STOP on
-- Unqualified names resolve against the dataflow schema first, then public.
SET search_path TO dataflow, public;
-- dblink provides the cross-database queries used to read from TPS below.
CREATE EXTENSION IF NOT EXISTS dblink;
-- Connection string to the TPS database.
-- Override it without editing this file:
--   psql ... -v tps_conn='host=... dbname=... user=... password=...' -f database/migrate_tps.sql
-- NOTE(review): the fallback below keeps a plaintext password in version
-- control; prefer the -v override and rotate this credential.
\if :{?tps_conn}
\else
\set tps_conn 'host=192.168.1.110 dbname=ubm user=api password=gyaswddh1983'
\endif
\echo ''
\echo '=== 1. Sources ==='
-- Copy every TPS source definition (tps.srce) into dataflow.sources.
--   name         <- srce
--   dedup_fields <- defn.constraint paths with their {} wrappers stripped
--   config       <- {"fields": [{name, type}, ...]} built from defn.schemas[0].columns
-- A source whose name already exists in dataflow.sources is left untouched.
INSERT INTO dataflow.sources (name, dedup_fields, config)
SELECT
    t.srce AS name,
    -- Strip {} wrappers from constraint paths → dedup field names.
    -- WITH ORDINALITY + ORDER BY pins the array to the order stored in TPS.
    ARRAY(
        SELECT regexp_replace(c.path, '^\{|\}$', '', 'g')
        FROM jsonb_array_elements_text(t.defn->'constraint')
            WITH ORDINALITY AS c(path, ord)
        ORDER BY c.ord
    ) AS dedup_fields,
    -- Build config.fields from the first schema (index 0 = "mapped" for dcard, "default" for others).
    -- jsonb_agg returns NULL when the schema has no columns; fall back to an
    -- empty array so config.fields is always a JSON array, never JSON null.
    jsonb_build_object(
        'fields',
        COALESCE(
            (
                SELECT jsonb_agg(
                    jsonb_build_object(
                        'name', regexp_replace(f.col->>'path', '^\{|\}$', '', 'g'),
                        -- A missing or empty column type defaults to text.
                        'type', COALESCE(NULLIF(f.col->>'type', ''), 'text')
                    )
                    ORDER BY f.ord
                )
                FROM jsonb_array_elements(t.defn->'schemas'->0->'columns')
                    WITH ORDINALITY AS f(col, ord)
            ),
            '[]'::jsonb
        )
    ) AS config
FROM dblink(:'tps_conn',
    'SELECT srce, defn FROM tps.srce'
) AS t(srce TEXT, defn JSONB)
ON CONFLICT (name) DO NOTHING;
-- Verification listing. jsonb_array_length raises an error on a non-array
-- value, so only call it when config.fields really is an array; any other
-- shape reports a NULL field_count.
SELECT
    name,
    dedup_fields,
    CASE
        WHEN jsonb_typeof(config->'fields') = 'array'
            THEN jsonb_array_length(config->'fields')
    END AS field_count
FROM dataflow.sources
ORDER BY name;
\echo ''
\echo '=== 2. Rules ==='
-- Copy TPS regex rules (tps.map_rm) into dataflow.rules.
-- Only the first entry of regex.regex.defn is read for each rule.
-- NOTE(review): if a TPS rule can carry several defn entries, the others are
-- not migrated — confirm against the TPS data.
-- A rule whose (source_name, name) already exists is left untouched.
INSERT INTO dataflow.rules
    (source_name, name, field, pattern, output_field, function_type, flags, replace_value, sequence, enabled, retain)
SELECT
    t.srce AS source_name,
    t.target AS name,
    -- Strip {} from the input field key
    regexp_replace(d.defn0->>'key', '^\{|\}$', '', 'g') AS field,
    d.defn0->>'regex' AS pattern,
    d.defn0->>'field' AS output_field,
    -- A missing or empty function name defaults to 'extract'.
    COALESCE(NULLIF(t.regex->'regex'->>'function', ''), 'extract') AS function_type,
    COALESCE(d.defn0->>'flag', '') AS flags,
    '' AS replace_value,
    t.seq AS sequence,
    true AS enabled,
    -- The comparison is NULL when the retain key is absent; store a definite
    -- false instead of a NULL flag.
    COALESCE((d.defn0->>'retain') = 'y', false) AS retain
FROM dblink(:'tps_conn',
    'SELECT srce, target, seq, regex FROM tps.map_rm'
) AS t(srce TEXT, target TEXT, seq INT, regex JSONB)
-- Always yields exactly one row: names the first defn entry once instead of
-- repeating the JSON path in every column expression.
CROSS JOIN LATERAL (
    SELECT t.regex->'regex'->'defn'->0 AS defn0
) AS d
ON CONFLICT (source_name, name) DO NOTHING;
-- Verification listing; name breaks ties between rules sharing a sequence.
SELECT
    source_name,
    name,
    field,
    pattern,
    output_field,
    sequence
FROM dataflow.rules
ORDER BY source_name, sequence, name;
\echo ''
\echo '=== 3. Mappings ==='
-- Copy TPS mapping values (tps.map_rv) into dataflow.mappings.
-- A mapping whose (source_name, rule_name, input_value) already exists is
-- left untouched.
WITH tps_map_rv AS (
    -- Remote rows, fetched once through dblink.
    SELECT
        rv.srce,
        rv.target,
        rv.retval,
        rv.map
    FROM dblink(:'tps_conn',
        'SELECT srce, target, retval, map FROM tps.map_rv'
    ) AS rv(srce TEXT, target TEXT, retval JSONB, map JSONB)
)
INSERT INTO dataflow.mappings (source_name, rule_name, input_value, output)
SELECT
    m.srce,
    m.target,
    -- retval is {"f20": "<extracted string>"} — take the value of the first
    -- key/value pair, keeping it as JSONB.
    (
        SELECT kv.value
        FROM jsonb_each(m.retval) AS kv
        LIMIT 1
    ),
    m.map
FROM tps_map_rv AS m
ON CONFLICT (source_name, rule_name, input_value) DO NOTHING;
-- Verification: number of mappings per source and rule.
SELECT
    source_name,
    rule_name,
    COUNT(*) AS mapping_count
FROM dataflow.mappings
GROUP BY source_name, rule_name
ORDER BY source_name, rule_name;
\echo ''
\echo '=== 4. Records ==='
\echo ' (13 000+ rows — may take a moment)'
-- Copy TPS transactions (tps.trans) into dataflow.records.
-- Each remote row is paired with its dataflow source so the dedup key can be
-- computed from that source's dedup_fields; remote rows with no matching
-- source are not inserted. A record whose (source_name, dedup_key) already
-- exists is left untouched.
INSERT INTO dataflow.records (source_name, data, dedup_key, transformed, imported_at, transformed_at)
SELECT
    tr.srce,
    tr.rec,
    dataflow.generate_dedup_key(tr.rec, src.dedup_fields),
    tr.allj,
    CURRENT_TIMESTAMP,
    -- transformed_at is stamped only when TPS supplied a transformed payload.
    CASE
        WHEN tr.allj IS NULL THEN NULL
        ELSE CURRENT_TIMESTAMP
    END
FROM dataflow.sources AS src
INNER JOIN dblink(:'tps_conn',
    'SELECT srce, rec, allj FROM tps.trans'
) AS tr(srce TEXT, rec JSONB, allj JSONB)
    ON tr.srce = src.name
ON CONFLICT (source_name, dedup_key) DO NOTHING;
-- Verification: rows per source and how many of them carry transformed data.
SELECT
    source_name,
    COUNT(*) AS records,
    COUNT(transformed) AS transformed
FROM dataflow.records
GROUP BY source_name
ORDER BY source_name;
\echo ''
\echo '=== Migration complete ==='
-- Final tally: one row holding the total row count of each migrated table.
-- Each derived table yields exactly one row, so the cross joins yield one row.
SELECT
    src.n AS sources,
    rul.n AS rules,
    map.n AS mappings,
    rec.n AS records
FROM (SELECT COUNT(*) AS n FROM dataflow.sources) AS src
CROSS JOIN (SELECT COUNT(*) AS n FROM dataflow.rules) AS rul
CROSS JOIN (SELECT COUNT(*) AS n FROM dataflow.mappings) AS map
CROSS JOIN (SELECT COUNT(*) AS n FROM dataflow.records) AS rec;