commit 3e2d56991c33cfb560627399febf3cf268040340 Author: Paul Trowbridge Date: Sat Mar 28 00:44:13 2026 -0400 Initial commit: dataflow data transformation tool diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..6e0a044 --- /dev/null +++ b/.env.example @@ -0,0 +1,10 @@ +# Database Configuration +DB_HOST=localhost +DB_PORT=5432 +DB_NAME=dataflow +DB_USER=postgres +DB_PASSWORD=your_password_here + +# API Configuration +API_PORT=3000 +NODE_ENV=development diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1dcc9a9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,24 @@ +# Environment +.env + +# Dependencies +node_modules/ +package-lock.json + +# Logs +*.log +logs/ + +# OS files +.DS_Store +Thumbs.db + +# IDE +.vscode/ +.idea/ +*.swp +*.swo + +# Uploads +uploads/* +!uploads/.gitkeep diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..0bad192 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,206 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Overview + +Dataflow is a simple data transformation tool for importing, cleaning, and standardizing data from various sources. Built with PostgreSQL and Node.js/Express, it emphasizes clarity and simplicity over complexity. + +## Core Concepts + +1. **Sources** - Define data sources and deduplication rules (which fields make a record unique) +2. **Import** - Load CSV data, automatically deduplicating based on source rules +3. **Rules** - Extract information using regex patterns (e.g., extract merchant from transaction description) +4. **Mappings** - Map extracted values to standardized output (e.g., "WALMART" → {"vendor": "Walmart", "category": "Groceries"}) +5. 
**Transform** - Apply rules and mappings to create clean, enriched data + +## Architecture + +### Database Schema (`database/schema.sql`) + +**5 simple tables:** +- `sources` - Source definitions with `dedup_fields` array +- `records` - Imported data with `data` (raw) and `transformed` (enriched) JSONB columns +- `rules` - Regex extraction rules with `field`, `pattern`, `output_field` +- `mappings` - Input/output value mappings +- `import_log` - Audit trail + +**Key design:** +- JSONB for flexible data storage +- Deduplication via MD5 hash of specified fields +- Simple, flat structure (no complex relationships) + +### Database Functions (`database/functions.sql`) + +**4 focused functions:** +- `import_records(source_name, data)` - Import with deduplication +- `apply_transformations(source_name, record_ids)` - Apply rules and mappings +- `get_unmapped_values(source_name, rule_name)` - Find values needing mappings +- `reprocess_records(source_name)` - Re-transform all records + +**Design principle:** Each function does ONE thing. No nested CTEs, no duplication. + +### API Server (`api/server.js` + `api/routes/`) + +**RESTful endpoints:** +- `/api/sources` - CRUD sources, import CSV, trigger transformations +- `/api/rules` - CRUD transformation rules +- `/api/mappings` - CRUD value mappings, view unmapped values +- `/api/records` - Query and search transformed data + +**Route files:** +- `routes/sources.js` - Source management and CSV import +- `routes/rules.js` - Rule management +- `routes/mappings.js` - Mapping management + unmapped values +- `routes/records.js` - Record queries and search + +## Common Development Tasks + +### Running the Application + +```bash +# Setup (first time only) +./setup.sh + +# Start development server with auto-reload +npm run dev + +# Start production server +npm start + +# Test API +curl http://localhost:3000/health +``` + +### Database Changes + +When modifying schema: +1. Edit `database/schema.sql` +2. 
Drop and recreate schema: `psql -d dataflow -f database/schema.sql` +3. Redeploy functions: `psql -d dataflow -f database/functions.sql` + +For production, write migration scripts instead of dropping schema. + +### Adding a New API Endpoint + +1. Add route to appropriate file in `api/routes/` +2. Follow existing patterns (async/await, error handling via `next()`) +3. Use parameterized queries to prevent SQL injection +4. Return consistent JSON format + +### Testing + +Manual testing workflow: +1. Create a source: `POST /api/sources` +2. Create rules: `POST /api/rules` +3. Import data: `POST /api/sources/:name/import` +4. Apply transformations: `POST /api/sources/:name/transform` +5. View results: `GET /api/records/source/:name` + +See `examples/GETTING_STARTED.md` for complete curl examples. + +## Design Principles + +1. **Simple over clever** - Straightforward code beats optimization +2. **Explicit over implicit** - No magic, no hidden triggers +3. **Clear naming** - `data` not `rec`, `transformed` not `allj` +4. **One function, one job** - No 250-line functions +5. 
**JSONB for flexibility** - Handle varying schemas without migrations + +## Common Patterns + +### Import Flow +``` +CSV file → parse → import_records() → records table (data column) +``` + +### Transformation Flow +``` +records.data → apply_transformations() → + - Apply each rule (regex extraction) + - Look up mappings + - Merge into records.transformed +``` + +### Deduplication +- Hash is MD5 of concatenated values from `dedup_fields` +- Unique constraint on `(source_name, dedup_key)` prevents duplicates +- Import function catches unique violations and counts them + +### Error Handling +- API routes use `try/catch` and pass errors to `next(err)` +- Server.js has global error handler +- Database functions return JSON with `success` boolean + +## File Structure + +``` +dataflow/ +├── database/ +│ ├── schema.sql # Table definitions +│ └── functions.sql # Import/transform functions +├── api/ +│ ├── server.js # Express server +│ └── routes/ # API endpoints +│ ├── sources.js +│ ├── rules.js +│ ├── mappings.js +│ └── records.js +├── examples/ +│ ├── GETTING_STARTED.md # Tutorial +│ └── bank_transactions.csv +├── .env.example # Config template +├── package.json +└── README.md +``` + +## Comparison to Legacy TPS System + +This project replaces an older system (in `/opt/tps`) that had: +- 2,150 lines of complex SQL with heavy duplication +- 5 nearly-identical 200+ line functions +- Confusing names and deep nested CTEs +- Complex trigger-based processing + +Dataflow achieves the same functionality with: +- ~400 lines of simple SQL +- 4 focused functions +- Clear names and linear logic +- Explicit API-triggered processing + +The simplification makes it easy to understand, modify, and maintain. 
+ +## Troubleshooting + +**Database connection fails:** +- Check `.env` file exists and has correct credentials +- Verify PostgreSQL is running: `psql -U postgres -l` +- Check search path is set: Should default to `dataflow` schema + +**Import succeeds but transformation fails:** +- Check rules exist: `SELECT * FROM dataflow.rules WHERE source_name = 'xxx'` +- Verify field names match CSV columns +- Test regex pattern manually +- Check for SQL errors in logs + +**All records marked as duplicates:** +- Verify `dedup_fields` match actual field names in data +- Check if data was already imported +- Use different source name for testing + +## Adding New Features + +When adding features, follow these principles: +- Add ONE function that does ONE thing +- Keep functions under 100 lines if possible +- Write clear SQL, not clever SQL +- Add API endpoint that calls the function +- Document in README.md and update examples + +## Notes for Claude + +- This is a **simple** system by design - don't over-engineer it +- Keep functions focused and linear +- Use JSONB for flexibility, not as a crutch for bad design +- When confused, read the examples/GETTING_STARTED.md walkthrough +- The old TPS system is in `/opt/tps` - this is a clean rewrite, not a refactor diff --git a/README.md b/README.md new file mode 100644 index 0000000..70b27c7 --- /dev/null +++ b/README.md @@ -0,0 +1,120 @@ +# Dataflow + +A simple, understandable data transformation tool for ingesting, mapping, and transforming data from various sources. + +## What It Does + +Dataflow helps you: +1. **Import** data from CSV files (or other formats) +2. **Transform** data using regex rules to extract meaningful information +3. **Map** extracted values to standardized output +4. **Query** the transformed data + +Perfect for cleaning up messy data like bank transactions, product lists, or any repetitive data that needs normalization. + +## Core Concepts + +### 1. 
// 2. Create a transformation rule
POST /api/rules
{
  "source_name": "bank_transactions",
  "name": "extract_merchant",
  "field": "description",
  "pattern": "^([A-Z][A-Z ]+)",
  "output_field": "merchant"
}
Query transformed data
GET /api/records/source/bank_transactions
found' }); + } + + res.json(result.rows[0]); + } catch (err) { + next(err); + } + }); + + // Create mapping + router.post('/', async (req, res, next) => { + try { + const { source_name, rule_name, input_value, output } = req.body; + + if (!source_name || !rule_name || !input_value || !output) { + return res.status(400).json({ + error: 'Missing required fields: source_name, rule_name, input_value, output' + }); + } + + const result = await pool.query( + `INSERT INTO mappings (source_name, rule_name, input_value, output) + VALUES ($1, $2, $3, $4) + RETURNING *`, + [source_name, rule_name, input_value, JSON.stringify(output)] + ); + + res.status(201).json(result.rows[0]); + } catch (err) { + if (err.code === '23505') { // Unique violation + return res.status(409).json({ error: 'Mapping already exists' }); + } + if (err.code === '23503') { // Foreign key violation + return res.status(404).json({ error: 'Source or rule not found' }); + } + next(err); + } + }); + + // Bulk create mappings + router.post('/bulk', async (req, res, next) => { + const client = await pool.connect(); + try { + const { mappings } = req.body; + + if (!Array.isArray(mappings)) { + return res.status(400).json({ error: 'Expected array of mappings' }); + } + + await client.query('BEGIN'); + + const results = []; + for (const mapping of mappings) { + const { source_name, rule_name, input_value, output } = mapping; + + const result = await client.query( + `INSERT INTO mappings (source_name, rule_name, input_value, output) + VALUES ($1, $2, $3, $4) + ON CONFLICT (source_name, rule_name, input_value) + DO UPDATE SET output = EXCLUDED.output + RETURNING *`, + [source_name, rule_name, input_value, JSON.stringify(output)] + ); + + results.push(result.rows[0]); + } + + await client.query('COMMIT'); + res.status(201).json({ count: results.length, mappings: results }); + } catch (err) { + await client.query('ROLLBACK'); + next(err); + } finally { + client.release(); + } + }); + + // Update mapping + 
router.put('/:id', async (req, res, next) => { + try { + const { input_value, output } = req.body; + + const result = await pool.query( + `UPDATE mappings + SET input_value = COALESCE($2, input_value), + output = COALESCE($3, output) + WHERE id = $1 + RETURNING *`, + [req.params.id, input_value, output ? JSON.stringify(output) : null] + ); + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Mapping not found' }); + } + + res.json(result.rows[0]); + } catch (err) { + next(err); + } + }); + + // Delete mapping + router.delete('/:id', async (req, res, next) => { + try { + const result = await pool.query( + 'DELETE FROM mappings WHERE id = $1 RETURNING id', + [req.params.id] + ); + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Mapping not found' }); + } + + res.json({ success: true, deleted: result.rows[0].id }); + } catch (err) { + next(err); + } + }); + + return router; +}; diff --git a/api/routes/records.js b/api/routes/records.js new file mode 100644 index 0000000..be3ebdb --- /dev/null +++ b/api/routes/records.js @@ -0,0 +1,111 @@ +/** + * Records Routes + * Query and manage imported records + */ + +const express = require('express'); + +module.exports = (pool) => { + const router = express.Router(); + + // List records for a source + router.get('/source/:source_name', async (req, res, next) => { + try { + const { limit = 100, offset = 0, transformed_only } = req.query; + + let query = 'SELECT * FROM records WHERE source_name = $1'; + const params = [req.params.source_name]; + + if (transformed_only === 'true') { + query += ' AND transformed IS NOT NULL'; + } + + query += ' ORDER BY id DESC LIMIT $2 OFFSET $3'; + params.push(parseInt(limit), parseInt(offset)); + + const result = await pool.query(query, params); + res.json(result.rows); + } catch (err) { + next(err); + } + }); + + // Get single record + router.get('/:id', async (req, res, next) => { + try { + const result = await pool.query( + 'SELECT * FROM 
records WHERE id = $1', + [req.params.id] + ); + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Record not found' }); + } + + res.json(result.rows[0]); + } catch (err) { + next(err); + } + }); + + // Search records + router.post('/search', async (req, res, next) => { + try { + const { source_name, query, limit = 100 } = req.body; + + if (!source_name || !query) { + return res.status(400).json({ + error: 'Missing required fields: source_name, query' + }); + } + + // Search in both data and transformed fields + const result = await pool.query( + `SELECT * FROM records + WHERE source_name = $1 + AND (data @> $2 OR transformed @> $2) + ORDER BY id DESC + LIMIT $3`, + [source_name, JSON.stringify(query), parseInt(limit)] + ); + + res.json(result.rows); + } catch (err) { + next(err); + } + }); + + // Delete record + router.delete('/:id', async (req, res, next) => { + try { + const result = await pool.query( + 'DELETE FROM records WHERE id = $1 RETURNING id', + [req.params.id] + ); + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Record not found' }); + } + + res.json({ success: true, deleted: result.rows[0].id }); + } catch (err) { + next(err); + } + }); + + // Delete all records for a source + router.delete('/source/:source_name/all', async (req, res, next) => { + try { + const result = await pool.query( + 'DELETE FROM records WHERE source_name = $1', + [req.params.source_name] + ); + + res.json({ success: true, deleted_count: result.rowCount }); + } catch (err) { + next(err); + } + }); + + return router; +}; diff --git a/api/routes/rules.js b/api/routes/rules.js new file mode 100644 index 0000000..0812684 --- /dev/null +++ b/api/routes/rules.js @@ -0,0 +1,119 @@ +/** + * Rules Routes + * Manage transformation rules + */ + +const express = require('express'); + +module.exports = (pool) => { + const router = express.Router(); + + // List all rules for a source + router.get('/source/:source_name', async (req, res, 
next) => { + try { + const result = await pool.query( + 'SELECT * FROM rules WHERE source_name = $1 ORDER BY sequence, name', + [req.params.source_name] + ); + res.json(result.rows); + } catch (err) { + next(err); + } + }); + + // Get single rule + router.get('/:id', async (req, res, next) => { + try { + const result = await pool.query( + 'SELECT * FROM rules WHERE id = $1', + [req.params.id] + ); + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Rule not found' }); + } + + res.json(result.rows[0]); + } catch (err) { + next(err); + } + }); + + // Create rule + router.post('/', async (req, res, next) => { + try { + const { source_name, name, field, pattern, output_field, enabled, sequence } = req.body; + + if (!source_name || !name || !field || !pattern || !output_field) { + return res.status(400).json({ + error: 'Missing required fields: source_name, name, field, pattern, output_field' + }); + } + + const result = await pool.query( + `INSERT INTO rules (source_name, name, field, pattern, output_field, enabled, sequence) + VALUES ($1, $2, $3, $4, $5, $6, $7) + RETURNING *`, + [source_name, name, field, pattern, output_field, enabled !== false, sequence || 0] + ); + + res.status(201).json(result.rows[0]); + } catch (err) { + if (err.code === '23505') { // Unique violation + return res.status(409).json({ error: 'Rule already exists for this source' }); + } + if (err.code === '23503') { // Foreign key violation + return res.status(404).json({ error: 'Source not found' }); + } + next(err); + } + }); + + // Update rule + router.put('/:id', async (req, res, next) => { + try { + const { name, field, pattern, output_field, enabled, sequence } = req.body; + + const result = await pool.query( + `UPDATE rules + SET name = COALESCE($2, name), + field = COALESCE($3, field), + pattern = COALESCE($4, pattern), + output_field = COALESCE($5, output_field), + enabled = COALESCE($6, enabled), + sequence = COALESCE($7, sequence) + WHERE id = $1 + RETURNING *`, 
+ [req.params.id, name, field, pattern, output_field, enabled, sequence] + ); + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Rule not found' }); + } + + res.json(result.rows[0]); + } catch (err) { + next(err); + } + }); + + // Delete rule + router.delete('/:id', async (req, res, next) => { + try { + const result = await pool.query( + 'DELETE FROM rules WHERE id = $1 RETURNING id, name', + [req.params.id] + ); + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Rule not found' }); + } + + res.json({ success: true, deleted: result.rows[0] }); + } catch (err) { + next(err); + } + }); + + return router; +}; diff --git a/api/routes/sources.js b/api/routes/sources.js new file mode 100644 index 0000000..b4a1333 --- /dev/null +++ b/api/routes/sources.js @@ -0,0 +1,189 @@ +/** + * Sources Routes + * Manage data sources + */ + +const express = require('express'); +const multer = require('multer'); +const { parse } = require('csv-parse/sync'); + +const upload = multer({ storage: multer.memoryStorage() }); + +module.exports = (pool) => { + const router = express.Router(); + + // List all sources + router.get('/', async (req, res, next) => { + try { + const result = await pool.query( + 'SELECT * FROM sources ORDER BY name' + ); + res.json(result.rows); + } catch (err) { + next(err); + } + }); + + // Get single source + router.get('/:name', async (req, res, next) => { + try { + const result = await pool.query( + 'SELECT * FROM sources WHERE name = $1', + [req.params.name] + ); + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Source not found' }); + } + + res.json(result.rows[0]); + } catch (err) { + next(err); + } + }); + + // Create source + router.post('/', async (req, res, next) => { + try { + const { name, dedup_fields, config } = req.body; + + if (!name || !dedup_fields || !Array.isArray(dedup_fields)) { + return res.status(400).json({ + error: 'Missing required fields: name, dedup_fields 
(array)' + }); + } + + const result = await pool.query( + `INSERT INTO sources (name, dedup_fields, config) + VALUES ($1, $2, $3) + RETURNING *`, + [name, dedup_fields, config || {}] + ); + + res.status(201).json(result.rows[0]); + } catch (err) { + if (err.code === '23505') { // Unique violation + return res.status(409).json({ error: 'Source already exists' }); + } + next(err); + } + }); + + // Update source + router.put('/:name', async (req, res, next) => { + try { + const { dedup_fields, config } = req.body; + + const result = await pool.query( + `UPDATE sources + SET dedup_fields = COALESCE($2, dedup_fields), + config = COALESCE($3, config), + updated_at = CURRENT_TIMESTAMP + WHERE name = $1 + RETURNING *`, + [req.params.name, dedup_fields, config] + ); + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Source not found' }); + } + + res.json(result.rows[0]); + } catch (err) { + next(err); + } + }); + + // Delete source + router.delete('/:name', async (req, res, next) => { + try { + const result = await pool.query( + 'DELETE FROM sources WHERE name = $1 RETURNING name', + [req.params.name] + ); + + if (result.rows.length === 0) { + return res.status(404).json({ error: 'Source not found' }); + } + + res.json({ success: true, deleted: result.rows[0].name }); + } catch (err) { + next(err); + } + }); + + // Import CSV data + router.post('/:name/import', upload.single('file'), async (req, res, next) => { + try { + if (!req.file) { + return res.status(400).json({ error: 'No file uploaded' }); + } + + // Parse CSV + const records = parse(req.file.buffer, { + columns: true, + skip_empty_lines: true, + trim: true + }); + + // Import records + const result = await pool.query( + 'SELECT import_records($1, $2) as result', + [req.params.name, JSON.stringify(records)] + ); + + res.json(result.rows[0].result); + } catch (err) { + next(err); + } + }); + + // Apply transformations + router.post('/:name/transform', async (req, res, next) => { + try { + 
const result = await pool.query( + 'SELECT apply_transformations($1) as result', + [req.params.name] + ); + + res.json(result.rows[0].result); + } catch (err) { + next(err); + } + }); + + // Reprocess all records + router.post('/:name/reprocess', async (req, res, next) => { + try { + const result = await pool.query( + 'SELECT reprocess_records($1) as result', + [req.params.name] + ); + + res.json(result.rows[0].result); + } catch (err) { + next(err); + } + }); + + // Get statistics + router.get('/:name/stats', async (req, res, next) => { + try { + const result = await pool.query( + `SELECT + COUNT(*) as total_records, + COUNT(*) FILTER (WHERE transformed IS NOT NULL) as transformed_records, + COUNT(*) FILTER (WHERE transformed IS NULL) as pending_records + FROM records + WHERE source_name = $1`, + [req.params.name] + ); + + res.json(result.rows[0]); + } catch (err) { + next(err); + } + }); + + return router; +}; diff --git a/api/server.js b/api/server.js new file mode 100644 index 0000000..7fd1783 --- /dev/null +++ b/api/server.js @@ -0,0 +1,97 @@ +/** + * Dataflow API Server + * Simple REST API for data transformation + */ + +require('dotenv').config(); +const express = require('express'); +const { Pool } = require('pg'); + +const app = express(); +const PORT = process.env.API_PORT || 3000; + +// Database connection +const pool = new Pool({ + host: process.env.DB_HOST, + port: process.env.DB_PORT, + database: process.env.DB_NAME, + user: process.env.DB_USER, + password: process.env.DB_PASSWORD +}); + +// Middleware +app.use(express.json()); +app.use(express.urlencoded({ extended: true })); + +// Set search path for all queries +pool.on('connect', (client) => { + client.query('SET search_path TO dataflow, public'); +}); + +// Test database connection +pool.query('SELECT NOW()', (err, res) => { + if (err) { + console.error('Database connection error:', err); + process.exit(1); + } + console.log('✓ Database connected'); +}); + 
+//------------------------------------------------------ +// Routes +//------------------------------------------------------ + +// Import route modules +const sourcesRoutes = require('./routes/sources'); +const rulesRoutes = require('./routes/rules'); +const mappingsRoutes = require('./routes/mappings'); +const recordsRoutes = require('./routes/records'); + +// Mount routes +app.use('/api/sources', sourcesRoutes(pool)); +app.use('/api/rules', rulesRoutes(pool)); +app.use('/api/mappings', mappingsRoutes(pool)); +app.use('/api/records', recordsRoutes(pool)); + +// Health check +app.get('/health', (req, res) => { + res.json({ status: 'ok', timestamp: new Date().toISOString() }); +}); + +// Root endpoint +app.get('/', (req, res) => { + res.json({ + name: 'Dataflow API', + version: '0.1.0', + endpoints: { + sources: '/api/sources', + rules: '/api/rules', + mappings: '/api/mappings', + records: '/api/records', + health: '/health' + } + }); +}); + +// Error handler +app.use((err, req, res, next) => { + console.error('Error:', err); + res.status(err.status || 500).json({ + error: err.message || 'Internal server error', + ...(process.env.NODE_ENV === 'development' && { stack: err.stack }) + }); +}); + +// 404 handler +app.use((req, res) => { + res.status(404).json({ error: 'Endpoint not found' }); +}); + +// Start server +app.listen(PORT, () => { + console.log(`✓ Dataflow API listening on port ${PORT}`); + console.log(` Health: http://localhost:${PORT}/health`); + console.log(` API: http://localhost:${PORT}/api/sources`); +}); + +module.exports = app; diff --git a/database/functions.sql b/database/functions.sql new file mode 100644 index 0000000..3e65a98 --- /dev/null +++ b/database/functions.sql @@ -0,0 +1,231 @@ +-- +-- Dataflow Functions +-- Simple, clear functions for import and transformation +-- + +SET search_path TO dataflow, public; + +------------------------------------------------------ +-- Function: import_records +-- Import data with automatic deduplication 
+------------------------------------------------------ +CREATE OR REPLACE FUNCTION import_records( + p_source_name TEXT, + p_data JSONB -- Array of records +) RETURNS JSON AS $$ +DECLARE + v_dedup_fields TEXT[]; + v_record JSONB; + v_dedup_key TEXT; + v_inserted INTEGER := 0; + v_duplicates INTEGER := 0; + v_log_id INTEGER; +BEGIN + -- Get dedup fields for this source + SELECT dedup_fields INTO v_dedup_fields + FROM dataflow.sources + WHERE name = p_source_name; + + IF v_dedup_fields IS NULL THEN + RETURN json_build_object( + 'success', false, + 'error', 'Source not found: ' || p_source_name + ); + END IF; + + -- Process each record + FOR v_record IN SELECT * FROM jsonb_array_elements(p_data) + LOOP + -- Generate dedup key + v_dedup_key := dataflow.generate_dedup_key(v_record, v_dedup_fields); + + -- Try to insert (will fail silently if duplicate) + BEGIN + INSERT INTO dataflow.records (source_name, data, dedup_key) + VALUES (p_source_name, v_record, v_dedup_key); + + v_inserted := v_inserted + 1; + EXCEPTION WHEN unique_violation THEN + v_duplicates := v_duplicates + 1; + END; + END LOOP; + + -- Log the import + INSERT INTO dataflow.import_log (source_name, records_imported, records_duplicate) + VALUES (p_source_name, v_inserted, v_duplicates) + RETURNING id INTO v_log_id; + + RETURN json_build_object( + 'success', true, + 'imported', v_inserted, + 'duplicates', v_duplicates, + 'log_id', v_log_id + ); +END; +$$ LANGUAGE plpgsql; + +COMMENT ON FUNCTION import_records IS 'Import records with automatic deduplication'; + +------------------------------------------------------ +-- Function: apply_transformations +-- Apply all transformation rules to records +------------------------------------------------------ +CREATE OR REPLACE FUNCTION apply_transformations( + p_source_name TEXT, + p_record_ids INTEGER[] DEFAULT NULL -- NULL = all untransformed +) RETURNS JSON AS $$ +DECLARE + v_record RECORD; + v_rule RECORD; + v_transformed JSONB; + v_extracted TEXT; + v_mapping 
JSONB;
+    v_count INTEGER := 0;
+BEGIN
+    -- Loop through records to transform
+    FOR v_record IN
+        SELECT id, data
+        FROM dataflow.records
+        WHERE source_name = p_source_name
+        AND (p_record_ids IS NULL OR id = ANY(p_record_ids))
+        AND transformed IS NULL
+    LOOP
+        -- Start with original data
+        v_transformed := v_record.data;
+
+        -- Apply each rule in sequence
+        FOR v_rule IN
+            SELECT * FROM dataflow.rules
+            WHERE source_name = p_source_name
+            AND enabled = true
+            ORDER BY sequence
+        LOOP
+            -- Extract value using regex
+            v_extracted := (
+                SELECT substring(v_record.data->>v_rule.field FROM v_rule.pattern)
+            );
+
+            IF v_extracted IS NOT NULL THEN
+                -- Check if there's a mapping for this value
+                SELECT output INTO v_mapping
+                FROM dataflow.mappings
+                WHERE source_name = p_source_name
+                AND rule_name = v_rule.name
+                AND input_value = v_extracted;
+
+                IF v_mapping IS NOT NULL THEN
+                    -- Apply mapping (merge mapped fields into result)
+                    v_transformed := v_transformed || v_mapping;
+                ELSE
+                    -- No mapping, just add extracted value
+                    v_transformed := jsonb_set(
+                        v_transformed,
+                        ARRAY[v_rule.output_field],
+                        to_jsonb(v_extracted)
+                    );
+                END IF;
+            END IF;
+        END LOOP;
+
+        -- Update record with transformed data
+        UPDATE dataflow.records
+        SET transformed = v_transformed,
+            transformed_at = CURRENT_TIMESTAMP
+        WHERE id = v_record.id;
+
+        v_count := v_count + 1;
+    END LOOP;
+
+    RETURN json_build_object(
+        'success', true,
+        'transformed', v_count
+    );
+END;
+$$ LANGUAGE plpgsql;
+
+COMMENT ON FUNCTION apply_transformations IS 'Apply transformation rules and mappings to records';
+
+------------------------------------------------------
+-- Function: get_unmapped_values
+-- Find extracted values that need mappings
+------------------------------------------------------
+CREATE OR REPLACE FUNCTION get_unmapped_values(
+    p_source_name TEXT,
+    p_rule_name TEXT DEFAULT NULL
+) RETURNS TABLE (
+    rule_name TEXT,
+    output_field TEXT,
+    extracted_value TEXT,
+    record_count BIGINT
+) AS $$
+BEGIN
+    RETURN QUERY
+    WITH extracted AS (
+        -- Get all transformed records and extract rule output fields
+        SELECT
+            r.name AS rule_name,
+            r.output_field,
+            rec.transformed->>r.output_field AS extracted_value
+        FROM
+            dataflow.records rec
+            CROSS JOIN dataflow.rules r
+        WHERE
+            rec.source_name = p_source_name
+            AND r.source_name = p_source_name
+            AND rec.transformed IS NOT NULL
+            AND rec.transformed ? r.output_field
+            AND (p_rule_name IS NULL OR r.name = p_rule_name)
+    )
+    SELECT
+        e.rule_name,
+        e.output_field,
+        e.extracted_value,
+        count(*) AS record_count
+    FROM extracted e
+    WHERE NOT EXISTS (
+        -- Exclude values that already have mappings
+        SELECT 1 FROM dataflow.mappings m
+        WHERE m.source_name = p_source_name
+        AND m.rule_name = e.rule_name
+        AND m.input_value = e.extracted_value
+    )
+    GROUP BY e.rule_name, e.output_field, e.extracted_value
+    ORDER BY count(*) DESC;  -- fix: a bare "record_count" here is ambiguous in PL/pgSQL (OUT column vs. SELECT alias) and raises error 42702 at runtime
+END;
+$$ LANGUAGE plpgsql;
+
+COMMENT ON FUNCTION get_unmapped_values IS 'Find extracted values that need mappings defined';
+
+------------------------------------------------------
+-- Function: reprocess_records
+-- Clear and reapply transformations
+------------------------------------------------------
+CREATE OR REPLACE FUNCTION reprocess_records(p_source_name TEXT)
+RETURNS JSON AS $$
+BEGIN
+    -- Clear existing transformations
+    UPDATE dataflow.records
+    SET transformed = NULL,
+        transformed_at = NULL
+    WHERE source_name = p_source_name;
+
+    -- Reapply transformations
+    RETURN dataflow.apply_transformations(p_source_name);  -- NOTE(review): schema-qualified call, but this file creates its functions unqualified; confirm functions.sql is deployed with "dataflow" on the search_path
+END;
+$$ LANGUAGE plpgsql;
+
+COMMENT ON FUNCTION reprocess_records IS 'Clear and reapply all transformations for a source';
+
+------------------------------------------------------
+-- Summary
+------------------------------------------------------
+-- Functions: 4 simple, focused functions
+-- 1. import_records - Import with deduplication
+-- 2. apply_transformations - Apply rules and mappings
+-- 3. get_unmapped_values - Find values needing mappings
+-- 4.
reprocess_records - Re-transform all records +-- +-- Each function does ONE thing clearly +-- No complex nested CTEs +-- Easy to understand and debug +------------------------------------------------------ diff --git a/database/schema.sql b/database/schema.sql new file mode 100644 index 0000000..b93f8dc --- /dev/null +++ b/database/schema.sql @@ -0,0 +1,166 @@ +-- +-- Dataflow Database Schema +-- Simple, clear structure for data transformation +-- + +-- Create schema +CREATE SCHEMA IF NOT EXISTS dataflow; + +-- Set search path +SET search_path TO dataflow, public; + +------------------------------------------------------ +-- Table: sources +-- Defines data sources and how to deduplicate them +------------------------------------------------------ +CREATE TABLE sources ( + name TEXT PRIMARY KEY, + dedup_fields TEXT[] NOT NULL, -- Fields used for deduplication (e.g., ['date', 'amount', 'description']) + config JSONB DEFAULT '{}'::jsonb, + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP +); + +COMMENT ON TABLE sources IS 'Data source definitions'; +COMMENT ON COLUMN sources.dedup_fields IS 'Array of field names used to identify duplicate records'; +COMMENT ON COLUMN sources.config IS 'Additional source configuration (optional)'; + +------------------------------------------------------ +-- Table: records +-- Stores imported data (raw and transformed) +------------------------------------------------------ +CREATE TABLE records ( + id SERIAL PRIMARY KEY, + source_name TEXT NOT NULL REFERENCES sources(name) ON DELETE CASCADE, + + -- Data + data JSONB NOT NULL, -- Original imported data + dedup_key TEXT NOT NULL, -- Hash of dedup fields for fast lookup + transformed JSONB, -- Data after transformations applied + + -- Metadata + imported_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + transformed_at TIMESTAMPTZ, + + -- Constraints + UNIQUE(source_name, dedup_key) -- Prevent duplicates +); + +COMMENT ON TABLE records IS 
'Imported records with raw and transformed data'; +COMMENT ON COLUMN records.data IS 'Original data as imported'; +COMMENT ON COLUMN records.dedup_key IS 'Hash of deduplication fields for fast duplicate detection'; +COMMENT ON COLUMN records.transformed IS 'Data after applying transformation rules'; + +-- Indexes +CREATE INDEX idx_records_source ON records(source_name); +CREATE INDEX idx_records_dedup ON records(source_name, dedup_key); +CREATE INDEX idx_records_data ON records USING gin(data); +CREATE INDEX idx_records_transformed ON records USING gin(transformed); + +------------------------------------------------------ +-- Table: rules +-- Transformation rules (regex extraction) +------------------------------------------------------ +CREATE TABLE rules ( + id SERIAL PRIMARY KEY, + source_name TEXT NOT NULL REFERENCES sources(name) ON DELETE CASCADE, + name TEXT NOT NULL, + + -- Rule definition + field TEXT NOT NULL, -- Field to extract from (e.g., 'description') + pattern TEXT NOT NULL, -- Regex pattern + output_field TEXT NOT NULL, -- Name of extracted field (e.g., 'merchant') + + -- Options + enabled BOOLEAN DEFAULT true, + sequence INTEGER DEFAULT 0, -- Execution order + + -- Metadata + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + + UNIQUE(source_name, name) +); + +COMMENT ON TABLE rules IS 'Transformation rules for extracting data'; +COMMENT ON COLUMN rules.field IS 'Source field to apply regex to'; +COMMENT ON COLUMN rules.pattern IS 'Regular expression pattern'; +COMMENT ON COLUMN rules.output_field IS 'Name of field to store extracted value'; + +CREATE INDEX idx_rules_source ON rules(source_name); + +------------------------------------------------------ +-- Table: mappings +-- Value mappings (extracted value → standardized output) +------------------------------------------------------ +CREATE TABLE mappings ( + id SERIAL PRIMARY KEY, + source_name TEXT NOT NULL REFERENCES sources(name) ON DELETE CASCADE, + rule_name TEXT NOT NULL, + + -- 
Mapping
+    input_value TEXT NOT NULL,        -- Extracted value to match
+    output JSONB NOT NULL,            -- Standardized output
+
+    -- Metadata
+    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
+
+    UNIQUE(source_name, rule_name, input_value),
+    FOREIGN KEY (source_name, rule_name) REFERENCES rules(source_name, name) ON DELETE CASCADE
+);
+
+COMMENT ON TABLE mappings IS 'Maps extracted values to standardized output';
+COMMENT ON COLUMN mappings.input_value IS 'Value extracted by rule';
+COMMENT ON COLUMN mappings.output IS 'Standardized output (can contain multiple fields)';
+
+CREATE INDEX idx_mappings_source_rule ON mappings(source_name, rule_name);
+CREATE INDEX idx_mappings_input ON mappings(source_name, rule_name, input_value);
+
+------------------------------------------------------
+-- Table: import_log
+-- Audit trail of imports
+------------------------------------------------------
+CREATE TABLE import_log (
+    id SERIAL PRIMARY KEY,
+    source_name TEXT NOT NULL REFERENCES sources(name) ON DELETE CASCADE,
+    records_imported INTEGER DEFAULT 0,
+    records_duplicate INTEGER DEFAULT 0,
+    imported_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
+);
+
+COMMENT ON TABLE import_log IS 'Audit log of data imports';
+
+CREATE INDEX idx_import_log_source ON import_log(source_name);
+CREATE INDEX idx_import_log_timestamp ON import_log(imported_at);
+
+------------------------------------------------------
+-- Helper function: Generate dedup key
+------------------------------------------------------
+CREATE OR REPLACE FUNCTION generate_dedup_key(
+    data JSONB,
+    dedup_fields TEXT[]
+) RETURNS TEXT AS $$
+DECLARE
+    field TEXT;
+    values TEXT := '';  -- accumulator ("values" is an SQL keyword but a legal PL/pgSQL identifier)
+BEGIN
+    -- Concatenate values from dedup fields
+    FOREACH field IN ARRAY dedup_fields LOOP
+        values := values || COALESCE(data->>field, '') || '|';  -- NOTE(review): a bare '|' join is ambiguous when a field value itself contains '|', so distinct records can collide on dedup_key; changing the encoding would invalidate stored keys -- confirm this is acceptable
+    END LOOP;
+
+    -- Return MD5 hash of concatenated values
+    RETURN md5(values);
+END;
+$$ LANGUAGE plpgsql IMMUTABLE;
+
+COMMENT ON FUNCTION generate_dedup_key IS 'Generate hash key from specified
fields for deduplication'; + +------------------------------------------------------ +-- Summary +------------------------------------------------------ +-- Tables: 5 (sources, records, rules, mappings, import_log) +-- Simple, clear structure +-- JSONB for flexibility +-- Deduplication via hash key +-- All transformations traceable +------------------------------------------------------ diff --git a/examples/GETTING_STARTED.md b/examples/GETTING_STARTED.md new file mode 100644 index 0000000..66ba022 --- /dev/null +++ b/examples/GETTING_STARTED.md @@ -0,0 +1,311 @@ +# Getting Started with Dataflow + +This guide walks through a complete example using bank transaction data. + +## Prerequisites + +1. PostgreSQL database running +2. Database created: `CREATE DATABASE dataflow;` +3. `.env` file configured (copy from `.env.example`) + +## Step 1: Deploy Database Schema + +```bash +cd /opt/dataflow +psql -U postgres -d dataflow -f database/schema.sql +psql -U postgres -d dataflow -f database/functions.sql +``` + +You should see tables created without errors. + +## Step 2: Start the API Server + +```bash +npm install +npm start +``` + +The server should start on port 3000 (or your configured port). + +Test it: +```bash +curl http://localhost:3000/health +# Should return: {"status":"ok","timestamp":"..."} +``` + +## Step 3: Create a Data Source + +A source defines where data comes from and how to deduplicate it. + +```bash +curl -X POST http://localhost:3000/api/sources \ + -H "Content-Type: application/json" \ + -d '{ + "name": "bank_transactions", + "dedup_fields": ["date", "description", "amount"] + }' +``` + +**What this does:** Records with the same date + description + amount will be considered duplicates. + +## Step 4: Create Transformation Rules + +Rules extract meaningful data using regex patterns. 
+ +### Rule 1: Extract merchant name (first part of description) + +```bash +curl -X POST http://localhost:3000/api/rules \ + -H "Content-Type: application/json" \ + -d '{ + "source_name": "bank_transactions", + "name": "extract_merchant", + "field": "description", + "pattern": "^([A-Z][A-Z ]+)", + "output_field": "merchant", + "sequence": 1 + }' +``` + +### Rule 2: Extract location (city + state pattern) + +```bash +curl -X POST http://localhost:3000/api/rules \ + -H "Content-Type: application/json" \ + -d '{ + "source_name": "bank_transactions", + "name": "extract_location", + "field": "description", + "pattern": "([A-Z]+) OH", + "output_field": "location", + "sequence": 2 + }' +``` + +## Step 5: Import Data + +Import the example CSV file: + +```bash +curl -X POST http://localhost:3000/api/sources/bank_transactions/import \ + -F "file=@examples/bank_transactions.csv" +``` + +Response: +```json +{ + "success": true, + "imported": 14, + "duplicates": 0, + "log_id": 1 +} +``` + +## Step 6: View Imported Records + +```bash +curl http://localhost:3000/api/records/source/bank_transactions?limit=5 +``` + +You'll see the raw imported data. Note that `transformed` is `null` - we haven't applied transformations yet! + +## Step 7: Apply Transformations + +```bash +curl -X POST http://localhost:3000/api/sources/bank_transactions/transform +``` + +Response: +```json +{ + "success": true, + "transformed": 14 +} +``` + +Now check the records again: +```bash +curl http://localhost:3000/api/records/source/bank_transactions?limit=2 +``` + +You'll see the `transformed` field now contains the original data plus extracted fields like `merchant` and `location`. 
+ +## Step 8: View Extracted Values That Need Mapping + +```bash +curl http://localhost:3000/api/mappings/source/bank_transactions/unmapped +``` + +Response shows extracted merchant names that aren't mapped yet: +```json +[ + {"rule_name": "extract_merchant", "extracted_value": "GOOGLE", "record_count": 2}, + {"rule_name": "extract_merchant", "extracted_value": "TARGET", "record_count": 2}, + {"rule_name": "extract_merchant", "extracted_value": "WALMART", "record_count": 1}, + ... +] +``` + +## Step 9: Create Value Mappings + +Map extracted values to clean, standardized output: + +```bash +curl -X POST http://localhost:3000/api/mappings \ + -H "Content-Type: application/json" \ + -d '{ + "source_name": "bank_transactions", + "rule_name": "extract_merchant", + "input_value": "GOOGLE", + "output": { + "vendor": "Google", + "category": "Technology" + } + }' + +curl -X POST http://localhost:3000/api/mappings \ + -H "Content-Type: application/json" \ + -d '{ + "source_name": "bank_transactions", + "rule_name": "extract_merchant", + "input_value": "TARGET", + "output": { + "vendor": "Target", + "category": "Retail" + } + }' + +curl -X POST http://localhost:3000/api/mappings \ + -H "Content-Type: application/json" \ + -d '{ + "source_name": "bank_transactions", + "rule_name": "extract_merchant", + "input_value": "WALMART", + "output": { + "vendor": "Walmart", + "category": "Groceries" + } + }' +``` + +## Step 10: Reprocess With Mappings + +Clear and reapply transformations to pick up the new mappings: + +```bash +curl -X POST http://localhost:3000/api/sources/bank_transactions/reprocess +``` + +## Step 11: View Final Results + +```bash +curl http://localhost:3000/api/records/source/bank_transactions?limit=5 +``` + +Now the `transformed` field contains: +- Original fields (date, description, amount, category) +- Extracted fields (merchant, location) +- Mapped fields (vendor, category from mappings) + +Example result: +```json +{ + "id": 1, + "data": { + "date": 
"2024-01-02", + "description": "GOOGLE *YOUTUBE VIDEOS", + "amount": "4.26", + "category": "Services" + }, + "transformed": { + "date": "2024-01-02", + "description": "GOOGLE *YOUTUBE VIDEOS", + "amount": "4.26", + "category": "Services", + "merchant": "GOOGLE", + "vendor": "Google", + "category": "Technology" + } +} +``` + +## Step 12: Test Deduplication + +Try importing the same file again: + +```bash +curl -X POST http://localhost:3000/api/sources/bank_transactions/import \ + -F "file=@examples/bank_transactions.csv" +``` + +Response: +```json +{ + "success": true, + "imported": 0, + "duplicates": 14, + "log_id": 2 +} +``` + +All records were rejected as duplicates! ✓ + +## Summary + +You've now: +- ✅ Created a data source with deduplication rules +- ✅ Defined transformation rules to extract data +- ✅ Imported CSV data +- ✅ Applied transformations +- ✅ Created value mappings for clean output +- ✅ Reprocessed data with mappings +- ✅ Tested deduplication + +## Next Steps + +- Add more rules for other extraction patterns +- Create more value mappings as needed +- Query the `transformed` data for reporting +- Import additional CSV files + +## Useful Commands + +```bash +# View all sources +curl http://localhost:3000/api/sources + +# View source statistics +curl http://localhost:3000/api/sources/bank_transactions/stats + +# View all rules for a source +curl http://localhost:3000/api/rules/source/bank_transactions + +# View all mappings for a source +curl http://localhost:3000/api/mappings/source/bank_transactions + +# Search for specific records +curl -X POST http://localhost:3000/api/records/search \ + -H "Content-Type: application/json" \ + -d '{ + "source_name": "bank_transactions", + "query": {"vendor": "Google"}, + "limit": 10 + }' +``` + +## Troubleshooting + +**API won't start:** +- Check `.env` file exists with correct database credentials +- Verify PostgreSQL is running: `psql -U postgres -l` +- Check logs for error messages + +**Import fails:** +- Verify 
source exists: `curl http://localhost:3000/api/sources` +- Check CSV format matches expectations +- Ensure dedup_fields match CSV column names + +**Transformations not working:** +- Check rules exist: `curl http://localhost:3000/api/rules/source/bank_transactions` +- Test regex pattern manually +- Check records have the specified field diff --git a/examples/bank_transactions.csv b/examples/bank_transactions.csv new file mode 100644 index 0000000..6e43e56 --- /dev/null +++ b/examples/bank_transactions.csv @@ -0,0 +1,15 @@ +date,description,amount,category +2024-01-02,GOOGLE *YOUTUBE VIDEOS,4.26,Services +2024-01-03,CLEVELAND CLINIC,200.00,Medical +2024-01-04,AT&T *PAYMENT,57.14,Services +2024-01-05,SUBWAY 00044289255 STOW OH,10.25,Restaurants +2024-01-06,TARGET STOW OH,197.90,Merchandise +2024-01-09,CIRCLE K 05416 STOW OH,52.99,Gasoline +2024-01-13,DISCOUNT DRUG MART 32 STOW OH,26.68,Merchandise +2024-01-16,BUFFALO WILD WINGS KENT,63.22,Restaurants +2024-01-19,WALMART GROCERY,17.16,Supermarkets +2024-01-20,GOOGLE *GOOGLE PLAY,2.12,Services +2024-01-20,LOWES OF STOW OH,256.48,Home Improvement +2024-01-27,GIANT-EAGLE #4096 STOW OH,67.81,Supermarkets +2024-01-29,NETFLIX.COM,14.93,Services +2024-01-30,TARGET STOW OH,49.37,Merchandise diff --git a/package.json b/package.json new file mode 100644 index 0000000..5896fba --- /dev/null +++ b/package.json @@ -0,0 +1,29 @@ +{ + "name": "dataflow", + "version": "0.1.0", + "description": "Simple data transformation tool for ingesting, mapping, and transforming data", + "main": "api/server.js", + "scripts": { + "start": "node api/server.js", + "dev": "nodemon api/server.js", + "test": "echo \"Tests coming soon\" && exit 0" + }, + "keywords": [ + "etl", + "data-transformation", + "data-pipeline", + "csv-import" + ], + "author": "", + "license": "MIT", + "dependencies": { + "express": "^4.18.2", + "pg": "^8.11.3", + "dotenv": "^16.3.1", + "multer": "^1.4.5-lts.1", + "csv-parse": "^5.5.2" + }, + "devDependencies": { + "nodemon": 
"^3.0.1" + } +} diff --git a/setup.sh b/setup.sh new file mode 100755 index 0000000..e72d08d --- /dev/null +++ b/setup.sh @@ -0,0 +1,69 @@ +#!/bin/bash +# +# Dataflow Setup Script +# Quick setup for development +# + +set -e + +echo "🚀 Dataflow Setup" +echo "=================" +echo "" + +# Check if .env exists +if [ ! -f .env ]; then + echo "📝 Creating .env from template..." + cp .env.example .env + echo "⚠️ Please edit .env with your database credentials" + echo "" + read -p "Press enter to continue after editing .env..." +fi + +# Load .env +export $(cat .env | grep -v '^#' | xargs) + +# Install dependencies +echo "📦 Installing Node.js dependencies..." +npm install +echo "" + +# Test database connection +echo "🔍 Testing database connection..." +if psql -U "$DB_USER" -h "$DB_HOST" -p "$DB_PORT" -d postgres -c '\q' 2>/dev/null; then + echo "✓ PostgreSQL connection successful" +else + echo "✗ Cannot connect to PostgreSQL" + echo " Please check your database credentials in .env" + exit 1 +fi + +# Create database if it doesn't exist +echo "" +echo "🗄️ Checking database..." +if psql -U "$DB_USER" -h "$DB_HOST" -p "$DB_PORT" -lqt | cut -d \| -f 1 | grep -qw "$DB_NAME"; then + echo "✓ Database '$DB_NAME' exists" +else + echo "Creating database '$DB_NAME'..." + psql -U "$DB_USER" -h "$DB_HOST" -p "$DB_PORT" -d postgres -c "CREATE DATABASE $DB_NAME;" + echo "✓ Database created" +fi + +# Deploy schema +echo "" +echo "📋 Deploying database schema..." +psql -U "$DB_USER" -h "$DB_HOST" -p "$DB_PORT" -d "$DB_NAME" -f database/schema.sql +echo "✓ Schema deployed" + +echo "" +echo "⚙️ Deploying database functions..." +psql -U "$DB_USER" -h "$DB_HOST" -p "$DB_PORT" -d "$DB_NAME" -f database/functions.sql +echo "✓ Functions deployed" + +echo "" +echo "✅ Setup complete!" +echo "" +echo "Next steps:" +echo " 1. Start the server: npm start" +echo " 2. Test the API: curl http://localhost:$API_PORT/health" +echo " 3. Follow the guide: examples/GETTING_STARTED.md" +echo ""