Add g flag support and fix regex aggregation in extract rules
- Switch apply_transformations from regexp_match to regexp_matches with ORDINALITY, enabling the g flag to return all occurrences as a JSONB array - Aggregate matches directly to JSONB in lateral subquery to avoid text[][] type errors when subscripting array_agg results - Pass flags as proper third argument to regexp_matches/regexp_replace instead of inline (?flags) prefix — the only way g works correctly - Apply same fix to preview and test endpoints in rules.js - Add migrate_tps.sql script for migrating data from TPS to Dataflow Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
928a54932d
commit
1ed08755c1
@ -30,32 +30,40 @@ module.exports = (pool) => {
|
|||||||
return res.status(400).json({ error: 'source, field, and pattern are required' });
|
return res.status(400).json({ error: 'source, field, and pattern are required' });
|
||||||
}
|
}
|
||||||
|
|
||||||
const fullPattern = (flags ? `(?${flags})` : '') + pattern;
|
|
||||||
|
|
||||||
const query = function_type === 'replace'
|
const query = function_type === 'replace'
|
||||||
? `SELECT
|
? `SELECT
|
||||||
id,
|
id,
|
||||||
data->>$1 AS raw_value,
|
data->>$1 AS raw_value,
|
||||||
to_jsonb(regexp_replace(data->>$1, $2, $3)) AS extracted_value
|
to_jsonb(regexp_replace(data->>$1, $2, $3, $4)) AS extracted_value
|
||||||
FROM records
|
FROM records
|
||||||
WHERE source_name = $4 AND data ? $1
|
WHERE source_name = $5 AND data ? $1
|
||||||
ORDER BY id DESC LIMIT $5`
|
ORDER BY id DESC LIMIT $6`
|
||||||
: `SELECT
|
: `SELECT
|
||||||
r.id,
|
r.id,
|
||||||
r.data->>$1 AS raw_value,
|
r.data->>$1 AS raw_value,
|
||||||
CASE
|
CASE
|
||||||
WHEN m.match IS NULL THEN NULL
|
WHEN agg.match_count = 0 THEN NULL
|
||||||
WHEN cardinality(m.match) = 1 THEN to_jsonb(m.match[1])
|
WHEN agg.match_count = 1 THEN agg.matches->0
|
||||||
ELSE to_jsonb(m.match)
|
ELSE agg.matches
|
||||||
END AS extracted_value
|
END AS extracted_value
|
||||||
FROM records r
|
FROM records r
|
||||||
CROSS JOIN LATERAL (SELECT regexp_match(r.data->>$1, $2) AS match) m
|
CROSS JOIN LATERAL (
|
||||||
WHERE r.source_name = $3 AND r.data ? $1
|
SELECT
|
||||||
ORDER BY r.id DESC LIMIT $4`;
|
jsonb_agg(
|
||||||
|
CASE WHEN array_length(mt, 1) = 1 THEN to_jsonb(mt[1])
|
||||||
|
ELSE to_jsonb(mt)
|
||||||
|
END
|
||||||
|
ORDER BY rn
|
||||||
|
) AS matches,
|
||||||
|
count(*)::int AS match_count
|
||||||
|
FROM regexp_matches(r.data->>$1, $2, $3) WITH ORDINALITY AS m(mt, rn)
|
||||||
|
) agg
|
||||||
|
WHERE r.source_name = $4 AND r.data ? $1
|
||||||
|
ORDER BY r.id DESC LIMIT $5`;
|
||||||
|
|
||||||
const params = function_type === 'replace'
|
const params = function_type === 'replace'
|
||||||
? [field, fullPattern, replace_value, source, parseInt(limit)]
|
? [field, pattern, replace_value, flags || '', source, parseInt(limit)]
|
||||||
: [field, fullPattern, source, parseInt(limit)];
|
: [field, pattern, flags || '', source, parseInt(limit)];
|
||||||
|
|
||||||
const result = await pool.query(query, params);
|
const result = await pool.query(query, params);
|
||||||
res.json(result.rows);
|
res.json(result.rows);
|
||||||
@ -80,23 +88,31 @@ module.exports = (pool) => {
|
|||||||
|
|
||||||
const rule = ruleResult.rows[0];
|
const rule = ruleResult.rows[0];
|
||||||
|
|
||||||
const pattern = (rule.flags ? `(?${rule.flags})` : '') + rule.pattern;
|
|
||||||
const result = await pool.query(
|
const result = await pool.query(
|
||||||
`SELECT
|
`SELECT
|
||||||
r.id,
|
r.id,
|
||||||
r.data->>$1 AS raw_value,
|
r.data->>$1 AS raw_value,
|
||||||
CASE
|
CASE
|
||||||
WHEN m.match IS NULL THEN NULL
|
WHEN agg.match_count = 0 THEN NULL
|
||||||
WHEN cardinality(m.match) = 1 THEN to_jsonb(m.match[1])
|
WHEN agg.match_count = 1 AND array_length(agg.matches[1], 1) = 1
|
||||||
ELSE to_jsonb(m.match)
|
THEN to_jsonb(agg.matches[1][1])
|
||||||
|
WHEN agg.match_count = 1
|
||||||
|
THEN to_jsonb(agg.matches[1])
|
||||||
|
WHEN array_length(agg.matches[1], 1) = 1
|
||||||
|
THEN (SELECT jsonb_agg(m[1] ORDER BY idx)
|
||||||
|
FROM unnest(agg.matches) WITH ORDINALITY u(m, idx))
|
||||||
|
ELSE to_jsonb(agg.matches)
|
||||||
END AS extracted_value
|
END AS extracted_value
|
||||||
FROM records r
|
FROM records r
|
||||||
CROSS JOIN LATERAL (SELECT regexp_match(r.data->>$1, $2) AS match) m
|
CROSS JOIN LATERAL (
|
||||||
WHERE r.source_name = $3
|
SELECT array_agg(mt ORDER BY rn) AS matches, count(*)::int AS match_count
|
||||||
|
FROM regexp_matches(r.data->>$1, $2, $3) WITH ORDINALITY AS m(mt, rn)
|
||||||
|
) agg
|
||||||
|
WHERE r.source_name = $4
|
||||||
AND r.data ? $1
|
AND r.data ? $1
|
||||||
ORDER BY r.id DESC
|
ORDER BY r.id DESC
|
||||||
LIMIT $4`,
|
LIMIT $5`,
|
||||||
[rule.field, pattern, rule.source_name, parseInt(limit)]
|
[rule.field, rule.pattern, rule.flags || '', rule.source_name, parseInt(limit)]
|
||||||
);
|
);
|
||||||
|
|
||||||
res.json({
|
res.json({
|
||||||
|
|||||||
@ -78,7 +78,7 @@ DECLARE
|
|||||||
v_record RECORD;
|
v_record RECORD;
|
||||||
v_rule RECORD;
|
v_rule RECORD;
|
||||||
v_transformed JSONB;
|
v_transformed JSONB;
|
||||||
v_match TEXT[];
|
v_match_count BIGINT;
|
||||||
v_extracted JSONB;
|
v_extracted JSONB;
|
||||||
v_mapping JSONB;
|
v_mapping JSONB;
|
||||||
v_count INTEGER := 0;
|
v_count INTEGER := 0;
|
||||||
@ -103,29 +103,41 @@ BEGIN
|
|||||||
LOOP
|
LOOP
|
||||||
-- Apply rule based on function type
|
-- Apply rule based on function type
|
||||||
IF v_rule.function_type = 'replace' THEN
|
IF v_rule.function_type = 'replace' THEN
|
||||||
|
-- Pass flags as third arg so 'g' (replace all) works correctly
|
||||||
v_transformed := jsonb_set(
|
v_transformed := jsonb_set(
|
||||||
v_transformed,
|
v_transformed,
|
||||||
ARRAY[v_rule.output_field],
|
ARRAY[v_rule.output_field],
|
||||||
to_jsonb(regexp_replace(
|
to_jsonb(regexp_replace(
|
||||||
v_record.data->>v_rule.field,
|
v_record.data->>v_rule.field,
|
||||||
CASE WHEN v_rule.flags != '' THEN '(?' || v_rule.flags || ')' ELSE '' END || v_rule.pattern,
|
v_rule.pattern,
|
||||||
v_rule.replace_value
|
v_rule.replace_value,
|
||||||
|
v_rule.flags
|
||||||
))
|
))
|
||||||
);
|
);
|
||||||
ELSE
|
ELSE
|
||||||
-- extract (default): regexp_match returns all capture groups as text[]
|
-- extract: use regexp_matches so 'g' flag returns all occurrences
|
||||||
v_match := regexp_match(
|
-- Aggregate directly to JSONB: single capture → scalar, multi → array
|
||||||
|
SELECT
|
||||||
|
jsonb_agg(
|
||||||
|
CASE WHEN array_length(mt, 1) = 1 THEN to_jsonb(mt[1])
|
||||||
|
ELSE to_jsonb(mt)
|
||||||
|
END
|
||||||
|
ORDER BY rn
|
||||||
|
),
|
||||||
|
count(*)
|
||||||
|
INTO v_extracted, v_match_count
|
||||||
|
FROM regexp_matches(
|
||||||
v_record.data->>v_rule.field,
|
v_record.data->>v_rule.field,
|
||||||
CASE WHEN v_rule.flags != '' THEN '(?' || v_rule.flags || ')' ELSE '' END || v_rule.pattern
|
v_rule.pattern,
|
||||||
);
|
v_rule.flags
|
||||||
|
) WITH ORDINALITY AS m(mt, rn);
|
||||||
|
|
||||||
IF v_match IS NOT NULL THEN
|
IF v_match_count > 0 THEN
|
||||||
-- Single capture group → scalar string; multiple groups → JSON array
|
-- Single match: unwrap the array to get scalar or capture array directly
|
||||||
IF cardinality(v_match) = 1 THEN
|
IF v_match_count = 1 THEN
|
||||||
v_extracted := to_jsonb(v_match[1]);
|
v_extracted := v_extracted->0;
|
||||||
ELSE
|
|
||||||
v_extracted := to_jsonb(v_match);
|
|
||||||
END IF;
|
END IF;
|
||||||
|
-- v_extracted is now: scalar string, array of captures, or array of matches (g)
|
||||||
|
|
||||||
-- Check if there's a mapping for this value
|
-- Check if there's a mapping for this value
|
||||||
SELECT output INTO v_mapping
|
SELECT output INTO v_mapping
|
||||||
|
|||||||
120
database/migrate_tps.sql
Normal file
120
database/migrate_tps.sql
Normal file
@ -0,0 +1,120 @@
|
|||||||
|
--
|
||||||
|
-- TPS → Dataflow Migration
|
||||||
|
--
|
||||||
|
-- Migrates sources, rules, mappings, and records from the TPS system.
|
||||||
|
-- Run against the dataflow database:
|
||||||
|
-- PGPASSWORD=dataflow psql -U dataflow -d dataflow -h localhost -f database/migrate_tps.sql
|
||||||
|
--
|
||||||
|
-- Existing rows are skipped (ON CONFLICT DO NOTHING) so the script is safe to re-run.
|
||||||
|
-- NOTE: dcard already configured in dataflow will NOT be overwritten.
|
||||||
|
--
|
||||||
|
|
||||||
|
SET search_path TO dataflow, public;
|
||||||
|
|
||||||
|
CREATE EXTENSION IF NOT EXISTS dblink;
|
||||||
|
|
||||||
|
-- Connection string to the TPS database
|
||||||
|
\set tps_conn 'host=192.168.1.110 dbname=ubm user=api password=gyaswddh1983'
|
||||||
|
|
||||||
|
\echo ''
|
||||||
|
\echo '=== 1. Sources ==='
|
||||||
|
|
||||||
|
INSERT INTO dataflow.sources (name, dedup_fields, config)
|
||||||
|
SELECT
|
||||||
|
srce AS name,
|
||||||
|
-- Strip {} wrappers from constraint paths → dedup field names
|
||||||
|
ARRAY(
|
||||||
|
SELECT regexp_replace(c, '^\{|\}$', '', 'g')
|
||||||
|
FROM jsonb_array_elements_text(defn->'constraint') AS c
|
||||||
|
) AS dedup_fields,
|
||||||
|
-- Build config.fields from the first schema (index 0 = "mapped" for dcard, "default" for others)
|
||||||
|
jsonb_build_object('fields',
|
||||||
|
(SELECT jsonb_agg(
|
||||||
|
jsonb_build_object(
|
||||||
|
'name', regexp_replace(col->>'path', '^\{|\}$', '', 'g'),
|
||||||
|
'type', COALESCE(NULLIF(col->>'type', ''), 'text')
|
||||||
|
) ORDER BY ord
|
||||||
|
)
|
||||||
|
FROM jsonb_array_elements(defn->'schemas'->0->'columns')
|
||||||
|
WITH ORDINALITY AS t(col, ord)
|
||||||
|
)
|
||||||
|
) AS config
|
||||||
|
FROM dblink(:'tps_conn',
|
||||||
|
'SELECT srce, defn FROM tps.srce'
|
||||||
|
) AS t(srce TEXT, defn JSONB)
|
||||||
|
ON CONFLICT (name) DO NOTHING;
|
||||||
|
|
||||||
|
SELECT name, dedup_fields, jsonb_array_length(config->'fields') AS field_count
|
||||||
|
FROM dataflow.sources ORDER BY name;
|
||||||
|
|
||||||
|
\echo ''
|
||||||
|
\echo '=== 2. Rules ==='
|
||||||
|
|
||||||
|
INSERT INTO dataflow.rules
|
||||||
|
(source_name, name, field, pattern, output_field, function_type, flags, replace_value, sequence, enabled)
|
||||||
|
SELECT
|
||||||
|
srce AS source_name,
|
||||||
|
target AS name,
|
||||||
|
-- Strip {} from the input field key
|
||||||
|
regexp_replace(regex->'regex'->'defn'->0->>'key', '^\{|\}$', '', 'g') AS field,
|
||||||
|
regex->'regex'->'defn'->0->>'regex' AS pattern,
|
||||||
|
regex->'regex'->'defn'->0->>'field' AS output_field,
|
||||||
|
COALESCE(NULLIF(regex->'regex'->>'function', ''), 'extract') AS function_type,
|
||||||
|
COALESCE(regex->'regex'->'defn'->0->>'flag', '') AS flags,
|
||||||
|
'' AS replace_value,
|
||||||
|
seq AS sequence,
|
||||||
|
true AS enabled
|
||||||
|
FROM dblink(:'tps_conn',
|
||||||
|
'SELECT srce, target, seq, regex FROM tps.map_rm'
|
||||||
|
) AS t(srce TEXT, target TEXT, seq INT, regex JSONB)
|
||||||
|
ON CONFLICT (source_name, name) DO NOTHING;
|
||||||
|
|
||||||
|
SELECT source_name, name, field, pattern, output_field, sequence
|
||||||
|
FROM dataflow.rules ORDER BY source_name, sequence;
|
||||||
|
|
||||||
|
\echo ''
|
||||||
|
\echo '=== 3. Mappings ==='
|
||||||
|
|
||||||
|
INSERT INTO dataflow.mappings (source_name, rule_name, input_value, output)
|
||||||
|
SELECT
|
||||||
|
srce AS source_name,
|
||||||
|
target AS rule_name,
|
||||||
|
-- retval is {"f20": "<extracted string>"} — pull out the value as JSONB
|
||||||
|
(SELECT value FROM jsonb_each(retval) LIMIT 1) AS input_value,
|
||||||
|
map AS output
|
||||||
|
FROM dblink(:'tps_conn',
|
||||||
|
'SELECT srce, target, retval, map FROM tps.map_rv'
|
||||||
|
) AS t(srce TEXT, target TEXT, retval JSONB, map JSONB)
|
||||||
|
ON CONFLICT (source_name, rule_name, input_value) DO NOTHING;
|
||||||
|
|
||||||
|
SELECT source_name, rule_name, COUNT(*) AS mapping_count
|
||||||
|
FROM dataflow.mappings GROUP BY source_name, rule_name ORDER BY source_name, rule_name;
|
||||||
|
|
||||||
|
\echo ''
|
||||||
|
\echo '=== 4. Records ==='
|
||||||
|
\echo ' (13 000+ rows — may take a moment)'
|
||||||
|
|
||||||
|
INSERT INTO dataflow.records (source_name, data, dedup_key, transformed, imported_at, transformed_at)
|
||||||
|
SELECT
|
||||||
|
t.srce AS source_name,
|
||||||
|
t.rec AS data,
|
||||||
|
dataflow.generate_dedup_key(t.rec, s.dedup_fields) AS dedup_key,
|
||||||
|
t.allj AS transformed,
|
||||||
|
CURRENT_TIMESTAMP AS imported_at,
|
||||||
|
CASE WHEN t.allj IS NOT NULL THEN CURRENT_TIMESTAMP END AS transformed_at
|
||||||
|
FROM dblink(:'tps_conn',
|
||||||
|
'SELECT srce, rec, allj FROM tps.trans'
|
||||||
|
) AS t(srce TEXT, rec JSONB, allj JSONB)
|
||||||
|
JOIN dataflow.sources s ON s.name = t.srce
|
||||||
|
ON CONFLICT (source_name, dedup_key) DO NOTHING;
|
||||||
|
|
||||||
|
SELECT source_name, COUNT(*) AS records, COUNT(transformed) AS transformed
|
||||||
|
FROM dataflow.records GROUP BY source_name ORDER BY source_name;
|
||||||
|
|
||||||
|
\echo ''
|
||||||
|
\echo '=== Migration complete ==='
|
||||||
|
SELECT
|
||||||
|
(SELECT COUNT(*) FROM dataflow.sources) AS sources,
|
||||||
|
(SELECT COUNT(*) FROM dataflow.rules) AS rules,
|
||||||
|
(SELECT COUNT(*) FROM dataflow.mappings) AS mappings,
|
||||||
|
(SELECT COUNT(*) FROM dataflow.records) AS records;
|
||||||
Loading…
Reference in New Issue
Block a user