Dataflow — Application Specification
Purpose
Dataflow is an internal tool for importing, cleaning, and standardizing data from multiple sources (e.g., bank transaction exports). It is intentionally simple — no queues, no schedulers, no plugins. Everything is triggered explicitly by the user through the UI or API.
Philosophy and Design Choices
Simple over clever
Every decision favors clarity. No abstractions for their own sake, no frameworks beyond what is necessary. If a piece of code is hard to follow, that is a sign it should be rewritten, not abstracted.
All logic lives in the database
SQL functions are the single source of truth for business logic. The API layer is a thin HTTP wrapper — it validates input, calls a function, and returns the result. It does not construct business logic in JavaScript.
SQL is written as full literal strings
Database calls in the route files use fully formed SQL strings with values interpolated directly (not parameterized). This makes every query copy-pasteable into psql for debugging. A small lit() helper in api/lib/sql.js handles quoting and escaping. This is an intentional trade-off: the tool is internal, and debuggability is worth more than the marginal injection protection parameterization provides over what lit() already does.
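The thin-wrapper pattern can be sketched as follows. This is a hypothetical illustration, not the real route code: the minimal `lit` here stands in for the `api/lib/sql.js` helper, `db.query` stands in for the pg client, and the handler shape is an assumption.

```javascript
// Minimal stand-in for the lit() helper described below (single quotes doubled).
const lit = (v) => (v === null ? 'NULL' : `'${String(v).replace(/'/g, "''")}'`);

// Build the complete, copy-pasteable query for one endpoint.
function getSourceSql(name) {
  return `SELECT get_source(${lit(name)}) AS source`;
}

// Thin wrapper: validate input, call one SQL function, return the result.
// No business logic lives here.
async function getSourceHandler(req, res, db) {
  const { rows } = await db.query(getSourceSql(req.params.name));
  if (!rows[0] || rows[0].source === null) {
    return res.status(404).json({ error: 'not found' });
  }
  res.json(rows[0].source);
}

console.log(getSourceSql("Bob's Bank"));
// SELECT get_source('Bob''s Bank') AS source
```

Because the query is a fully formed string, the exact text logged or thrown in an error can be pasted straight into psql.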
One SQL file per route
SQL is organized in database/queries/ with one file per route (sources.sql, rules.sql, mappings.sql, records.sql). This makes it easy to find the SQL behind any API endpoint — look at the route file to find the function name, then look at the matching query file for the implementation.
Explicit over implicit
Nothing happens automatically. Transformations are triggered by the user. Views are generated on demand. There are no database triggers, no background workers, no scheduled jobs.
JSONB for flexibility
Raw imported records and transformed records are stored as JSONB. This avoids schema migrations when source formats change and allows different sources to have different field sets in the same table.
Architecture
manage.py — interactive CLI for setup, deployment, and management
database/
schema.sql — table definitions (run once or to reset)
queries/
sources.sql — all SQL for /api/sources
rules.sql — all SQL for /api/rules
mappings.sql — all SQL for /api/mappings
records.sql — all SQL for /api/records
api/
server.js — Express server, mounts routes, auth middleware
middleware/
auth.js — Basic Auth enforcement on all /api routes
lib/
sql.js — lit() and arr() helpers for SQL literal building
routes/
sources.js — HTTP handlers for source management
rules.js — HTTP handlers for rule management
mappings.js — HTTP handlers for mapping management
records.js — HTTP handlers for record queries
ui/
src/
api.js — fetch wrapper, credential management
App.jsx — root: login gate, sidebar, source selector, routing
pages/
Login.jsx — username/password form
Sources.jsx — source CRUD, field config, view generation
Import.jsx — CSV upload and import log
Rules.jsx — rule CRUD with live pattern preview
Mappings.jsx — mapping table with TSV import/export
Records.jsx — paginated, sortable view of transformed records
public/ — compiled UI (output of npm run build in ui/)
Database Schema
Five tables in the dataflow schema:
sources
Defines a data source. The dedup_fields array specifies which fields make a record unique. config (JSONB) holds the output schema (fields array) used to generate the typed view.
records
Stores every imported record. data holds the raw import. transformed holds the enriched record after rules and mappings are applied. dedup_key is an MD5 hash of the dedup fields — a unique constraint on (source_name, dedup_key) prevents duplicate imports.
rules
Regex transformation rules. Each rule reads from field, applies pattern with optional flags, and writes to output_field. function_type is either extract (regexp_matches) or replace (regexp_replace). sequence controls the order rules are applied. retain keeps the raw extracted value in output_field even when a mapping overrides it.
mappings
Maps an extracted value to a standardized output object. input_value is JSONB (matches the extracted value exactly, including arrays from multi-capture-group patterns). output is a JSONB object that can contain multiple fields (e.g., {"vendor": "Walmart", "category": "Groceries"}).
import_log
Audit trail. One row per import call, recording how many records were inserted versus skipped as duplicates.
Data Flow
Import
CSV file → parse in Node.js → import_records(source, data)
→ generate_dedup_key() per record → INSERT with unique constraint
→ count inserted vs duplicates → log to import_log
Transform
apply_transformations(source) — pure SQL CTE
qualifying: records not yet transformed (or all, if overwrite=true)
rx: fan out one row per regex match (LATERAL join, rules × records)
agg_matches: collapse matches back to one value per (record, rule)
linked: LEFT JOIN mappings to find mapped output
rule_output: build per-rule output JSONB
record_additions: merge all rule outputs per record in sequence order
UPDATE records SET transformed = data || additions
The transform is fully set-based — no row-by-row loops. All records for a source are processed in a single query.
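The merge semantics of the final step can be illustrated in JavaScript (this models the behavior, not the SQL itself). Per-rule outputs are combined in `sequence` order, and `transformed = data || additions` applies the combined object over the raw data; like JSONB's `||`, the right-hand side wins on key conflicts, which `Object.assign` / spread also does. Field names below are invented for illustration.

```javascript
const data = { memo: 'WALMART #123', amount: '45.00' };

// Per-rule outputs, already sorted by rule.sequence.
const ruleOutputs = [
  { vendor_raw: 'WALMART' },                     // rule 1: extract, retained
  { vendor: 'Walmart', category: 'Groceries' },  // rule 2: extract + mapping
];

// record_additions: merge rule outputs in sequence order (later rules win).
const additions = Object.assign({}, ...ruleOutputs);

// UPDATE records SET transformed = data || additions
const transformed = { ...data, ...additions };
// → { memo: 'WALMART #123', amount: '45.00', vendor_raw: 'WALMART',
//     vendor: 'Walmart', category: 'Groceries' }
```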
View generation
generate_source_view(source) reads config.fields from the source and builds a CREATE VIEW statement in the dfv schema. Each field is cast to its declared type (text, numeric, date). Fields with an expression are computed from other transformed fields using {field} substitution syntax.
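The `{field}` substitution can be sketched like this. The real `generate_source_view()` does this in SQL; the function name, regex, and column-building callback below are assumptions for illustration only.

```javascript
// Replace each {name} placeholder in an expression with the SQL that
// extracts that field from the transformed JSONB.
function substituteExpr(expr, columnFor) {
  return expr.replace(/\{(\w+)\}/g, (_, name) => columnFor(name));
}

const columnSql = substituteExpr(
  '{debit} - {credit}',
  (f) => `(transformed->>'${f}')::numeric`
);
// (transformed->>'debit')::numeric - (transformed->>'credit')::numeric
```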
SQL Functions
Each file in database/queries/ maps 1-to-1 with a route file.
sources.sql
list_sources, get_source, create_source, update_source, delete_source, get_import_log, get_source_stats, get_source_fields, get_view_data (plpgsql — dynamic sort via EXECUTE + quote_ident), import_records, jsonb_merge + jsonb_concat_obj aggregate, apply_transformations, reprocess_records, generate_source_view
rules.sql
list_rules, get_rule, create_rule, update_rule, delete_rule, preview_rule (plpgsql — conditional query for extract vs replace), test_rule (returns TABLE(rule JSONB, results JSONB))
mappings.sql
list_mappings, get_mapping, create_mapping, upsert_mapping, update_mapping, delete_mapping, get_mapping_counts, get_all_values (plpgsql — extracted values with mapping join), get_unmapped_values (plpgsql — extracted values with no mapping)
records.sql
list_records, get_record, search_records (JSONB containment on data and transformed), delete_record, delete_source_records
API
All routes are under /api. Every route requires HTTP Basic Auth. The GET /health endpoint is exempt.
Authentication: Authorization: Basic <base64(user:pass)> on every request. Credentials are verified against LOGIN_USER (plaintext username) and LOGIN_PASSWORD_HASH (bcrypt hash) in .env. There are no sessions or tokens — credentials are sent with every request.
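Parsing the header can be sketched as below. This is a minimal illustration of the Basic scheme only; the real middleware then compares the username to `LOGIN_USER` and the password to `LOGIN_PASSWORD_HASH` with `bcrypt.compare()`, which is omitted here to keep the example dependency-free.

```javascript
// Decode "Authorization: Basic <base64(user:pass)>".
function parseBasicAuth(header) {
  if (!header || !header.startsWith('Basic ')) return null;
  const decoded = Buffer.from(header.slice(6), 'base64').toString('utf8');
  const i = decoded.indexOf(':'); // split on the FIRST colon — the password may contain ':'
  if (i === -1) return null;
  return { user: decoded.slice(0, i), pass: decoded.slice(i + 1) };
}

const header = 'Basic ' + Buffer.from('admin:s3cret').toString('base64');
// parseBasicAuth(header) → { user: 'admin', pass: 's3cret' }
```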
Route summary:
| Method | Path | Description |
|---|---|---|
| GET | /api/sources | List all sources |
| POST | /api/sources | Create source |
| GET | /api/sources/:name | Get source |
| PUT | /api/sources/:name | Update source (dedup_fields, config) |
| DELETE | /api/sources/:name | Delete source and all data |
| POST | /api/sources/suggest | Suggest source config from CSV upload |
| POST | /api/sources/:name/import | Import CSV records |
| GET | /api/sources/:name/import-log | Import history |
| GET | /api/sources/:name/stats | Record counts |
| GET | /api/sources/:name/fields | All known field names and origins |
| GET | /api/sources/:name/view-data | Paginated, sortable view data |
| POST | /api/sources/:name/transform | Apply transformations (new records only) |
| POST | /api/sources/:name/reprocess | Reapply transformations to all records |
| POST | /api/sources/:name/view | Generate dfv view |
| GET | /api/rules/source/:name | List rules for a source |
| GET | /api/rules/preview | Preview pattern against live records |
| GET | /api/rules/:id/test | Test saved rule against live records |
| POST | /api/rules | Create rule |
| PUT | /api/rules/:id | Update rule |
| DELETE | /api/rules/:id | Delete rule |
| GET | /api/mappings/source/:name | List mappings |
| GET | /api/mappings/source/:name/all-values | All extracted values (mapped + unmapped) |
| GET | /api/mappings/source/:name/unmapped | Only unmapped extracted values |
| GET | /api/mappings/source/:name/counts | Record counts per mapping |
| GET | /api/mappings/source/:name/export.tsv | Export mappings as TSV |
| POST | /api/mappings/source/:name/import-csv | Import/update mappings from TSV |
| POST | /api/mappings | Create mapping |
| POST | /api/mappings/bulk | Upsert multiple mappings |
| PUT | /api/mappings/:id | Update mapping |
| DELETE | /api/mappings/:id | Delete mapping |
| GET | /api/records/source/:name | List raw records |
| GET | /api/records/:id | Get single record |
| POST | /api/records/search | Search by JSONB containment |
| DELETE | /api/records/:id | Delete record |
| DELETE | /api/records/source/:name/all | Delete all records for a source |
api/lib/sql.js — SQL Literal Helpers
lit(val) // JS value → SQL literal: 'text', TRUE, 42, NULL, '{"json":"val"}'
arr(val) // JS array → PostgreSQL array literal: ARRAY['a','b']
Single quotes within string values are escaped by doubling them ('→''). Objects and arrays are JSON-serialized. These helpers exist so that query strings in route files are fully formed and can be copied directly into psql.
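A sketch matching the documented behavior (the real `api/lib/sql.js` may differ in edge cases):

```javascript
// JS value → SQL literal string.
function lit(val) {
  if (val === null || val === undefined) return 'NULL';
  if (typeof val === 'number') return String(val);
  if (typeof val === 'boolean') return val ? 'TRUE' : 'FALSE';
  if (typeof val === 'object') {
    // Objects and arrays are JSON-serialized, then quoted as a string literal.
    return `'${JSON.stringify(val).replace(/'/g, "''")}'`;
  }
  return `'${String(val).replace(/'/g, "''")}'`; // ' → ''
}

// JS array → PostgreSQL array literal.
function arr(vals) {
  return `ARRAY[${vals.map(lit).join(',')}]`;
}

// lit("it's")      → 'it''s'
// lit(42)          → 42
// lit(null)        → NULL
// lit({ a: 1 })    → '{"a":1}'
// arr(['a', 'b'])  → ARRAY['a','b']
```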
UI
Built with React + Vite + Tailwind CSS. Compiled output goes to public/. The server serves public/ as static files and falls back to public/index.html for all non-API routes (SPA routing).
Authentication flow:
- On load, check `sessionStorage` for saved credentials. If found, re-authenticate silently via `GET /api/sources`.
- On login, credentials are stored in memory (`api.js` module-level) and in `sessionStorage` (survives page refresh, cleared on tab close).
- On 401 response, credentials are cleared and the login screen is shown.
- `localStorage` persists the selected source name across sessions.
Pages:
- Sources — View and edit source configuration. Shows all known field names and their origins (raw data, schema, rules, mappings). Checkboxes control which fields are dedup keys and which appear in the output view. Supports CSV upload to auto-detect fields.
- Import — Upload a CSV to import records into the selected source. Shows the import log with inserted/duplicate counts per import.
- Rules — Create and manage regex rules. Live preview fires automatically (debounced 500 ms) as pattern/field/flags are edited, showing match results against real records. Rules can be enabled/disabled by toggle.
- Mappings — Tabular mapping editor. Shows all extracted values from transformed records with record counts and sample raw data. Rows are yellow (unmapped), white (mapped), or blue (edited but unsaved). Supports TSV export and import. Columns can be added dynamically.
- Records — Paginated table showing the `dfv.{source}` view. Server-side sorting (column validated against `information_schema.columns`, interpolated with `quote_ident`). Dates are formatted `YYYY-MM-DD` for correct lexicographic sort.
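The sort-column validation on the Records page can be sketched like this. It is a hypothetical illustration: the function name and signature are assumptions, and in the real implementation `quote_ident()` runs inside the SQL function — the JS side only passes a column it has verified exists in `information_schema.columns`.

```javascript
// Allow-list validation before interpolating a sort column into SQL.
function buildOrderBy(requested, knownColumns, dir = 'asc') {
  if (!knownColumns.includes(requested)) {
    throw new Error(`unknown sort column: ${requested}`);
  }
  const direction = dir === 'desc' ? 'DESC' : 'ASC';
  // Simple double-quoting here; the real code defers to quote_ident().
  return `ORDER BY "${requested}" ${direction}`;
}

buildOrderBy('date', ['date', 'amount', 'vendor'], 'desc');
// → ORDER BY "date" DESC
```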
manage.py
Interactive CLI for setup and operations. Run with `python3 manage.py`; it takes no arguments.
Shows current status on every screen:
- Database connection (host, port, db, user) and whether it succeeds
- Whether the `dataflow` schema is deployed
- Whether SQL functions are deployed (detected by presence of `apply_transformations`)
- Login credentials (configured / not configured)
- UI build status and timestamp
- Systemd service status
- Nginx reverse proxy status
Menu options:
1. Database configuration and deployment dialog — Write or update `.env` with database connection settings. If connection fails, offers to create the database and user using admin credentials. Optionally deploys schema and SQL functions after `.env` is written.
2. Redeploy schema — Runs `database/schema.sql` against the configured database. Warns that this drops all data. Requires explicit confirmation.
3. Redeploy SQL functions — Runs all four files in `database/queries/` in order: `sources.sql`, `rules.sql`, `mappings.sql`, `records.sql`. Safe to run at any time without data loss.
4. Build UI — Runs `npm run build` in `ui/`, outputting to `public/`.
5. Set up nginx reverse proxy — Detects if a proxy already exists for the configured port. Writes an nginx site config to `/etc/nginx/sites-enabled/`. If an SSL certificate exists in `/etc/letsencrypt/live/`, configures HTTPS with HTTP redirect; otherwise configures HTTP only, with an offer to run certbot. Requires sudo.
6. Install systemd service — Copies `dataflow.service` to `/etc/systemd/system/`, runs `daemon-reload`, and enables the service on boot. Requires sudo.
7. Start / restart service — Runs `systemctl start` or `systemctl restart` depending on current state. Requires sudo.
8. Stop service — Runs `systemctl stop`. Requires sudo.
9. Set login credentials — Prompts for username and password, bcrypt-hashes the password via `node -e "require('bcrypt')..."`, and writes `LOGIN_USER` and `LOGIN_PASSWORD_HASH` to `.env`. Requires Node.js and the bcrypt npm package to be installed.
Key behaviors:
- All commands that will be run are printed before the user is asked to confirm.
- Actions that require sudo prompt transparently — `sudo` is not run with `-n`, so it uses cached credentials or prompts as normal.
- Nginx config files are written via `sudo cp` from a temp file, then `sudo chmod 644` to make them world-readable for status detection.
- Certificate existence is checked with `sudo test -f`, since `/etc/letsencrypt/live/` requires root.
- Option 1's result (updated config) is passed back to the menu loop so the status display reflects changes without requiring a restart.
Environment Variables (.env)
DB_HOST PostgreSQL host
DB_PORT PostgreSQL port (default 5432)
DB_NAME Database name
DB_USER Database user
DB_PASSWORD Database password
API_PORT Port the Express server listens on (default 3020)
NODE_ENV development | production
LOGIN_USER Username for Basic Auth
LOGIN_PASSWORD_HASH bcrypt hash of the password
Running the Application
# Install API dependencies
npm install
# Install UI dependencies and build
cd ui && npm install && npm run build && cd ..
# Start (development, auto-reload)
npm run dev
# Start (production)
npm start
The server binds to 0.0.0.0 on API_PORT and serves both the API and the compiled UI from public/.
Deploying SQL Changes
Any time SQL functions are modified:
PGPASSWORD=<pass> psql -h <host> -U <user> -d <db> -f database/queries/sources.sql
PGPASSWORD=<pass> psql -h <host> -U <user> -d <db> -f database/queries/rules.sql
PGPASSWORD=<pass> psql -h <host> -U <user> -d <db> -f database/queries/mappings.sql
PGPASSWORD=<pass> psql -h <host> -U <user> -d <db> -f database/queries/records.sql
Then restart the server. Function deployment is safe to repeat — all functions use CREATE OR REPLACE.
Schema changes (schema.sql) drop and recreate the schema, deleting all data. In production, write migration scripts instead.