diff --git a/manage.py b/manage.py new file mode 100755 index 0000000..baedbcb --- /dev/null +++ b/manage.py @@ -0,0 +1,546 @@ +#!/usr/bin/env python3 +""" +Dataflow Manager +Interactive tool for configuring, deploying, and managing the dataflow service. +""" + +import os +import sys +import subprocess +import getpass +import shutil +from pathlib import Path +from datetime import datetime + +ROOT = Path(__file__).parent.resolve() +ENV_FILE = ROOT / '.env' +SERVICE_FILE = Path('/etc/systemd/system/dataflow.service') +SERVICE_SRC = ROOT / 'dataflow.service' +NGINX_DIR = Path('/etc/nginx/sites-enabled') + +# ── Terminal helpers ────────────────────────────────────────────────────────── + +BOLD = '\033[1m' +DIM = '\033[2m' +GREEN = '\033[0;32m' +YELLOW = '\033[0;33m' +RED = '\033[0;31m' +CYAN = '\033[0;36m' +RESET = '\033[0m' + +def bold(s): return f'{BOLD}{s}{RESET}' +def dim(s): return f'{DIM}{s}{RESET}' +def green(s): return f'{GREEN}{s}{RESET}' +def yellow(s): return f'{YELLOW}{s}{RESET}' +def red(s): return f'{RED}{s}{RESET}' +def cyan(s): return f'{CYAN}{s}{RESET}' + +def header(title): + print(f'\n{BOLD}── {title} ──{RESET}') + +def ok(msg=''): print(f' {green("✓")} {msg}' if msg else f' {green("✓")}') +def warn(msg): print(f' {yellow("!")} {msg}') +def err(msg): print(f' {red("✗")} {msg}') +def info(msg): print(f' {dim(msg)}') + +def prompt(label, default=None, secret=False): + suffix = f' [{default}]' if default else '' + text = f' {label}{suffix}: ' + if secret: + val = getpass.getpass(text) + else: + val = input(text).strip() + return val if val else (default or '') + +def confirm(label, default_yes=True): + hint = '[Y/n]' if default_yes else '[y/N]' + val = input(f' {label} {hint}: ').strip().lower() + if not val: + return default_yes + return val.startswith('y') + +def pause(): + input(f'\n {dim("Press Enter to continue...")}') + +# ── Env file ────────────────────────────────────────────────────────────────── + +def load_env(): + env = {} + if ENV_FILE.exists(): + for line in ENV_FILE.read_text().splitlines(): + line = line.strip() + if line and not line.startswith('#') and '=' in line: + k, _, v = line.partition('=') + env[k.strip()] = v.strip() + return env + +def write_env(cfg): + content = f"""# Database Configuration +DB_HOST={cfg['DB_HOST']} +DB_PORT={cfg['DB_PORT']} +DB_NAME={cfg['DB_NAME']} +DB_USER={cfg['DB_USER']} +DB_PASSWORD={cfg['DB_PASSWORD']} + +# API Configuration +API_PORT={cfg.get('API_PORT', '3020')} +NODE_ENV={cfg.get('NODE_ENV', 'production')} +""" + ENV_FILE.write_text(content) + +# ── Database helpers ────────────────────────────────────────────────────────── + +def psql_env(cfg): + e = os.environ.copy() + e['PGPASSWORD'] = cfg['DB_PASSWORD'] + return e + +def psql_run(cfg, sql, db=None, check=False): + db = db or cfg['DB_NAME'] + cmd = ['psql', '-U', cfg['DB_USER'], '-h', cfg['DB_HOST'], + '-p', str(cfg['DB_PORT']), '-d', db, '-tAc', sql] + return subprocess.run(cmd, capture_output=True, text=True, env=psql_env(cfg)) + +def psql_admin(admin_cfg, sql, db='postgres'): + e = os.environ.copy() + e['PGPASSWORD'] = admin_cfg['password'] + cmd = ['psql', '-U', admin_cfg['user'], '-h', admin_cfg['host'], + '-p', str(admin_cfg['port']), '-d', db, '-c', sql] + return subprocess.run(cmd, capture_output=True, text=True, env=e) + +def psql_file(cfg, filepath, db=None): + db = db or cfg['DB_NAME'] + cmd = ['psql', '-U', cfg['DB_USER'], '-h', cfg['DB_HOST'], + '-p', str(cfg['DB_PORT']), '-d', db, '-f', str(filepath), '-q'] + return subprocess.run(cmd, capture_output=True, text=True, env=psql_env(cfg)) + +def can_connect(cfg): + r = psql_run(cfg, 'SELECT 1', db=cfg['DB_NAME']) + return r.returncode == 0 + +def schema_deployed(cfg): + r = psql_run(cfg, "SELECT 1 FROM information_schema.schemata WHERE schema_name='dataflow'") + return r.returncode == 0 and '1' in r.stdout + +def functions_deployed(cfg): + r = psql_run(cfg, "SELECT 1 FROM pg_proc WHERE proname='apply_transformations'") + return r.returncode == 0 and '1' in r.stdout + +# ── System helpers ──────────────────────────────────────────────────────────── + +def service_installed(): + return SERVICE_FILE.exists() + +def service_running(): + r = subprocess.run(['systemctl', 'is-active', 'dataflow'], + capture_output=True, text=True) + return r.stdout.strip() == 'active' + +def ui_built(): + index = ROOT / 'public' / 'index.html' + return index.exists() + +def ui_build_time(): + index = ROOT / 'public' / 'index.html' + if index.exists(): + ts = index.stat().st_mtime + return datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M') + return None + +def nginx_domain(port): + """Find nginx site proxying to our port.""" + if not NGINX_DIR.exists(): + return None + for f in NGINX_DIR.iterdir(): + try: + text = f.read_text() + if f':{port}' in text: + for line in text.splitlines(): + if 'server_name' in line: + parts = line.split() + if len(parts) >= 2: + return parts[1].rstrip(';') + except Exception: + pass + return None + +def sudo_run(args, **kwargs): + return subprocess.run(['sudo'] + args, **kwargs) + +# ── Status ──────────────────────────────────────────────────────────────────── + +def show_status(cfg): + header('Status') + + if not cfg: + warn('No .env — not configured') + return + + port = cfg.get('API_PORT', '3020') + + # Database + connected = can_connect(cfg) + db_label = f"{cfg['DB_USER']}@{cfg['DB_HOST']}:{cfg['DB_PORT']}/{cfg['DB_NAME']}" + status = green('connected') if connected else red('cannot connect') + print(f' Database {dim(db_label)} {status}') + + if connected: + sd = schema_deployed(cfg) + fn = functions_deployed(cfg) + print(f' Schema {green("deployed") if sd else red("not deployed")}') + print(f' Functions {green("deployed") if fn else red("not deployed")}') + else: + print(f' Schema {dim("unknown")}') + print(f' Functions {dim("unknown")}') + + # UI + if ui_built(): + print(f' UI {green("built")} {dim(ui_build_time())}') + else: + print(f' UI {red("not built")}') + + # Service + if service_installed(): + running = service_running() + print(f' Service {green("running") if running else yellow("stopped")}') + else: + print(f' Service {dim("not installed")}') + + # Nginx + domain = nginx_domain(port) + if domain: + print(f' Nginx {green(domain)}') + else: + print(f' Nginx {dim("not configured")}') + + print() + +# ── Actions ─────────────────────────────────────────────────────────────────── + +def action_configure(cfg): + """Set up or reconfigure .env — handles new and existing databases.""" + header('Configure Database') + + existing = cfg.copy() if cfg else {} + + print(' Configure connection to an existing database, or create a new one.') + print() + + new_cfg = {} + new_cfg['DB_HOST'] = prompt('Host', existing.get('DB_HOST', 'localhost')) + new_cfg['DB_PORT'] = prompt('Port', existing.get('DB_PORT', '5432')) + new_cfg['DB_NAME'] = prompt('Database', existing.get('DB_NAME', 'dataflow')) + new_cfg['DB_USER'] = prompt('User', existing.get('DB_USER', 'dataflow')) + new_cfg['DB_PASSWORD'] = prompt('Password', existing.get('DB_PASSWORD', ''), secret=True) + new_cfg['API_PORT'] = prompt('API port', existing.get('API_PORT', '3020')) + new_cfg['NODE_ENV'] = prompt('Environment', existing.get('NODE_ENV', 'production')) + + print() + print(f' Testing connection to {new_cfg["DB_NAME"]}...') + if can_connect(new_cfg): + ok('Connected') + else: + warn('Cannot connect with those credentials.') + if not confirm('Create user/database using admin credentials?', default_yes=False): + info('Cancelled — .env not written') + return cfg + + print() + admin = {} + admin['user'] = prompt('Admin username', 'postgres') + admin['password'] = prompt('Admin password', secret=True) + admin['host'] = new_cfg['DB_HOST'] + admin['port'] = new_cfg['DB_PORT'] + + # Test admin connection + r = psql_admin(admin, 'SELECT 1') + if r.returncode != 0: + err(f'Cannot connect as {admin["user"]}') + return cfg + + # Create user + r = psql_admin(admin, f"SELECT 1 FROM pg_roles WHERE rolname='{new_cfg['DB_USER']}'") + if '1' in r.stdout: + info(f'User {new_cfg["DB_USER"]} already exists') + else: + r = psql_admin(admin, f"CREATE USER {new_cfg['DB_USER']} WITH PASSWORD '{new_cfg['DB_PASSWORD']}'") + if r.returncode == 0: + ok(f'User {new_cfg["DB_USER"]} created') + else: + err(f'Could not create user: {r.stderr.strip()}') + return cfg + + # Create or grant access to database + r = psql_admin(admin, f"SELECT 1 FROM pg_database WHERE datname='{new_cfg['DB_NAME']}'") + if '1' in r.stdout: + info(f'Database {new_cfg["DB_NAME"]} already exists — granting access') + psql_admin(admin, f"GRANT CREATE ON DATABASE {new_cfg['DB_NAME']} TO {new_cfg['DB_USER']}", db=new_cfg['DB_NAME']) + else: + r = psql_admin(admin, f"CREATE DATABASE {new_cfg['DB_NAME']} OWNER {new_cfg['DB_USER']}") + if r.returncode == 0: + ok(f'Database {new_cfg["DB_NAME"]} created') + else: + err(f'Could not create database: {r.stderr.strip()}') + return cfg + + if not can_connect(new_cfg): + err('Still cannot connect after setup — check credentials') + return cfg + ok('Connection verified') + + write_env(new_cfg) + ok('.env written') + return new_cfg + + +def action_deploy_schema(cfg): + header('Deploy Schema') + if not cfg: + err('No .env — run Configure first') + return + if not can_connect(cfg): + err('Cannot connect to database') + return + + if schema_deployed(cfg): + warn('Schema already deployed') + if not confirm('Redeploy (this will reset all data)?', default_yes=False): + info('Cancelled') + return + + print(f' Deploying schema to {cfg["DB_NAME"]}...') + r = psql_file(cfg, ROOT / 'database' / 'schema.sql') + if r.returncode == 0: + ok('Schema deployed') + else: + err(f'Failed:\n{r.stderr}') + + +def action_deploy_functions(cfg): + header('Deploy Functions') + if not cfg: + err('No .env — run Configure first') + return + if not can_connect(cfg): + err('Cannot connect to database') + return + + print(f' Deploying functions to {cfg["DB_NAME"]}...') + r = psql_file(cfg, ROOT / 'database' / 'functions.sql') + if r.returncode == 0: + ok('Functions deployed') + else: + err(f'Failed:\n{r.stderr}') + + +def action_build_ui(): + header('Build UI') + ui_dir = ROOT / 'ui' + if not (ui_dir / 'package.json').exists(): + err('ui/package.json not found') + return + + print(' Building...') + r = subprocess.run(['npm', 'run', 'build'], cwd=ui_dir, + capture_output=True, text=True) + if r.returncode == 0: + ok('UI built') + else: + err(f'Build failed:\n{r.stderr}') + + +def action_setup_nginx(cfg): + header('Nginx') + if not shutil.which('nginx'): + err('nginx not found') + return + + port = cfg.get('API_PORT', '3020') if cfg else '3020' + domain = prompt('Domain (e.g. dataflow.example.com)') + if not domain: + info('No domain — skipped') + return + + conf_name = domain.split('.')[0] + conf_path = NGINX_DIR / conf_name + cert_path = Path(f'/etc/letsencrypt/live/{domain}/fullchain.pem') + + if cert_path.exists(): + conf = f"""server {{ + listen 80; + listen [::]:80; + server_name {domain}; + location / {{ return 301 https://$host$request_uri; }} +}} + +server {{ + listen 443 ssl http2; + listen [::]:443 ssl http2; + server_name {domain}; + + ssl_certificate {cert_path}; + ssl_certificate_key /etc/letsencrypt/live/{domain}/privkey.pem; + ssl_protocols TLSv1.2 TLSv1.3; + ssl_ciphers HIGH:!MEDIUM:!LOW:!aNULL:!NULL:!SHA; + ssl_prefer_server_ciphers on; + ssl_session_cache shared:SSL:10m; + + keepalive_timeout 70; + sendfile on; + client_max_body_size 80m; + + location / {{ + proxy_pass http://localhost:{port}; + }} +}} +""" + else: + conf = f"""server {{ + listen 80; + listen [::]:80; + server_name {domain}; + location / {{ + proxy_pass http://localhost:{port}; + }} +}} +""" + + # Write via sudo + import tempfile + with tempfile.NamedTemporaryFile('w', suffix='.conf', delete=False) as f: + f.write(conf) + tmp = f.name + + r = sudo_run(['cp', tmp, str(conf_path)]) + os.unlink(tmp) + if r.returncode != 0: + err('Could not write nginx config (check sudo)') + return + ok(f'Config written to {conf_path}') + + r = sudo_run(['nginx', '-t'], capture_output=True) + if r.returncode != 0: + err('nginx config invalid — run: sudo nginx -t') + return + ok('Config valid') + + sudo_run(['systemctl', 'reload', 'nginx']) + ok('nginx reloaded') + + if not cert_path.exists(): + warn(f'No SSL cert for {domain}') + if confirm('Run certbot now?'): + r = sudo_run(['certbot', '--nginx', '-d', domain, + '--non-interactive', '--agree-tos', '--redirect', + '-m', f'admin@{domain}']) + if r.returncode == 0: + ok('SSL configured') + else: + err(f'certbot failed — run manually: sudo certbot --nginx -d {domain}') + + +def action_install_service(): + header('Systemd Service') + if service_installed(): + info('Already installed') + return + if not SERVICE_SRC.exists(): + err(f'{SERVICE_SRC} not found') + return + + r = sudo_run(['cp', str(SERVICE_SRC), str(SERVICE_FILE)]) + if r.returncode != 0: + err('Could not install service (check sudo)') + return + ok('Service file installed') + + sudo_run(['systemctl', 'daemon-reload']) + sudo_run(['systemctl', 'enable', 'dataflow'], capture_output=True) + ok('Enabled on boot') + + +def action_restart_service(): + header('API Server') + if not service_installed(): + err('Service not installed — run Install Service first') + return + + action = 'restart' if service_running() else 'start' + r = sudo_run(['systemctl', action, 'dataflow']) + if r.returncode != 0: + err(f'Failed — check: journalctl -u dataflow -n 30') + return + + import time; time.sleep(1) + if service_running(): + ok(f'Service {action}ed and running') + else: + err('Service failed to start — check: journalctl -u dataflow -n 30') + + +def action_stop_service(): + header('Stop Service') + if not service_running(): + info('Service is not running') + return + sudo_run(['systemctl', 'stop', 'dataflow']) + ok('Service stopped') + + +# ── Main menu ───────────────────────────────────────────────────────────────── + +MENU = [ + ('Configure database', action_configure), + ('Deploy schema', action_deploy_schema), + ('Deploy functions', action_deploy_functions), + ('Build UI', action_build_ui), + ('Set up nginx', action_setup_nginx), + ('Install service', action_install_service), + ('Start / restart service', action_restart_service), + ('Stop service', action_stop_service), +] + +def main(): + while True: + os.system('clear') + print(bold('Dataflow Manager')) + print('=' * 40) + + cfg = load_env() + show_status(cfg) + + print(bold('Actions')) + for i, (label, _) in enumerate(MENU, 1): + print(f' {cyan(str(i))}. {label}') + print(f' {cyan("q")}. Quit') + print() + + choice = input(' Choice: ').strip().lower() + + if choice in ('q', 'quit', 'exit'): + print() + break + + try: + idx = int(choice) - 1 + if 0 <= idx < len(MENU): + label, fn = MENU[idx] + # Some actions need cfg, some don't + import inspect + sig = inspect.signature(fn) + if len(sig.parameters) == 0: + result = fn() + elif len(sig.parameters) == 1: + result = fn(cfg) + # configure returns updated cfg + if label.startswith('Configure') and result is not None: + cfg = result + pause() + else: + warn('Invalid choice') + except (ValueError, IndexError): + warn('Invalid choice') + + +if __name__ == '__main__': + main()