dataflow/database/migrate_tps.sql
Paul Trowbridge d63d70cd52 Import log, constraint key overhaul, and dedup improvements
- Rename dedup_key/dedup_fields → constraint_key/constraint_fields everywhere
  (schema, functions, routes, UI, migration script, docs)
- Change constraint_key from an MD5 TEXT hash to a readable JSONB object
  (example below this list)
- Drop unique constraint on (source_name, constraint_key); dedup is now
  enforced at import time via CTE, allowing intra-file duplicate rows
- Add import_id FK (ON DELETE CASCADE) so deleting a log entry removes its records
- Add info JSONB to import_log with inserted_keys and excluded_keys arrays
- Add get_import_log, get_all_import_logs, delete_import SQL functions
- Auto-apply transformations immediately after import
- Import UI: expandable key detail, checkbox selection, delete with confirm,
  import ID column, transform result display
- New Log page: global import log across all sources
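
Example (illustrative values): for a source whose constraint_fields are
["Date", "Amount"], constraint_key is now stored as
{"Date": "2024-01-02", "Amount": "-12.34"} rather than an opaque MD5 hash
of those values.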

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-13 23:44:30 -04:00

--
-- TPS → Dataflow Migration
--
-- Migrates sources, rules, mappings, and records from the TPS system.
-- Run against the dataflow database:
-- PGPASSWORD=dataflow psql -U dataflow -d dataflow -h localhost -f database/migrate_tps.sql
--
-- Already-migrated rows are skipped (ON CONFLICT DO NOTHING for sources, rules,
-- and mappings; a NOT EXISTS guard for records, whose unique constraint was
-- dropped), so the script is safe to re-run.
-- NOTE: a dcard source already configured in dataflow will NOT be overwritten.
--
SET search_path TO dataflow, public;
CREATE EXTENSION IF NOT EXISTS dblink;
-- Connection string to the TPS database
\set tps_conn 'host=192.168.1.110 dbname=ubm user=api password=gyaswddh1983'
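-- Pre-flight (a sketch, not part of the original flow): open and close a named
-- dblink connection so a bad host or password fails fast, before any inserts.
SELECT dblink_connect('tps_check', :'tps_conn');
SELECT dblink_disconnect('tps_check');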
\echo ''
\echo '=== 1. Sources ==='
INSERT INTO dataflow.sources (name, constraint_fields, config)
SELECT
    srce AS name,
    -- Strip {} wrappers from constraint paths → constraint field names
    ARRAY(
        SELECT regexp_replace(c, '^\{|\}$', '', 'g')
        FROM jsonb_array_elements_text(defn->'constraint') AS c
    ) AS constraint_fields,
    -- Build config.fields from the first schema (index 0 = "mapped" for dcard, "default" for others)
    jsonb_build_object('fields',
        (SELECT jsonb_agg(
                    jsonb_build_object(
                        'name', regexp_replace(col->>'path', '^\{|\}$', '', 'g'),
                        'type', COALESCE(NULLIF(col->>'type', ''), 'text')
                    ) ORDER BY ord
                )
         FROM jsonb_array_elements(defn->'schemas'->0->'columns')
              WITH ORDINALITY AS t(col, ord)
        )
    ) AS config
FROM dblink(:'tps_conn',
    'SELECT srce, defn FROM tps.srce'
) AS t(srce TEXT, defn JSONB)
ON CONFLICT (name) DO NOTHING;
SELECT name, constraint_fields, jsonb_array_length(config->'fields') AS field_count
FROM dataflow.sources ORDER BY name;
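-- Optional spot-check (a sketch; 'dcard' is one source known from the notes
-- above): pretty-print a migrated config to confirm names and types survived.
SELECT name, constraint_fields, jsonb_pretty(config->'fields') AS fields
FROM dataflow.sources WHERE name = 'dcard';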
\echo ''
\echo '=== 2. Rules ==='
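-- For reference, the tps.map_rm regex document has roughly this shape (inferred
-- from the extraction below; the key and field names here are illustrative):
--   {"regex": {"function": "extract",
--              "defn": [{"key": "{Description}", "regex": "...",
--                        "field": "f20", "flag": "g", "retain": "y"}]}}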
INSERT INTO dataflow.rules
    (source_name, name, field, pattern, output_field, function_type, flags, replace_value, sequence, enabled, retain)
SELECT
    srce AS source_name,
    target AS name,
    -- Strip {} from the input field key
    regexp_replace(regex->'regex'->'defn'->0->>'key', '^\{|\}$', '', 'g') AS field,
    regex->'regex'->'defn'->0->>'regex' AS pattern,
    regex->'regex'->'defn'->0->>'field' AS output_field,
    COALESCE(NULLIF(regex->'regex'->>'function', ''), 'extract') AS function_type,
    COALESCE(regex->'regex'->'defn'->0->>'flag', '') AS flags,
    '' AS replace_value,
    seq AS sequence,
    true AS enabled,
    (regex->'regex'->'defn'->0->>'retain') = 'y' AS retain
FROM dblink(:'tps_conn',
    'SELECT srce, target, seq, regex FROM tps.map_rm'
) AS t(srce TEXT, target TEXT, seq INT, regex JSONB)
ON CONFLICT (source_name, name) DO NOTHING;
SELECT source_name, name, field, pattern, output_field, sequence
FROM dataflow.rules ORDER BY source_name, sequence;
\echo ''
\echo '=== 3. Mappings ==='
INSERT INTO dataflow.mappings (source_name, rule_name, input_value, output)
SELECT
    srce AS source_name,
    target AS rule_name,
    -- retval is {"f20": "<extracted string>"} — pull out the value as JSONB
    (SELECT value FROM jsonb_each(retval) LIMIT 1) AS input_value,
    map AS output
FROM dblink(:'tps_conn',
    'SELECT srce, target, retval, map FROM tps.map_rv'
) AS t(srce TEXT, target TEXT, retval JSONB, map JSONB)
ON CONFLICT (source_name, rule_name, input_value) DO NOTHING;
SELECT source_name, rule_name, COUNT(*) AS mapping_count
FROM dataflow.mappings GROUP BY source_name, rule_name ORDER BY source_name, rule_name;
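-- Optional spot-check (a sketch): eyeball a few migrated mappings end to end.
SELECT source_name, rule_name, input_value, jsonb_pretty(output) AS output
FROM dataflow.mappings LIMIT 5;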
\echo ''
\echo '=== 4. Records ==='
\echo ' (13,000+ rows; may take a moment)'
INSERT INTO dataflow.records (source_name, data, constraint_key, transformed, imported_at, transformed_at)
SELECT
    src.source_name,
    src.data,
    src.constraint_key,
    src.transformed,
    CURRENT_TIMESTAMP AS imported_at,
    CASE WHEN src.transformed IS NOT NULL THEN CURRENT_TIMESTAMP END AS transformed_at
FROM (
    SELECT
        t.srce AS source_name,
        t.rec AS data,
        -- Build the readable JSONB constraint_key from the source's constraint_fields
        (SELECT jsonb_object_agg(f, t.rec->>f) FROM unnest(s.constraint_fields) AS f) AS constraint_key,
        t.allj AS transformed
    FROM dblink(:'tps_conn',
        'SELECT srce, rec, allj FROM tps.trans'
    ) AS t(srce TEXT, rec JSONB, allj JSONB)
    JOIN dataflow.sources s ON s.name = t.srce
) src
-- The unique constraint on (source_name, constraint_key) was dropped, so
-- ON CONFLICT can no longer target it; skip already-migrated rows instead
-- to keep the script safe to re-run.
WHERE NOT EXISTS (
    SELECT 1 FROM dataflow.records r
    WHERE r.source_name = src.source_name
      AND r.constraint_key = src.constraint_key
);
SELECT source_name, COUNT(*) AS records, COUNT(transformed) AS transformed
FROM dataflow.records GROUP BY source_name ORDER BY source_name;
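-- Optional audit (a sketch): with no unique constraint on
-- (source_name, constraint_key), duplicates can only come from the source
-- data itself; list any that slipped through.
SELECT source_name, constraint_key, COUNT(*) AS copies
FROM dataflow.records
GROUP BY source_name, constraint_key
HAVING COUNT(*) > 1;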
\echo ''
\echo '=== Migration complete ==='
SELECT
    (SELECT COUNT(*) FROM dataflow.sources)  AS sources,
    (SELECT COUNT(*) FROM dataflow.rules)    AS rules,
    (SELECT COUNT(*) FROM dataflow.mappings) AS mappings,
    (SELECT COUNT(*) FROM dataflow.records)  AS records;
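-- To start over (a sketch; DESTRUCTIVE, and it assumes these tables hold only
-- migrated data), uncomment and run:
-- TRUNCATE dataflow.records, dataflow.mappings, dataflow.rules, dataflow.sources;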