-- ============================================================
-- Dataflow Database Schema
-- Simple, clear structure for data transformation
-- ============================================================

-- Create schema (idempotent; all statements below match)
CREATE SCHEMA IF NOT EXISTS dataflow;

-- Set search path
SET search_path TO dataflow, public;

------------------------------------------------------
-- Table: sources
-- Defines data sources and how to deduplicate them
------------------------------------------------------
CREATE TABLE IF NOT EXISTS sources (
    name         TEXT PRIMARY KEY,
    dedup_fields TEXT[] NOT NULL,  -- Fields used for deduplication (e.g., ['date', 'amount', 'description'])
    config       JSONB NOT NULL DEFAULT '{}'::jsonb,
    created_at   TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at   TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);

COMMENT ON TABLE sources IS 'Data source definitions';
COMMENT ON COLUMN sources.dedup_fields IS 'Array of field names used to identify duplicate records';
COMMENT ON COLUMN sources.config IS 'Additional source configuration (optional)';

------------------------------------------------------
-- Table: records
-- Stores imported data (raw and transformed)
------------------------------------------------------
CREATE TABLE IF NOT EXISTS records (
    id             SERIAL PRIMARY KEY,
    source_name    TEXT NOT NULL REFERENCES sources(name) ON DELETE CASCADE,

    -- Data
    data           JSONB NOT NULL,  -- Original imported data
    dedup_key      TEXT NOT NULL,   -- Hash of dedup fields for fast lookup
    transformed    JSONB,           -- Data after transformations applied (NULL until transformed)

    -- Metadata
    imported_at    TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    transformed_at TIMESTAMPTZ,     -- NULL until transformation has run

    -- Prevent duplicates; the unique index backing this constraint also
    -- serves (source_name, dedup_key) lookups, so no extra index is needed.
    CONSTRAINT records_source_dedup_uq UNIQUE (source_name, dedup_key)
);

COMMENT ON TABLE records IS 'Imported records with raw and transformed data';
COMMENT ON COLUMN records.data IS 'Original data as imported';
COMMENT ON COLUMN records.dedup_key IS 'Hash of deduplication fields for fast duplicate detection';
COMMENT ON COLUMN records.transformed IS 'Data after applying transformation rules';

-- Indexes
-- NOTE: the former idx_records_dedup on (source_name, dedup_key) was removed:
-- it duplicated the index created by records_source_dedup_uq.
CREATE INDEX IF NOT EXISTS idx_records_source ON records(source_name);
CREATE INDEX IF NOT EXISTS idx_records_data ON records USING gin(data);
CREATE INDEX IF NOT EXISTS idx_records_transformed ON records USING gin(transformed);

------------------------------------------------------
-- Table: rules
-- Transformation rules (regex extraction)
------------------------------------------------------
CREATE TABLE IF NOT EXISTS rules (
    id            SERIAL PRIMARY KEY,
    source_name   TEXT NOT NULL REFERENCES sources(name) ON DELETE CASCADE,
    name          TEXT NOT NULL,

    -- Rule definition
    field         TEXT NOT NULL,   -- Field to extract from (e.g., 'description')
    pattern       TEXT NOT NULL,   -- Regex pattern
    output_field  TEXT NOT NULL,   -- Name of extracted field (e.g., 'merchant')
    function_type TEXT NOT NULL DEFAULT 'extract'
                  CONSTRAINT rules_function_type_check
                  CHECK (function_type IN ('extract', 'replace')),
    flags         TEXT NOT NULL DEFAULT '',  -- Regex flags (e.g., 'i' for case-insensitive)
    replace_value TEXT NOT NULL DEFAULT '',  -- Replacement string (replace mode only)

    -- Options
    enabled       BOOLEAN NOT NULL DEFAULT true,
    retain        BOOLEAN NOT NULL DEFAULT false,  -- Write output_field even when a mapping is applied
    sequence      INTEGER NOT NULL DEFAULT 0,      -- Execution order

    -- Metadata
    created_at    TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,

    CONSTRAINT rules_source_name_uq UNIQUE (source_name, name)
);

COMMENT ON TABLE rules IS 'Transformation rules for extracting data';
COMMENT ON COLUMN rules.field IS 'Source field to apply regex to';
COMMENT ON COLUMN rules.pattern IS 'Regular expression pattern';
COMMENT ON COLUMN rules.output_field IS 'Name of field to store extracted value';

CREATE INDEX IF NOT EXISTS idx_rules_source ON rules(source_name);

------------------------------------------------------
-- Table: mappings
-- Value mappings (extracted value → standardized output)
------------------------------------------------------
CREATE TABLE IF NOT EXISTS mappings (
    id          SERIAL PRIMARY KEY,
    source_name TEXT NOT NULL REFERENCES sources(name) ON DELETE CASCADE,
    rule_name   TEXT NOT NULL,

    -- Mapping
    input_value JSONB NOT NULL,  -- Extracted value to match (string or array of capture groups)
    output      JSONB NOT NULL,  -- Standardized output

    -- Metadata
    created_at  TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,

    -- The unique index backing this constraint also serves lookups on
    -- (source_name, rule_name) and (source_name, rule_name, input_value),
    -- so the former idx_mappings_source_rule / idx_mappings_input indexes
    -- were removed as redundant.
    CONSTRAINT mappings_source_rule_input_uq UNIQUE (source_name, rule_name, input_value),
    CONSTRAINT mappings_rule_fk FOREIGN KEY (source_name, rule_name)
        REFERENCES rules(source_name, name) ON DELETE CASCADE
);

COMMENT ON TABLE mappings IS 'Maps extracted values to standardized output';
COMMENT ON COLUMN mappings.input_value IS 'Value extracted by rule';
COMMENT ON COLUMN mappings.output IS 'Standardized output (can contain multiple fields)';

------------------------------------------------------
-- Table: import_log
-- Audit trail of imports
------------------------------------------------------
CREATE TABLE IF NOT EXISTS import_log (
    id                INTEGER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    source_name       TEXT NOT NULL REFERENCES sources(name) ON DELETE CASCADE,
    records_imported  INTEGER NOT NULL DEFAULT 0,
    records_duplicate INTEGER NOT NULL DEFAULT 0,
    imported_at       TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);

COMMENT ON TABLE import_log IS 'Audit log of data imports';

CREATE INDEX IF NOT EXISTS idx_import_log_source ON import_log(source_name);
CREATE INDEX IF NOT EXISTS idx_import_log_timestamp ON import_log(imported_at);

------------------------------------------------------
-- Helper function: Generate dedup key
--
-- Returns md5('<v1>|<v2>|...|') built from the given fields of a JSONB
-- document, in the array order of dedup_fields.
-- NOTE(review): a missing/NULL field and an empty string contribute the
-- same '' to the hash, and field values containing '|' can in principle
-- collide — presumably acceptable for duplicate detection; confirm if
-- stronger guarantees are ever needed. The algorithm is intentionally
-- unchanged so existing stored dedup_key values remain valid.
------------------------------------------------------
CREATE OR REPLACE FUNCTION generate_dedup_key(
    data JSONB,
    dedup_fields TEXT[]
) RETURNS TEXT AS $$
DECLARE
    -- 'field'/'concatenated' instead of the reserved keyword VALUES,
    -- which the original used as a variable name.
    field TEXT;
    concatenated TEXT := '';
BEGIN
    -- Concatenate values from dedup fields, '|'-separated
    FOREACH field IN ARRAY dedup_fields
    LOOP
        concatenated := concatenated || COALESCE(data->>field, '') || '|';
    END LOOP;

    -- Return MD5 hash of concatenated values
    RETURN md5(concatenated);
END;
$$ LANGUAGE plpgsql IMMUTABLE;

COMMENT ON FUNCTION generate_dedup_key IS 'Generate hash key from specified fields for deduplication';

------------------------------------------------------
-- Summary
------------------------------------------------------
-- Tables: 5 (sources, records, rules, mappings, import_log)
-- Simple, clear structure
-- JSONB for flexibility
-- Deduplication via hash key
-- All transformations traceable
------------------------------------------------------