dataflow/SPEC.md
Paul Trowbridge 291c665ed1 Consolidate all SQL into database/queries/, switch to literal SQL in routes
- Add database/queries/{sources,rules,mappings,records}.sql — one file per
  route, all business logic in PostgreSQL functions
- Replace parameterized queries in all four route files with lit()/jsonLit()
  literal interpolation for debuggability
- Add api/lib/sql.js with lit(), jsonLit(), arr() helpers
- Fix get_view_data to use json_agg (preserves column order) with subquery
  (guarantees sort order is respected before aggregation)
- Fix jsonLit() for JSONB params so plain strings become valid JSON
- Update manage.py option 3 to deploy database/queries/ instead of functions.sql
- Add SPEC.md covering architecture, philosophy, and manage.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-05 22:36:53 -04:00


Dataflow — Application Specification

Purpose

Dataflow is an internal tool for importing, cleaning, and standardizing data from multiple sources (e.g., bank transaction exports). It is intentionally simple — no queues, no schedulers, no plugins. Everything is triggered explicitly by the user through the UI or API.


Philosophy and Design Choices

Simple over clever

Every decision favors clarity. No abstractions for their own sake, no frameworks beyond what is necessary. If a piece of code is hard to follow, that is a sign it should be rewritten, not abstracted.

All logic lives in the database

SQL functions are the single source of truth for business logic. The API layer is a thin HTTP wrapper — it validates input, calls a function, and returns the result. It does not construct business logic in JavaScript.

SQL is written as full literal strings

Database calls in the route files use fully formed SQL strings with values interpolated directly (not parameterized). This makes every query copy-pasteable into psql for debugging. Small helpers in api/lib/sql.js (lit(), jsonLit(), arr()) handle quoting and escaping. This is an intentional trade-off: the tool is internal, and debuggability is worth more than the marginal injection protection parameterization provides over what the helpers already do.
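For instance, a route handler might build its query like this (a sketch only — the handler and source names are illustrative, and a lit()-style quoter is inlined here rather than imported from api/lib/sql.js):

```javascript
// Inlined copy of a lit()-style quoter: single quotes are doubled,
// and the value is wrapped in single quotes.
const lit = (v) => `'${String(v).replace(/'/g, "''")}'`;

// Hypothetical handler body: the query is a complete SQL string,
// ready to paste into psql when debugging.
function buildGetSourceQuery(name) {
  return `SELECT * FROM get_source(${lit(name)});`;
}

console.log(buildGetSourceQuery("o'brien"));
// SELECT * FROM get_source('o''brien');
```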

One SQL file per route

SQL is organized in database/queries/ with one file per route (sources.sql, rules.sql, mappings.sql, records.sql). This makes it easy to find the SQL behind any API endpoint — look at the route file to find the function name, then look at the matching query file for the implementation.

Explicit over implicit

Nothing happens automatically. Transformations are triggered by the user. Views are generated on demand. There are no database triggers, no background workers, no scheduled jobs.

JSONB for flexibility

Raw imported records and transformed records are stored as JSONB. This avoids schema migrations when source formats change and allows different sources to have different field sets in the same table.


Architecture

manage.py              — interactive CLI for setup, deployment, and management
database/
  schema.sql           — table definitions (run once or to reset)
  queries/
    sources.sql        — all SQL for /api/sources
    rules.sql          — all SQL for /api/rules
    mappings.sql       — all SQL for /api/mappings
    records.sql        — all SQL for /api/records
api/
  server.js            — Express server, mounts routes, auth middleware
  middleware/
    auth.js            — Basic Auth enforcement on all /api routes
  lib/
    sql.js             — lit(), jsonLit(), and arr() helpers for SQL literal building
  routes/
    sources.js         — HTTP handlers for source management
    rules.js           — HTTP handlers for rule management
    mappings.js        — HTTP handlers for mapping management
    records.js         — HTTP handlers for record queries
ui/
  src/
    api.js             — fetch wrapper, credential management
    App.jsx            — root: login gate, sidebar, source selector, routing
    pages/
      Login.jsx        — username/password form
      Sources.jsx      — source CRUD, field config, view generation
      Import.jsx       — CSV upload and import log
      Rules.jsx        — rule CRUD with live pattern preview
      Mappings.jsx     — mapping table with TSV import/export
      Records.jsx      — paginated, sortable view of transformed records
public/                — compiled UI (output of npm run build in ui/)

Database Schema

Five tables in the dataflow schema:

sources

Defines a data source. The dedup_fields array specifies which fields make a record unique. config (JSONB) holds the output schema (fields array) used to generate the typed view.

records

Stores every imported record. data holds the raw import. transformed holds the enriched record after rules and mappings are applied. dedup_key is an MD5 hash of the dedup fields — a unique constraint on (source_name, dedup_key) prevents duplicate imports.

rules

Regex transformation rules. Each rule reads from field, applies pattern with optional flags, and writes to output_field. function_type is either extract (regexp_matches) or replace (regexp_replace). sequence controls the order rules are applied. retain keeps the raw extracted value in output_field even when a mapping overrides it.

mappings

Maps an extracted value to a standardized output object. input_value is JSONB (matches the extracted value exactly, including arrays from multi-capture-group patterns). output is a JSONB object that can contain multiple fields (e.g., {"vendor": "Walmart", "category": "Groceries"}).
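The exact-match semantics can be illustrated with a small sketch (findMapping is a hypothetical helper; the real lookup is a SQL join on the JSONB input_value):

```javascript
// A mapping matches only when input_value equals the extracted value
// exactly — including array shape from multi-capture-group patterns.
function findMapping(mappings, extracted) {
  const key = JSON.stringify(extracted);
  const hit = mappings.find((m) => JSON.stringify(m.input_value) === key);
  return hit ? hit.output : null;
}
```

Note that `['WMT']` (an array from a capture group) and the bare string `'WMT'` are different JSONB values and do not match each other.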

import_log

Audit trail. One row per import call, recording how many records were inserted versus skipped as duplicates.


Data Flow

Import

CSV file → parse in Node.js → import_records(source, data)
  → generate_dedup_key() per record → INSERT with unique constraint
  → count inserted vs duplicates → log to import_log

Transform

apply_transformations(source) — pure SQL CTE
  qualifying: records not yet transformed (or all, if overwrite=true)
  rx: fan out one row per regex match (LATERAL join, rules × records)
  agg_matches: collapse matches back to one value per (record, rule)
  linked: LEFT JOIN mappings to find mapped output
  rule_output: build per-rule output JSONB
  record_additions: merge all rule outputs per record in sequence order
  UPDATE records SET transformed = data || additions

The transform is fully set-based — no row-by-row loops. All records for a source are processed in a single query.
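The merge-in-sequence-order step can be pictured in miniature (a JavaScript sketch of the semantics, not the SQL itself; it assumes later rules overwrite earlier ones on conflicting keys, as jsonb || would):

```javascript
// Each rule produces a small JSONB-like output object; merging in ascending
// sequence order means a later rule's keys win on conflict.
function mergeRuleOutputs(ruleOutputs) {
  return ruleOutputs
    .slice()
    .sort((a, b) => a.sequence - b.sequence)
    .reduce((acc, r) => Object.assign(acc, r.output), {});
}
```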

View generation

generate_source_view(source) reads config.fields from the source and builds a CREATE VIEW statement in the dfv schema. Each field is cast to its declared type (text, numeric, date). Fields with an expression are computed from other transformed fields using {field} substitution syntax.
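A sketch of the {field} substitution, assuming each placeholder expands to a JSONB text extraction from the transformed record (the real function also applies the field's declared cast):

```javascript
// Hypothetical illustration of {field} expansion: "{debit} - {credit}"
// becomes an expression over the transformed JSONB column.
function substituteExpression(expr) {
  return expr.replace(/\{(\w+)\}/g, (_, f) => `(transformed->>'${f}')`);
}
```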


SQL Functions

Each file in database/queries/ maps 1-to-1 with a route file.

sources.sql: list_sources, get_source, create_source, update_source, delete_source, get_import_log, get_source_stats, get_source_fields, get_view_data (plpgsql — dynamic sort via EXECUTE + quote_ident), import_records, jsonb_merge + jsonb_concat_obj aggregate, apply_transformations, reprocess_records, generate_source_view

rules.sql: list_rules, get_rule, create_rule, update_rule, delete_rule, preview_rule (plpgsql — conditional query for extract vs replace), test_rule (returns TABLE(rule JSONB, results JSONB))

mappings.sql: list_mappings, get_mapping, create_mapping, upsert_mapping, update_mapping, delete_mapping, get_mapping_counts, get_all_values (plpgsql — extracted values with mapping join), get_unmapped_values (plpgsql — extracted values with no mapping)

records.sql: list_records, get_record, search_records (JSONB containment on data and transformed), delete_record, delete_source_records


API

All routes are under /api. Every route requires HTTP Basic Auth. The GET /health endpoint is exempt.

Authentication: Authorization: Basic <base64(user:pass)> on every request. Credentials are verified against LOGIN_USER (plaintext username) and LOGIN_PASSWORD_HASH (bcrypt hash) in .env. There are no sessions or tokens — credentials are sent with every request.
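Header parsing can be sketched as follows (the real middleware in api/middleware/auth.js then checks the username against LOGIN_USER and bcrypt-compares the password against LOGIN_PASSWORD_HASH, which is omitted here):

```javascript
// Decode "Authorization: Basic <base64(user:pass)>" into its parts.
// Returns null for a missing or malformed header.
function parseBasicAuth(header) {
  if (!header || !header.startsWith('Basic ')) return null;
  const decoded = Buffer.from(header.slice(6), 'base64').toString('utf8');
  const i = decoded.indexOf(':');
  if (i < 0) return null;
  // Only split on the first colon — passwords may contain colons.
  return { user: decoded.slice(0, i), pass: decoded.slice(i + 1) };
}
```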

Route summary:

Method  Path                                       Description
GET     /api/sources                               List all sources
POST    /api/sources                               Create source
GET     /api/sources/:name                         Get source
PUT     /api/sources/:name                         Update source (dedup_fields, config)
DELETE  /api/sources/:name                         Delete source and all data
POST    /api/sources/suggest                       Suggest source config from CSV upload
POST    /api/sources/:name/import                  Import CSV records
GET     /api/sources/:name/import-log              Import history
GET     /api/sources/:name/stats                   Record counts
GET     /api/sources/:name/fields                  All known field names and origins
GET     /api/sources/:name/view-data               Paginated, sortable view data
POST    /api/sources/:name/transform               Apply transformations (new records only)
POST    /api/sources/:name/reprocess               Reapply transformations to all records
POST    /api/sources/:name/view                    Generate dfv view
GET     /api/rules/source/:name                    List rules for a source
GET     /api/rules/preview                         Preview pattern against live records
GET     /api/rules/:id/test                        Test saved rule against live records
POST    /api/rules                                 Create rule
PUT     /api/rules/:id                             Update rule
DELETE  /api/rules/:id                             Delete rule
GET     /api/mappings/source/:name                 List mappings
GET     /api/mappings/source/:name/all-values      All extracted values (mapped + unmapped)
GET     /api/mappings/source/:name/unmapped        Only unmapped extracted values
GET     /api/mappings/source/:name/counts          Record counts per mapping
GET     /api/mappings/source/:name/export.tsv     Export mappings as TSV
POST    /api/mappings/source/:name/import-csv     Import/update mappings from TSV
POST    /api/mappings                              Create mapping
POST    /api/mappings/bulk                         Upsert multiple mappings
PUT     /api/mappings/:id                          Update mapping
DELETE  /api/mappings/:id                          Delete mapping
GET     /api/records/source/:name                  List raw records
GET     /api/records/:id                           Get single record
POST    /api/records/search                        Search by JSONB containment
DELETE  /api/records/:id                           Delete record
DELETE  /api/records/source/:name/all              Delete all records for a source

api/lib/sql.js — SQL Literal Helpers

lit(val)      // JS value → SQL literal: 'text', TRUE, 42, NULL, '{"json":"val"}'
jsonLit(val)  // JS value → JSONB literal; plain strings are JSON-encoded first so the result is valid JSON
arr(val)      // JS array → PostgreSQL array literal: ARRAY['a','b']

Single quotes within string values are escaped by doubling them (' becomes ''). Objects and arrays are JSON-serialized. These helpers exist so that query strings in route files are fully formed and can be copied directly into psql.
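A minimal sketch of these helpers, matching the behavior described above (the real implementations in api/lib/sql.js may handle additional edge cases):

```javascript
// Convert a JS value to a SQL literal string.
function lit(val) {
  if (val === null || val === undefined) return 'NULL';
  if (typeof val === 'boolean') return val ? 'TRUE' : 'FALSE';
  if (typeof val === 'number') return String(val);
  // Objects and arrays are JSON-serialized, then quoted like any string.
  if (typeof val === 'object') return `'${JSON.stringify(val).replace(/'/g, "''")}'`;
  return `'${String(val).replace(/'/g, "''")}'`;
}

// JSONB parameters: always JSON-encode first, so a plain string like
// "abc" becomes the valid JSON literal '"abc"'.
function jsonLit(val) {
  return `'${JSON.stringify(val).replace(/'/g, "''")}'`;
}

// JS array → PostgreSQL array constructor.
function arr(vals) {
  return `ARRAY[${vals.map(lit).join(',')}]`;
}
```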


UI

Built with React + Vite + Tailwind CSS. Compiled output goes to public/. The server serves public/ as static files and falls back to public/index.html for all non-API routes (SPA routing).

Authentication flow:

  1. On load, check sessionStorage for saved credentials. If found, re-authenticate silently via GET /api/sources.
  2. On login, credentials are stored in memory (api.js module-level) and in sessionStorage (survives page refresh, cleared on tab close).
  3. On 401 response, credentials are cleared and the login screen is shown.
  4. localStorage persists the selected source name across sessions.
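The credential handling can be sketched like this (names are illustrative; the sessionStorage persistence step is browser-only and omitted here):

```javascript
// Module-level, in-memory credentials, as in api.js.
let creds = null;

function setCredentials(user, pass) {
  creds = { user, pass };
}

// Build the Authorization header sent with every request;
// empty object when not logged in.
function authHeader() {
  if (!creds) return {};
  const token = Buffer.from(`${creds.user}:${creds.pass}`).toString('base64');
  return { Authorization: `Basic ${token}` };
}
```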

Pages:

  • Sources — View and edit source configuration. Shows all known field names and their origins (raw data, schema, rules, mappings). Checkboxes control which fields are dedup keys and which appear in the output view. Supports CSV upload to auto-detect fields.

  • Import — Upload a CSV to import records into the selected source. Shows import log with inserted/duplicate counts per import.

  • Rules — Create and manage regex rules. Live preview fires automatically (debounced 500ms) as pattern/field/flags are edited, showing match results against real records. Rules can be enabled/disabled by toggle.

  • Mappings — Tabular mapping editor. Shows all extracted values from transformed records with record counts and sample raw data. Rows are yellow (unmapped), white (mapped), or blue (edited but unsaved). Supports TSV export and import. Columns can be added dynamically.

  • Records — Paginated table showing the dfv.{source} view. Server-side sorting (column validated against information_schema.columns, interpolated with quote_ident). Dates are formatted YYYY-MM-DD for correct lexicographic sort.
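The sort-validation step can be sketched as follows (buildOrderBy is a hypothetical helper; the real code checks the requested column against information_schema.columns before interpolating it with quote_ident):

```javascript
// Only a column name that actually exists in the view may be interpolated
// into ORDER BY; anything else is rejected outright.
function buildOrderBy(requested, viewColumns) {
  if (!viewColumns.includes(requested)) {
    throw new Error(`unknown sort column: ${requested}`);
  }
  // quote_ident-style quoting: wrap in double quotes, doubling any inside.
  return `ORDER BY "${requested.replace(/"/g, '""')}"`;
}
```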


manage.py

Interactive CLI for setup and operations. Run with python3 manage.py. Requires no arguments.

Shows current status on every screen:

  • Database connection (host, port, db, user) and whether it succeeds
  • Whether the dataflow schema is deployed
  • Whether SQL functions are deployed (detected by presence of apply_transformations)
  • Login credentials (configured / not configured)
  • UI build status and timestamp
  • Systemd service status
  • Nginx reverse proxy status

Menu options:

  1. Database configuration and deployment dialog — Write or update .env with database connection settings. If connection fails, offers to create the database and user using admin credentials. Optionally deploys schema and SQL functions after .env is written.

  2. Redeploy schema — Runs database/schema.sql against the configured database. Warns that this drops all data. Requires explicit confirmation.

  3. Redeploy SQL functions — Runs all four files in database/queries/ in order: sources.sql, rules.sql, mappings.sql, records.sql. Safe to run at any time without data loss.

  4. Build UI — Runs npm run build in ui/, outputting to public/.

  5. Set up nginx reverse proxy — Detects if a proxy already exists for the configured port. Writes an nginx site config to /etc/nginx/sites-enabled/. If an SSL certificate exists in /etc/letsencrypt/live/, configures HTTPS with HTTP redirect; otherwise configures HTTP only with an offer to run certbot. Requires sudo.

  6. Install systemd service — Copies dataflow.service to /etc/systemd/system/, runs daemon-reload, enables on boot. Requires sudo.

  7. Start / restart service — Runs systemctl start or systemctl restart depending on current state. Requires sudo.

  8. Stop service — Runs systemctl stop. Requires sudo.

  9. Set login credentials — Prompts for username and password, bcrypt-hashes the password via node -e "require('bcrypt')...", and writes LOGIN_USER and LOGIN_PASSWORD_HASH to .env. Requires Node.js and bcrypt npm package to be installed.

Key behaviors:

  • All commands that will be run are printed before the user is asked to confirm.
  • Actions that require sudo prompt transparently — sudo is not run with -n, so it uses cached credentials or prompts as normal.
  • Nginx config files are written via sudo cp from a temp file, then sudo chmod 644 to make them world-readable for status detection.
  • Certificate existence is checked with sudo test -f since /etc/letsencrypt/live/ requires root.
  • Option 1's result (updated config) is passed back to the menu loop so status reflects changes without requiring a restart.

Environment Variables (.env)

DB_HOST              PostgreSQL host
DB_PORT              PostgreSQL port (default 5432)
DB_NAME              Database name
DB_USER              Database user
DB_PASSWORD          Database password
API_PORT             Port the Express server listens on (default 3020)
NODE_ENV             development | production
LOGIN_USER           Username for Basic Auth
LOGIN_PASSWORD_HASH  bcrypt hash of the password

Running the Application

# Install API dependencies
npm install

# Install UI dependencies and build
cd ui && npm install && npm run build && cd ..

# Start (development, auto-reload)
npm run dev

# Start (production)
npm start

The server binds to 0.0.0.0 on API_PORT and serves both the API and the compiled UI from public/.


Deploying SQL Changes

Any time SQL functions are modified:

PGPASSWORD=<pass> psql -h <host> -U <user> -d <db> -f database/queries/sources.sql
PGPASSWORD=<pass> psql -h <host> -U <user> -d <db> -f database/queries/rules.sql
PGPASSWORD=<pass> psql -h <host> -U <user> -d <db> -f database/queries/mappings.sql
PGPASSWORD=<pass> psql -h <host> -U <user> -d <db> -f database/queries/records.sql

Then restart the server. Function deployment is safe to repeat — all functions use CREATE OR REPLACE.

Schema changes (schema.sql) drop and recreate the schema, deleting all data. In production, write migration scripts instead.