From f2c4eb8339892f84853702cfad356a9aff1f681b Mon Sep 17 00:00:00 2001 From: Paul Trowbridge Date: Sun, 5 Apr 2026 16:22:40 -0400 Subject: [PATCH] Add Python management tool for configure/deploy/manage workflow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit No external dependencies — uses psql CLI via subprocess. Interactive menu detects current state (DB connection, schema, UI build, service status) and guides through configure, deploy, rebuild, nginx setup, and service management. Handles both new installs and existing databases (grants access vs. creating). Co-Authored-By: Claude Sonnet 4.6 --- manage.py | 546 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 546 insertions(+) create mode 100755 manage.py diff --git a/manage.py b/manage.py new file mode 100755 index 0000000..baedbcb --- /dev/null +++ b/manage.py @@ -0,0 +1,546 @@ +#!/usr/bin/env python3 +""" +Dataflow Manager +Interactive tool for configuring, deploying, and managing the dataflow service. +""" + +import os +import sys +import subprocess +import getpass +import shutil +from pathlib import Path +from datetime import datetime + +ROOT = Path(__file__).parent.resolve() +ENV_FILE = ROOT / '.env' +SERVICE_FILE = Path('/etc/systemd/system/dataflow.service') +SERVICE_SRC = ROOT / 'dataflow.service' +NGINX_DIR = Path('/etc/nginx/sites-enabled') + +# ── Terminal helpers ────────────────────────────────────────────────────────── + +BOLD = '\033[1m' +DIM = '\033[2m' +GREEN = '\033[0;32m' +YELLOW = '\033[0;33m' +RED = '\033[0;31m' +CYAN = '\033[0;36m' +RESET = '\033[0m' + +def bold(s): return f'{BOLD}{s}{RESET}' +def dim(s): return f'{DIM}{s}{RESET}' +def green(s): return f'{GREEN}{s}{RESET}' +def yellow(s): return f'{YELLOW}{s}{RESET}' +def red(s): return f'{RED}{s}{RESET}' +def cyan(s): return f'{CYAN}{s}{RESET}' + +def header(title): + print(f'\n{BOLD}── {title} ──{RESET}') + +def ok(msg=''): print(f' {green("✓")} {msg}' if msg else f' {green("✓")}') +def warn(msg): print(f' {yellow("!")} {msg}') +def err(msg): print(f' {red("✗")} {msg}') +def info(msg): print(f' {dim(msg)}') + +def prompt(label, default=None, secret=False): + suffix = f' [{default}]' if default else '' + text = f' {label}{suffix}: ' + if secret: + val = getpass.getpass(text) + else: + val = input(text).strip() + return val if val else (default or '') + +def confirm(label, default_yes=True): + hint = '[Y/n]' if default_yes else '[y/N]' + val = input(f' {label} {hint}: ').strip().lower() + if not val: + return default_yes + return val.startswith('y') + +def pause(): + input(f'\n {dim("Press Enter to continue...")}') + +# ── Env file ────────────────────────────────────────────────────────────────── + +def load_env(): + env = {} + if ENV_FILE.exists(): + for line in ENV_FILE.read_text().splitlines(): + line = line.strip() + if line and not line.startswith('#') and '=' in line: + k, _, v = line.partition('=') + env[k.strip()] = v.strip() + return env + +def write_env(cfg): + content = f"""# Database Configuration +DB_HOST={cfg['DB_HOST']} +DB_PORT={cfg['DB_PORT']} +DB_NAME={cfg['DB_NAME']} +DB_USER={cfg['DB_USER']} +DB_PASSWORD={cfg['DB_PASSWORD']} + +# API Configuration +API_PORT={cfg.get('API_PORT', '3020')} +NODE_ENV={cfg.get('NODE_ENV', 'production')} +""" + ENV_FILE.write_text(content) + +# ── Database helpers ────────────────────────────────────────────────────────── + +def psql_env(cfg): + e = os.environ.copy() + e['PGPASSWORD'] = cfg['DB_PASSWORD'] + return e + +def psql_run(cfg, sql, db=None, check=False): + db = db or cfg['DB_NAME'] + cmd = ['psql', '-U', cfg['DB_USER'], '-h', cfg['DB_HOST'], + '-p', str(cfg['DB_PORT']), '-d', db, '-tAc', sql] + return subprocess.run(cmd, capture_output=True, text=True, env=psql_env(cfg)) + +def psql_admin(admin_cfg, sql, db='postgres'): + e = os.environ.copy() + e['PGPASSWORD'] = admin_cfg['password'] + cmd = ['psql', '-U', admin_cfg['user'], '-h', admin_cfg['host'], + '-p', str(admin_cfg['port']), '-d', db, '-c', sql] + return subprocess.run(cmd, capture_output=True, text=True, env=e) + +def psql_file(cfg, filepath, db=None): + db = db or cfg['DB_NAME'] + cmd = ['psql', '-U', cfg['DB_USER'], '-h', cfg['DB_HOST'], + '-p', str(cfg['DB_PORT']), '-d', db, '-f', str(filepath), '-q'] + return subprocess.run(cmd, capture_output=True, text=True, env=psql_env(cfg)) + +def can_connect(cfg): + r = psql_run(cfg, 'SELECT 1', db=cfg['DB_NAME']) + return r.returncode == 0 + +def schema_deployed(cfg): + r = psql_run(cfg, "SELECT 1 FROM information_schema.schemata WHERE schema_name='dataflow'") + return r.returncode == 0 and '1' in r.stdout + +def functions_deployed(cfg): + r = psql_run(cfg, "SELECT 1 FROM pg_proc WHERE proname='apply_transformations'") + return r.returncode == 0 and '1' in r.stdout + +# ── System helpers ──────────────────────────────────────────────────────────── + +def service_installed(): + return SERVICE_FILE.exists() + +def service_running(): + r = subprocess.run(['systemctl', 'is-active', 'dataflow'], + capture_output=True, text=True) + return r.stdout.strip() == 'active' + +def ui_built(): + index = ROOT / 'public' / 'index.html' + return index.exists() + +def ui_build_time(): + index = ROOT / 'public' / 'index.html' + if index.exists(): + ts = index.stat().st_mtime + return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M') + return None + +def nginx_domain(port): + """Find nginx site proxying to our port.""" + if not NGINX_DIR.exists(): + return None + for f in NGINX_DIR.iterdir(): + try: + text = f.read_text() + if f':{port}' in text: + for line in text.splitlines(): + if 'server_name' in line: + parts = line.split() + if len(parts) >= 2: + return parts[1].rstrip(';') + except Exception: + pass + return None + +def sudo_run(args, **kwargs): + return subprocess.run(['sudo'] + args, **kwargs) + +# ── Status ──────────────────────────────────────────────────────────────────── + +def show_status(cfg): + header('Status') + + if not cfg: + warn('No .env — not configured') + return + + port = cfg.get('API_PORT', '3020') + + # Database + connected = can_connect(cfg) + db_label = f"{cfg['DB_USER']}@{cfg['DB_HOST']}:{cfg['DB_PORT']}/{cfg['DB_NAME']}" + status = green('connected') if connected else red('cannot connect') + print(f' Database {dim(db_label)} {status}') + + if connected: + sd = schema_deployed(cfg) + fn = functions_deployed(cfg) + print(f' Schema {green("deployed") if sd else red("not deployed")}') + print(f' Functions {green("deployed") if fn else red("not deployed")}') + else: + print(f' Schema {dim("unknown")}') + print(f' Functions {dim("unknown")}') + + # UI + if ui_built(): + print(f' UI {green("built")} {dim(ui_build_time())}') + else: + print(f' UI {red("not built")}') + + # Service + if service_installed(): + running = service_running() + print(f' Service {green("running") if running else yellow("stopped")}') + else: + print(f' Service {dim("not installed")}') + + # Nginx + domain = nginx_domain(port) + if domain: + print(f' Nginx {green(domain)}') + else: + print(f' Nginx {dim("not configured")}') + + print() + +# ── Actions ─────────────────────────────────────────────────────────────────── + +def action_configure(cfg): + """Set up or reconfigure .env — handles new and existing databases.""" + header('Configure Database') + + existing = cfg.copy() if cfg else {} + + print(' Configure connection to an existing database, or create a new one.') + print() + + new_cfg = {} + new_cfg['DB_HOST'] = prompt('Host', existing.get('DB_HOST', 'localhost')) + new_cfg['DB_PORT'] = prompt('Port', existing.get('DB_PORT', '5432')) + new_cfg['DB_NAME'] = prompt('Database', existing.get('DB_NAME', 'dataflow')) + new_cfg['DB_USER'] = prompt('User', existing.get('DB_USER', 'dataflow')) + new_cfg['DB_PASSWORD'] = prompt('Password', existing.get('DB_PASSWORD', ''), secret=True) + new_cfg['API_PORT'] = prompt('API port', existing.get('API_PORT', '3020')) + new_cfg['NODE_ENV'] = prompt('Environment', existing.get('NODE_ENV', 'production')) + + print() + print(f' Testing connection to {new_cfg["DB_NAME"]}...') + if can_connect(new_cfg): + ok('Connected') + else: + warn('Cannot connect with those credentials.') + if not confirm('Create user/database using admin credentials?', default_yes=False): + info('Cancelled — .env not written') + return cfg + + print() + admin = {} + admin['user'] = prompt('Admin username', 'postgres') + admin['password'] = prompt('Admin password', secret=True) + admin['host'] = new_cfg['DB_HOST'] + admin['port'] = new_cfg['DB_PORT'] + + # Test admin connection + r = psql_admin(admin, 'SELECT 1') + if r.returncode != 0: + err(f'Cannot connect as {admin["user"]}') + return cfg + + # Create user + r = psql_admin(admin, f"SELECT 1 FROM pg_roles WHERE rolname='{new_cfg['DB_USER']}'") + if '1' in r.stdout: + info(f'User {new_cfg["DB_USER"]} already exists') + else: + r = psql_admin(admin, f"CREATE USER {new_cfg['DB_USER']} WITH PASSWORD '{new_cfg['DB_PASSWORD']}'") + if r.returncode == 0: + ok(f'User {new_cfg["DB_USER"]} created') + else: + err(f'Could not create user: {r.stderr.strip()}') + return cfg + + # Create or grant access to database + r = psql_admin(admin, f"SELECT 1 FROM pg_database WHERE datname='{new_cfg['DB_NAME']}'") + if '1' in r.stdout: + info(f'Database {new_cfg["DB_NAME"]} already exists — granting access') + psql_admin(admin, f"GRANT CREATE ON DATABASE {new_cfg['DB_NAME']} TO {new_cfg['DB_USER']}", db=new_cfg['DB_NAME']) + else: + r = psql_admin(admin, f"CREATE DATABASE {new_cfg['DB_NAME']} OWNER {new_cfg['DB_USER']}") + if r.returncode == 0: + ok(f'Database {new_cfg["DB_NAME"]} created') + else: + err(f'Could not create database: {r.stderr.strip()}') + return cfg + + if not can_connect(new_cfg): + err('Still cannot connect after setup — check credentials') + return cfg + ok('Connection verified') + + write_env(new_cfg) + ok('.env written') + return new_cfg + + +def action_deploy_schema(cfg): + header('Deploy Schema') + if not cfg: + err('No .env — run Configure first') + return + if not can_connect(cfg): + err('Cannot connect to database') + return + + if schema_deployed(cfg): + warn('Schema already deployed') + if not confirm('Redeploy (this will reset all data)?', default_yes=False): + info('Cancelled') + return + + print(f' Deploying schema to {cfg["DB_NAME"]}...') + r = psql_file(cfg, ROOT / 'database' / 'schema.sql') + if r.returncode == 0: + ok('Schema deployed') + else: + err(f'Failed:\n{r.stderr}') + + +def action_deploy_functions(cfg): + header('Deploy Functions') + if not cfg: + err('No .env — run Configure first') + return + if not can_connect(cfg): + err('Cannot connect to database') + return + + print(f' Deploying functions to {cfg["DB_NAME"]}...') + r = psql_file(cfg, ROOT / 'database' / 'functions.sql') + if r.returncode == 0: + ok('Functions deployed') + else: + err(f'Failed:\n{r.stderr}') + + +def action_build_ui(): + header('Build UI') + ui_dir = ROOT / 'ui' + if not (ui_dir / 'package.json').exists(): + err('ui/package.json not found') + return + + print(' Building...') + r = subprocess.run(['npm', 'run', 'build'], cwd=ui_dir, + capture_output=True, text=True) + if r.returncode == 0: + ok('UI built') + else: + err(f'Build failed:\n{r.stderr}') + + +def action_setup_nginx(cfg): + header('Nginx') + if not shutil.which('nginx'): + err('nginx not found') + return + + port = cfg.get('API_PORT', '3020') if cfg else '3020' + domain = prompt('Domain (e.g. dataflow.example.com)') + if not domain: + info('No domain — skipped') + return + + conf_name = domain.split('.')[0] + conf_path = NGINX_DIR / conf_name + cert_path = Path(f'/etc/letsencrypt/live/{domain}/fullchain.pem') + + if cert_path.exists(): + conf = f"""server {{ + listen 80; + listen [::]:80; + server_name {domain}; + location / {{ return 301 https://$host$request_uri; }} +}} + +server {{ + listen 443 ssl http2; + listen [::]:443 ssl http2; + server_name {domain}; + + ssl_certificate {cert_path}; + ssl_certificate_key /etc/letsencrypt/live/{domain}/privkey.pem; + ssl_protocols TLSv1.2 TLSv1.3; + ssl_ciphers HIGH:!MEDIUM:!LOW:!aNULL:!NULL:!SHA; + ssl_prefer_server_ciphers on; + ssl_session_cache shared:SSL:10m; + + keepalive_timeout 70; + sendfile on; + client_max_body_size 80m; + + location / {{ + proxy_pass http://localhost:{port}; + }} +}} +""" + else: + conf = f"""server {{ + listen 80; + listen [::]:80; + server_name {domain}; + location / {{ + proxy_pass http://localhost:{port}; + }} +}} +""" + + # Write via sudo + import tempfile + with tempfile.NamedTemporaryFile('w', suffix='.conf', delete=False) as f: + f.write(conf) + tmp = f.name + + r = sudo_run(['cp', tmp, str(conf_path)]) + os.unlink(tmp) + if r.returncode != 0: + err('Could not write nginx config (check sudo)') + return + ok(f'Config written to {conf_path}') + + r = sudo_run(['nginx', '-t'], capture_output=True) + if r.returncode != 0: + err('nginx config invalid — run: sudo nginx -t') + return + ok('Config valid') + + sudo_run(['systemctl', 'reload', 'nginx']) + ok('nginx reloaded') + + if not cert_path.exists(): + warn(f'No SSL cert for {domain}') + if confirm('Run certbot now?'): + r = sudo_run(['certbot', '--nginx', '-d', domain, + '--non-interactive', '--agree-tos', '--redirect', + '-m', f'admin@{domain}']) + if r.returncode == 0: + ok('SSL configured') + else: + err(f'certbot failed — run manually: sudo certbot --nginx -d {domain}') + + +def action_install_service(): + header('Systemd Service') + if service_installed(): + info('Already installed') + return + if not SERVICE_SRC.exists(): + err(f'{SERVICE_SRC} not found') + return + + r = sudo_run(['cp', str(SERVICE_SRC), str(SERVICE_FILE)]) + if r.returncode != 0: + err('Could not install service (check sudo)') + return + ok('Service file installed') + + sudo_run(['systemctl', 'daemon-reload']) + sudo_run(['systemctl', 'enable', 'dataflow'], capture_output=True) + ok('Enabled on boot') + + +def action_restart_service(): + header('API Server') + if not service_installed(): + err('Service not installed — run Install Service first') + return + + action = 'restart' if service_running() else 'start' + r = sudo_run(['systemctl', action, 'dataflow']) + if r.returncode != 0: + err(f'Failed — check: journalctl -u dataflow -n 30') + return + + import time; time.sleep(1) + if service_running(): + ok(f'Service {action}ed and running') + else: + err('Service failed to start — check: journalctl -u dataflow -n 30') + + +def action_stop_service(): + header('Stop Service') + if not service_running(): + info('Service is not running') + return + sudo_run(['systemctl', 'stop', 'dataflow']) + ok('Service stopped') + + +# ── Main menu ───────────────────────────────────────────────────────────────── + +MENU = [ + ('Configure database', action_configure), + ('Deploy schema', action_deploy_schema), + ('Deploy functions', action_deploy_functions), + ('Build UI', action_build_ui), + ('Set up nginx', action_setup_nginx), + ('Install service', action_install_service), + ('Start / restart service', action_restart_service), + ('Stop service', action_stop_service), +] + +def main(): + while True: + os.system('clear') + print(bold('Dataflow Manager')) + print('=' * 40) + + cfg = load_env() + show_status(cfg) + + print(bold('Actions')) + for i, (label, _) in enumerate(MENU, 1): + print(f' {cyan(str(i))}. {label}') + print(f' {cyan("q")}. Quit') + print() + + choice = input(' Choice: ').strip().lower() + + if choice in ('q', 'quit', 'exit'): + print() + break + + try: + idx = int(choice) - 1 + if 0 <= idx < len(MENU): + label, fn = MENU[idx] + # Some actions need cfg, some don't + import inspect + sig = inspect.signature(fn) + if len(sig.parameters) == 0: + result = fn() + elif len(sig.parameters) == 1: + result = fn(cfg) + # configure returns updated cfg + if label.startswith('Configure') and result is not None: + cfg = result + pause() + else: + warn('Invalid choice') + except (ValueError, IndexError): + warn('Invalid choice') + + +if __name__ == '__main__': + main()