--
-- TPS → Dataflow Migration
--
-- Migrates sources, rules, mappings, and records from the TPS system.
-- Run against the dataflow database:
--
--   PGPASSWORD=dataflow psql -U dataflow -d dataflow -h localhost -f database/migrate_tps.sql
--
-- Existing rows are skipped (ON CONFLICT DO NOTHING), so the script is safe to re-run.
-- NOTE: a dcard source already configured in dataflow will NOT be overwritten.
--

SET search_path TO dataflow, public;

CREATE EXTENSION IF NOT EXISTS dblink;

-- Connection string to the TPS database
\set tps_conn 'host=192.168.1.110 dbname=ubm user=api password=gyaswddh1983'

\echo ''
\echo '=== 1. Sources ==='

INSERT INTO dataflow.sources (name, dedup_fields, config)
SELECT
    srce AS name,
    -- Strip {} wrappers from constraint paths → dedup field names
    ARRAY(
        SELECT regexp_replace(c, '^\{|\}$', '', 'g')
        FROM jsonb_array_elements_text(defn->'constraint') AS c
    ) AS dedup_fields,
    -- Build config.fields from the first schema (index 0 = "mapped" for dcard, "default" for others)
    jsonb_build_object('fields',
        (SELECT jsonb_agg(
                    jsonb_build_object(
                        'name', regexp_replace(col->>'path', '^\{|\}$', '', 'g'),
                        'type', COALESCE(NULLIF(col->>'type', ''), 'text')
                    ) ORDER BY ord
                )
         FROM jsonb_array_elements(defn->'schemas'->0->'columns') WITH ORDINALITY AS t(col, ord)
        )
    ) AS config
FROM dblink(:'tps_conn',
    'SELECT srce, defn FROM tps.srce'
) AS t(srce TEXT, defn JSONB)
ON CONFLICT (name) DO NOTHING;

SELECT name, dedup_fields, jsonb_array_length(config->'fields') AS field_count
FROM dataflow.sources
ORDER BY name;

\echo ''
\echo '=== 2. Rules ==='

INSERT INTO dataflow.rules (source_name, name, field, pattern, output_field, function_type, flags, replace_value, sequence, enabled)
SELECT
    srce AS source_name,
    target AS name,
    -- Strip {} from the input field key
    regexp_replace(regex->'regex'->'defn'->0->>'key', '^\{|\}$', '', 'g') AS field,
    regex->'regex'->'defn'->0->>'regex' AS pattern,
    regex->'regex'->'defn'->0->>'field' AS output_field,
    COALESCE(NULLIF(regex->'regex'->>'function', ''), 'extract') AS function_type,
    COALESCE(regex->'regex'->'defn'->0->>'flag', '') AS flags,
    '' AS replace_value,
    seq AS sequence,
    true AS enabled
FROM dblink(:'tps_conn',
    'SELECT srce, target, seq, regex FROM tps.map_rm'
) AS t(srce TEXT, target TEXT, seq INT, regex JSONB)
ON CONFLICT (source_name, name) DO NOTHING;

SELECT source_name, name, field, pattern, output_field, sequence
FROM dataflow.rules
ORDER BY source_name, sequence;

\echo ''
\echo '=== 3. Mappings ==='

INSERT INTO dataflow.mappings (source_name, rule_name, input_value, output)
SELECT
    srce AS source_name,
    target AS rule_name,
    -- retval is {"f20": ""}: pull out the value as JSONB
    (SELECT value FROM jsonb_each(retval) LIMIT 1) AS input_value,
    map AS output
FROM dblink(:'tps_conn',
    'SELECT srce, target, retval, map FROM tps.map_rv'
) AS t(srce TEXT, target TEXT, retval JSONB, map JSONB)
ON CONFLICT (source_name, rule_name, input_value) DO NOTHING;

SELECT source_name, rule_name, COUNT(*) AS mapping_count
FROM dataflow.mappings
GROUP BY source_name, rule_name
ORDER BY source_name, rule_name;
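
-- Optional spot-check before the bulk record insert (an addition, not part of the
-- original migration; safe to remove). It previews the dedup key that
-- dataflow.generate_dedup_key produces for a single TPS row, using the same
-- arguments as the INSERT in step 4 below.
SELECT t.srce, dataflow.generate_dedup_key(t.rec, s.dedup_fields) AS sample_dedup_key
FROM dblink(:'tps_conn',
    'SELECT srce, rec FROM tps.trans LIMIT 1'
) AS t(srce TEXT, rec JSONB)
JOIN dataflow.sources s ON s.name = t.srce;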
\echo ''
\echo '=== 4. Records ==='
\echo '    (13 000+ rows; may take a moment)'

INSERT INTO dataflow.records (source_name, data, dedup_key, transformed, imported_at, transformed_at)
SELECT
    t.srce AS source_name,
    t.rec AS data,
    dataflow.generate_dedup_key(t.rec, s.dedup_fields) AS dedup_key,
    t.allj AS transformed,
    CURRENT_TIMESTAMP AS imported_at,
    CASE WHEN t.allj IS NOT NULL THEN CURRENT_TIMESTAMP END AS transformed_at
FROM dblink(:'tps_conn',
    'SELECT srce, rec, allj FROM tps.trans'
) AS t(srce TEXT, rec JSONB, allj JSONB)
JOIN dataflow.sources s ON s.name = t.srce
ON CONFLICT (source_name, dedup_key) DO NOTHING;

SELECT source_name, COUNT(*) AS records, COUNT(transformed) AS transformed
FROM dataflow.records
GROUP BY source_name
ORDER BY source_name;

\echo ''
\echo '=== Migration complete ==='

SELECT
    (SELECT COUNT(*) FROM dataflow.sources)  AS sources,
    (SELECT COUNT(*) FROM dataflow.rules)    AS rules,
    (SELECT COUNT(*) FROM dataflow.mappings) AS mappings,
    (SELECT COUNT(*) FROM dataflow.records)  AS records;
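
-- Optional cross-check (an addition; a rough sketch only). Compares the TPS record
-- count against what landed in dataflow.records. The numbers are not expected to
-- match exactly when ON CONFLICT skipped duplicates or rows that already existed.
SELECT
    (SELECT n FROM dblink(:'tps_conn', 'SELECT COUNT(*) FROM tps.trans') AS c(n BIGINT)) AS tps_records,
    (SELECT COUNT(*) FROM dataflow.records) AS dataflow_records;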