diff --git a/api/routes/rules.js b/api/routes/rules.js index 4dfbcdd..3268ae2 100644 --- a/api/routes/rules.js +++ b/api/routes/rules.js @@ -30,32 +30,40 @@ module.exports = (pool) => { return res.status(400).json({ error: 'source, field, and pattern are required' }); } - const fullPattern = (flags ? `(?${flags})` : '') + pattern; - const query = function_type === 'replace' ? `SELECT id, data->>$1 AS raw_value, - to_jsonb(regexp_replace(data->>$1, $2, $3)) AS extracted_value + to_jsonb(regexp_replace(data->>$1, $2, $3, $4)) AS extracted_value FROM records - WHERE source_name = $4 AND data ? $1 - ORDER BY id DESC LIMIT $5` + WHERE source_name = $5 AND data ? $1 + ORDER BY id DESC LIMIT $6` : `SELECT r.id, r.data->>$1 AS raw_value, CASE - WHEN m.match IS NULL THEN NULL - WHEN cardinality(m.match) = 1 THEN to_jsonb(m.match[1]) - ELSE to_jsonb(m.match) + WHEN agg.match_count = 0 THEN NULL + WHEN agg.match_count = 1 THEN agg.matches->0 + ELSE agg.matches END AS extracted_value FROM records r - CROSS JOIN LATERAL (SELECT regexp_match(r.data->>$1, $2) AS match) m - WHERE r.source_name = $3 AND r.data ? $1 - ORDER BY r.id DESC LIMIT $4`; + CROSS JOIN LATERAL ( + SELECT + jsonb_agg( + CASE WHEN array_length(mt, 1) = 1 THEN to_jsonb(mt[1]) + ELSE to_jsonb(mt) + END + ORDER BY rn + ) AS matches, + count(*)::int AS match_count + FROM regexp_matches(r.data->>$1, $2, $3) WITH ORDINALITY AS m(mt, rn) + ) agg + WHERE r.source_name = $4 AND r.data ? $1 + ORDER BY r.id DESC LIMIT $5`; const params = function_type === 'replace' - ? [field, fullPattern, replace_value, source, parseInt(limit)] - : [field, fullPattern, source, parseInt(limit)]; + ? [field, pattern, replace_value, flags || '', source, parseInt(limit)] + : [field, pattern, flags || '', source, parseInt(limit)]; const result = await pool.query(query, params); res.json(result.rows); @@ -80,23 +88,31 @@ module.exports = (pool) => { const rule = ruleResult.rows[0]; - const pattern = (rule.flags ? 
`(?${rule.flags})` : '') + rule.pattern; const result = await pool.query( `SELECT r.id, r.data->>$1 AS raw_value, CASE - WHEN m.match IS NULL THEN NULL - WHEN cardinality(m.match) = 1 THEN to_jsonb(m.match[1]) - ELSE to_jsonb(m.match) + WHEN agg.match_count = 0 THEN NULL + WHEN agg.match_count = 1 AND array_length(agg.matches[1], 1) = 1 + THEN to_jsonb(agg.matches[1][1]) + WHEN agg.match_count = 1 + THEN to_jsonb(agg.matches[1]) + WHEN array_length(agg.matches[1], 1) = 1 + THEN (SELECT jsonb_agg(m[1] ORDER BY idx) + FROM unnest(agg.matches) WITH ORDINALITY u(m, idx)) + ELSE to_jsonb(agg.matches) END AS extracted_value FROM records r - CROSS JOIN LATERAL (SELECT regexp_match(r.data->>$1, $2) AS match) m - WHERE r.source_name = $3 + CROSS JOIN LATERAL ( + SELECT array_agg(mt ORDER BY rn) AS matches, count(*)::int AS match_count + FROM regexp_matches(r.data->>$1, $2, $3) WITH ORDINALITY AS m(mt, rn) + ) agg + WHERE r.source_name = $4 AND r.data ? $1 ORDER BY r.id DESC - LIMIT $4`, - [rule.field, pattern, rule.source_name, parseInt(limit)] + LIMIT $5`, + [rule.field, rule.pattern, rule.flags || '', rule.source_name, parseInt(limit)] ); res.json({ diff --git a/database/functions.sql b/database/functions.sql index 0b3053e..027adc8 100644 --- a/database/functions.sql +++ b/database/functions.sql @@ -78,7 +78,7 @@ DECLARE v_record RECORD; v_rule RECORD; v_transformed JSONB; - v_match TEXT[]; + v_match_count BIGINT; v_extracted JSONB; v_mapping JSONB; v_count INTEGER := 0; @@ -103,29 +103,41 @@ BEGIN LOOP -- Apply rule based on function type IF v_rule.function_type = 'replace' THEN + -- Pass flags as third arg so 'g' (replace all) works correctly v_transformed := jsonb_set( v_transformed, ARRAY[v_rule.output_field], to_jsonb(regexp_replace( v_record.data->>v_rule.field, - CASE WHEN v_rule.flags != '' THEN '(?' 
|| v_rule.flags || ')' ELSE '' END || v_rule.pattern, - v_rule.replace_value + v_rule.pattern, + v_rule.replace_value, + v_rule.flags )) ); ELSE - -- extract (default): regexp_match returns all capture groups as text[] - v_match := regexp_match( + -- extract: use regexp_matches so 'g' flag returns all occurrences + -- Aggregate directly to JSONB: single capture → scalar, multi → array + SELECT + jsonb_agg( + CASE WHEN array_length(mt, 1) = 1 THEN to_jsonb(mt[1]) + ELSE to_jsonb(mt) + END + ORDER BY rn + ), + count(*) + INTO v_extracted, v_match_count + FROM regexp_matches( v_record.data->>v_rule.field, - CASE WHEN v_rule.flags != '' THEN '(?' || v_rule.flags || ')' ELSE '' END || v_rule.pattern - ); + v_rule.pattern, + v_rule.flags + ) WITH ORDINALITY AS m(mt, rn); - IF v_match IS NOT NULL THEN - -- Single capture group → scalar string; multiple groups → JSON array - IF cardinality(v_match) = 1 THEN - v_extracted := to_jsonb(v_match[1]); - ELSE - v_extracted := to_jsonb(v_match); + IF v_match_count > 0 THEN + -- Single match: unwrap the array to get scalar or capture array directly + IF v_match_count = 1 THEN + v_extracted := v_extracted->0; END IF; + -- v_extracted is now: scalar string, array of captures, or array of matches (g) -- Check if there's a mapping for this value SELECT output INTO v_mapping diff --git a/database/migrate_tps.sql b/database/migrate_tps.sql new file mode 100644 index 0000000..aa65e0f --- /dev/null +++ b/database/migrate_tps.sql @@ -0,0 +1,120 @@ +-- +-- TPS → Dataflow Migration +-- +-- Migrates sources, rules, mappings, and records from the TPS system. +-- Run against the dataflow database: +-- PGPASSWORD=dataflow psql -U dataflow -d dataflow -h localhost -f database/migrate_tps.sql +-- +-- Existing rows are skipped (ON CONFLICT DO NOTHING) so the script is safe to re-run. +-- NOTE: dcard already configured in dataflow will NOT be overwritten. 
+-- + +SET search_path TO dataflow, public; + +CREATE EXTENSION IF NOT EXISTS dblink; + +-- Connection string to the TPS database (WARNING: credentials are hardcoded here — prefer ~/.pgpass or an environment variable) +\set tps_conn 'host=192.168.1.110 dbname=ubm user=api password=gyaswddh1983' + +\echo '' +\echo '=== 1. Sources ===' + +INSERT INTO dataflow.sources (name, dedup_fields, config) +SELECT + srce AS name, + -- Strip {} wrappers from constraint paths → dedup field names + ARRAY( + SELECT regexp_replace(c, '^\{|\}$', '', 'g') + FROM jsonb_array_elements_text(defn->'constraint') AS c + ) AS dedup_fields, + -- Build config.fields from the first schema (index 0 = "mapped" for dcard, "default" for others) + jsonb_build_object('fields', + (SELECT jsonb_agg( + jsonb_build_object( + 'name', regexp_replace(col->>'path', '^\{|\}$', '', 'g'), + 'type', COALESCE(NULLIF(col->>'type', ''), 'text') + ) ORDER BY ord + ) + FROM jsonb_array_elements(defn->'schemas'->0->'columns') + WITH ORDINALITY AS t(col, ord) + ) + ) AS config +FROM dblink(:'tps_conn', + 'SELECT srce, defn FROM tps.srce' +) AS t(srce TEXT, defn JSONB) +ON CONFLICT (name) DO NOTHING; + +SELECT name, dedup_fields, jsonb_array_length(config->'fields') AS field_count +FROM dataflow.sources ORDER BY name; + +\echo '' +\echo '=== 2. 
Rules ===' + +INSERT INTO dataflow.rules + (source_name, name, field, pattern, output_field, function_type, flags, replace_value, sequence, enabled) +SELECT + srce AS source_name, + target AS name, + -- Strip {} from the input field key + regexp_replace(regex->'regex'->'defn'->0->>'key', '^\{|\}$', '', 'g') AS field, + regex->'regex'->'defn'->0->>'regex' AS pattern, + regex->'regex'->'defn'->0->>'field' AS output_field, + COALESCE(NULLIF(regex->'regex'->>'function', ''), 'extract') AS function_type, + COALESCE(regex->'regex'->'defn'->0->>'flag', '') AS flags, + '' AS replace_value, + seq AS sequence, + true AS enabled +FROM dblink(:'tps_conn', + 'SELECT srce, target, seq, regex FROM tps.map_rm' +) AS t(srce TEXT, target TEXT, seq INT, regex JSONB) +ON CONFLICT (source_name, name) DO NOTHING; + +SELECT source_name, name, field, pattern, output_field, sequence +FROM dataflow.rules ORDER BY source_name, sequence; + +\echo '' +\echo '=== 3. Mappings ===' + +INSERT INTO dataflow.mappings (source_name, rule_name, input_value, output) +SELECT + srce AS source_name, + target AS rule_name, + -- retval is a single-key object such as {"f20": "<matched value>"} — extract that value as JSONB + (SELECT value FROM jsonb_each(retval) LIMIT 1) AS input_value, + map AS output +FROM dblink(:'tps_conn', + 'SELECT srce, target, retval, map FROM tps.map_rv' +) AS t(srce TEXT, target TEXT, retval JSONB, map JSONB) +ON CONFLICT (source_name, rule_name, input_value) DO NOTHING; + +SELECT source_name, rule_name, COUNT(*) AS mapping_count +FROM dataflow.mappings GROUP BY source_name, rule_name ORDER BY source_name, rule_name; + +\echo '' +\echo '=== 4. 
Records ===' +\echo ' (13 000+ rows — may take a moment)' + +INSERT INTO dataflow.records (source_name, data, dedup_key, transformed, imported_at, transformed_at) +SELECT + t.srce AS source_name, + t.rec AS data, + dataflow.generate_dedup_key(t.rec, s.dedup_fields) AS dedup_key, + t.allj AS transformed, + CURRENT_TIMESTAMP AS imported_at, + CASE WHEN t.allj IS NOT NULL THEN CURRENT_TIMESTAMP END AS transformed_at +FROM dblink(:'tps_conn', + 'SELECT srce, rec, allj FROM tps.trans' +) AS t(srce TEXT, rec JSONB, allj JSONB) +JOIN dataflow.sources s ON s.name = t.srce +ON CONFLICT (source_name, dedup_key) DO NOTHING; + +SELECT source_name, COUNT(*) AS records, COUNT(transformed) AS transformed +FROM dataflow.records GROUP BY source_name ORDER BY source_name; + +\echo '' +\echo '=== Migration complete ===' +SELECT + (SELECT COUNT(*) FROM dataflow.sources) AS sources, + (SELECT COUNT(*) FROM dataflow.rules) AS rules, + (SELECT COUNT(*) FROM dataflow.mappings) AS mappings, + (SELECT COUNT(*) FROM dataflow.records) AS records;