#!/usr/bin/env python3 """Pipekit TUI — manage sync modules, connections, and runs.""" import os import sys from textual import work from textual.app import App, ComposeResult from textual.binding import Binding from textual.containers import Horizontal, Vertical, VerticalScroll from textual.screen import ModalScreen from textual.widgets import ( Button, DataTable, Footer, Header, Input, Label, RadioButton, RadioSet, RichLog, Select, Static, TextArea, Tree, ) import re sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from tui.client import PipekitClient def _parse_column_aliases(source_query: str) -> list[str]: """Extract column aliases from a SELECT query (the AS names).""" aliases = [] for m in re.finditer(r'\bAS\s+(\w+)', source_query, re.IGNORECASE): aliases.append(m.group(1)) return aliases def _parse_select_columns(source_query: str) -> list[dict]: """Parse SELECT columns into structured info: source name, alias, trimmed.""" columns = [] # Match lines like: RTRIM(COL) AS alias or COL AS alias for m in re.finditer( r'(?:RTRIM\(([^)]+)\)|(\[?["\w#@$]+\]?(?:\.["\w#@$]+)*))\s+AS\s+(\w+)', source_query, re.IGNORECASE ): rtrim_col = m.group(1) plain_col = m.group(2) alias = m.group(3) if rtrim_col: columns.append({"source": rtrim_col.strip(), "alias": alias, "trimmed": "yes"}) else: columns.append({"source": plain_col.strip(), "alias": alias, "trimmed": ""}) return columns # --------------------------------------------------------------------------- # Connection manager screen # --------------------------------------------------------------------------- class ConnectionScreen(ModalScreen[None]): BINDINGS = [Binding("escape", "dismiss", "Close", priority=True)] DEFAULT_CSS = """ ConnectionScreen { align: center middle; } #conn-box { width: 85%; height: 85%; border: thick $primary; background: $panel; padding: 1 2; } #conn-table { height: 1fr; } #conn-form Input { margin-bottom: 1; } """ def __init__(self, client: PipekitClient) -> None: super().__init__() self.client = client def compose(self) -> ComposeResult: with Vertical(id="conn-box"): yield Label("[bold]Connections[/bold]") yield DataTable(id="conn-table", cursor_type="row") yield Label("Add Connection", classes="label") yield Input(placeholder="Name", id="conn-name") yield Input(placeholder="JDBC URL", id="conn-url") yield Input(placeholder="Username", id="conn-user") yield Input(placeholder="Password (or $ENV_VAR)", id="conn-pass") with Horizontal(): yield Button("Add", variant="primary", id="btn-add-conn") yield Button("Test Selected", id="btn-test-conn") yield Button("Delete Selected", variant="error", id="btn-del-conn") def on_mount(self) -> None: self._refresh() def _refresh(self) -> None: table = self.query_one("#conn-table", DataTable) table.clear(columns=True) table.add_columns("ID", "Name", "JDBC URL", "Username", "Deletes") for c in self.client.list_connections(): table.add_row( str(c["id"]), c["name"], c["jdbc_url"][:60], c["username"] or "", "yes" if c["supports_deletes"] else "no", ) def on_button_pressed(self, event: Button.Pressed) -> None: if event.button.id == "btn-add-conn": name = self.query_one("#conn-name", Input).value.strip() url = self.query_one("#conn-url", Input).value.strip() user = self.query_one("#conn-user", Input).value.strip() pw = self.query_one("#conn-pass", Input).value.strip() if not name or not url: self.notify("Name and JDBC URL required", severity="error") return self.client.create_connection({ "name": name, "jdbc_url": url, "username": user, "password": pw, }) self.notify(f"Created connection: {name}") self._refresh() elif event.button.id == "btn-test-conn": self._test_selected() elif event.button.id == "btn-del-conn": self._delete_selected() def _get_selected_id(self) -> int | None: table = self.query_one("#conn-table", DataTable) if table.cursor_row is not None and table.row_count > 0: row = table.get_row_at(table.cursor_row) return int(row[0]) return None @work(thread=True) def _test_selected(self) -> None: cid = self._get_selected_id() if not cid: return result = self.client.test_connection(cid) if result["status"] == "ok": self.app.call_from_thread(self.notify, "Connection OK!", severity="information") else: self.app.call_from_thread(self.notify, f"Failed: {result.get('detail','')}", severity="error") def _delete_selected(self) -> None: cid = self._get_selected_id() if cid: self.client.delete_connection(cid) self.notify("Deleted") self._refresh() # --------------------------------------------------------------------------- # New module wizard with table browsing # --------------------------------------------------------------------------- class NewModuleScreen(ModalScreen[None]): BINDINGS = [Binding("escape", "dismiss", "Close", priority=True)] DEFAULT_CSS = """ NewModuleScreen { align: center middle; } #wiz-box { width: 85%; height: 90%; border: thick $primary; background: $panel; padding: 1 2; overflow-y: auto; } #wiz-box Input, #wiz-box Select, #wiz-box TextArea { margin-bottom: 1; } .label { margin-top: 1; color: $text-muted; } #source-query { height: 12; } #wiz-buttons { height: 3; dock: bottom; } #table-list { height: 12; } #browse-status { height: 1; color: $text-muted; } #browse-fields { height: auto; } #browse-fields Input { width: 1fr; } #browse-fields Button { width: auto; min-width: 16; } """ def __init__(self, client: PipekitClient) -> None: super().__init__() self.client = client self._tables: list[dict] = [] def compose(self) -> ComposeResult: conns = self.client.list_connections() conn_opts = [(c["name"], c["id"]) for c in conns] with VerticalScroll(id="wiz-box"): yield Label("[bold]New Sync Module[/bold]") yield Label("Source Connection", classes="label") yield Select(conn_opts, id="wiz-source-conn", prompt="Select source...") yield Label("Browse Source Tables (Enter or Load to fetch)", classes="label") with Horizontal(id="browse-fields"): yield Input(placeholder="linked server", id="linked-server") yield Input(placeholder="database", id="db-filter") yield Input(placeholder="schema", id="schema-filter") yield Button("Load", id="btn-browse") yield Input(placeholder="filter loaded tables...", id="table-filter") dt = DataTable(id="table-list", cursor_type="row") dt.add_columns("Schema", "Table", "Type") yield dt yield Static("", id="browse-status") yield Label("Destination Connection", classes="label") yield Select(conn_opts, id="wiz-dest-conn", prompt="Select destination...") yield Label("Module Name", classes="label") yield Input(placeholder="module_name", id="wiz-name") yield Label("Destination Table (schema.table)", classes="label") yield Input(placeholder="schema.table", id="wiz-dest-table") yield Label("Source Query", classes="label") yield TextArea("", id="source-query", language="sql") yield Label("Merge Strategy", classes="label") with RadioSet(id="wiz-strategy"): yield RadioButton("Full Refresh", value=True, id="strat-full") yield RadioButton("Incremental", id="strat-incr") yield RadioButton("Append", id="strat-append") yield RadioButton("Upsert", id="strat-upsert") yield Label("Merge Key (for incremental/upsert)", classes="label") yield Input(placeholder="primary_key_column", id="wiz-merge-key") yield Label("Watermark Column (for incremental)", classes="label") yield Input(placeholder="e.g. dex_row_ts", id="wiz-ts-col") with Horizontal(id="wiz-buttons"): yield Button("Create", variant="primary", id="wiz-create") yield Button("Cancel", id="wiz-cancel") def on_input_submitted(self, event: Input.Submitted) -> None: if event.input.id in ("linked-server", "db-filter", "schema-filter"): self._load_tables() def on_button_pressed(self, event: Button.Pressed) -> None: if event.button.id == "wiz-cancel": self.dismiss() elif event.button.id == "btn-browse": self._load_tables() elif event.button.id == "wiz-create": self._create_module() @work(thread=True) def _load_tables(self) -> None: conn_select = self.query_one("#wiz-source-conn", Select) if conn_select.value is Select.BLANK or not isinstance(conn_select.value, int): self.app.call_from_thread(self.notify, "Select a source connection first", severity="error") return linked = self.query_one("#linked-server", Input).value.strip() or None db = self.query_one("#db-filter", Input).value.strip() or None schema = self.query_one("#schema-filter", Input).value.strip() or None # Build the schema_filter string for the API if linked and db: schema_filter = f"{linked}.{db}" if schema: schema_filter += f".{schema}" elif db: # Just a database name — query that database's INFORMATION_SCHEMA schema_filter = f".{db}" if schema: schema_filter += f".{schema}" else: schema_filter = schema self.app.call_from_thread( self.query_one("#browse-status", Static).update, "Loading tables..." ) try: tables = self.client.list_tables(conn_select.value, schema=schema_filter) self._tables = tables def _update(): self.query_one("#table-filter", Input).value = "" self._populate_table_list(tables) self.query_one("#browse-status", Static).update(f"{len(tables)} tables/views found") self.app.call_from_thread(_update) except Exception as e: self.app.call_from_thread(self.notify, f"Error: {e}", severity="error") def _populate_table_list(self, tables: list[dict]) -> None: dt = self.query_one("#table-list", DataTable) dt.clear() for i, t in enumerate(tables): dt.add_row(t["schema"], t["name"], t["type_label"], key=str(i)) def on_input_changed(self, event: Input.Changed) -> None: if event.input.id == "table-filter" and self._tables: q = event.value.lower() filtered = [t for t in self._tables if q in t["name"].lower() or q in t["schema"].lower()] self._populate_table_list(filtered) self.query_one("#browse-status", Static).update( f"{len(filtered)} of {len(self._tables)} shown" ) def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None: if event.data_table.id == "table-list": # The row key is the index into self._tables for unfiltered, # but when filtered we need to look up by schema+name from the row data row_data = event.data_table.get_row(event.row_key) schema, name = row_data[0], row_data[1] # Find the matching table in our full list for t in self._tables: if t["schema"] == schema and t["name"] == name: self._on_table_selected_by_ref(t) break @work(thread=True) def _on_table_selected_by_ref(self, table: dict) -> None: conn_id = self.query_one("#wiz-source-conn", Select).value self.app.call_from_thread( self.query_one("#browse-status", Static).update, f"Loading columns for {table['schema']}.{table['name']}..." ) try: proposal = self.client.propose_module( conn_id, table["schema"], table["name"], linked_server=table.get("linked_server"), linked_db=table.get("linked_db"), ) def _update(): self.query_one("#source-query", TextArea).load_text(proposal["source_query"]) name_input = self.query_one("#wiz-name", Input) if not name_input.value: name_input.value = proposal["name"] dest_input = self.query_one("#wiz-dest-table", Input) if not dest_input.value: dest_input.value = proposal["dest_table"] if proposal.get("merge_key"): mk = self.query_one("#wiz-merge-key", Input) if not mk.value: mk.value = proposal["merge_key"] if proposal.get("watermark_column"): tc = self.query_one("#wiz-ts-col", Input) if not tc.value: tc.value = proposal["watermark_column"] self.query_one("#browse-status", Static).update( f"{len(proposal['columns'])} columns — strategy: {proposal['merge_strategy']}" ) self.app.call_from_thread(_update) except Exception as e: self.app.call_from_thread(self.notify, f"Error: {e}", severity="error") def _create_module(self) -> None: src_conn = self.query_one("#wiz-source-conn", Select).value dst_conn = self.query_one("#wiz-dest-conn", Select).value if src_conn is Select.BLANK or dst_conn is Select.BLANK: self.notify("Select source and destination connections", severity="error") return name = self.query_one("#wiz-name", Input).value.strip() dest_table = self.query_one("#wiz-dest-table", Input).value.strip() query = self.query_one("#source-query", TextArea).text.strip() if not name or not dest_table or not query: self.notify("Name, destination table, and query required", severity="error") return strat_idx = self.query_one("#wiz-strategy", RadioSet).pressed_index strategy = ["full", "incremental", "append", "upsert"][strat_idx] data = { "name": name, "source_connection_id": src_conn, "dest_connection_id": dst_conn, "dest_table": dest_table, "source_query": query, "merge_strategy": strategy, "merge_key": self.query_one("#wiz-merge-key", Input).value.strip() or None, "watermark_column": self.query_one("#wiz-ts-col", Input).value.strip() or None, } try: self.client.create_module(data) self.notify(f"Created module: {name}") self.dismiss() except Exception as e: self.notify(f"Error: {e}", severity="error") # --------------------------------------------------------------------------- # Run history screen # --------------------------------------------------------------------------- class HistoryScreen(ModalScreen[None]): BINDINGS = [ Binding("escape", "dismiss", "Close", priority=True), Binding("v", "view_in_editor", "View in editor", priority=True), ] DEFAULT_CSS = """ HistoryScreen { align: center middle; } #hist-box { width: 90%; height: 90%; border: thick $primary; background: $panel; padding: 1 2; } #hist-table { height: 1fr; } #hist-source { height: 1fr; } #hist-merge { height: 1fr; } #hist-error { height: auto; max-height: 4; color: red; } .sql-label { margin-top: 1; color: $text-muted; } """ def __init__(self, client: PipekitClient, module_id: int, module_name: str) -> None: super().__init__() self.client = client self.module_id = module_id self.module_name = module_name self._runs: dict[str, dict] = {} def compose(self) -> ComposeResult: with Vertical(id="hist-box"): yield Label(f"[bold]Run History: {self.module_name}[/bold]") yield DataTable(id="hist-table", cursor_type="row") yield Label("Source Query", classes="sql-label") yield TextArea("", read_only=True, id="hist-source", language="sql") yield Label("Merge SQL", classes="sql-label") yield TextArea("", read_only=True, id="hist-merge", language="sql") yield Static("", id="hist-error") yield Footer() def on_mount(self) -> None: table = self.query_one("#hist-table", DataTable) table.add_columns("Run", "Status", "Rows", "Started", "Finished", "Error") try: history = self.client.module_history(self.module_id) for r in history: run_id = str(r.get("id", "")) self._runs[run_id] = r status = r.get("status", "") if status == "success": status_display = "[green]success[/green]" elif status == "error": status_display = "[red]error[/red]" else: status_display = f"[yellow]{status}[/yellow]" table.add_row( run_id, status_display, str(r.get("row_count", "") or "-"), r.get("started_at", "")[:19], (r.get("finished_at") or "")[:19], (r.get("error") or "")[:60], key=run_id, ) except Exception as e: self.notify(f"Error loading history: {e}", severity="error") def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None: if event.row_key is None: return run = self._runs.get(str(event.row_key.value)) if not run: return self.query_one("#hist-source", TextArea).load_text(run.get("source_query") or "") self.query_one("#hist-merge", TextArea).load_text(run.get("merge_sql") or "") error = run.get("error") or "" self.query_one("#hist-error", Static).update(f"[red]{error}[/red]" if error else "") def _get_highlighted_run(self) -> dict | None: table = self.query_one("#hist-table", DataTable) if table.cursor_row is not None and table.row_count > 0: row = table.get_row_at(table.cursor_row) return self._runs.get(str(row[0])) return None def action_view_in_editor(self) -> None: import tempfile run = self._get_highlighted_run() if not run: return parts = [] if run.get("source_query"): parts.append("-- SOURCE QUERY:") parts.append(run["source_query"]) if run.get("merge_sql"): parts.append("\n-- MERGE SQL:") parts.append(run["merge_sql"]) if run.get("error"): parts.append(f"\n-- ERROR: {run['error']}") sql = "\n".join(parts) if not sql.strip(): return editor = os.environ.get("EDITOR", "nvim") with tempfile.NamedTemporaryFile(mode="w", suffix=".sql", delete=False) as f: f.write(sql) tmp_path = f.name try: with self.app.suspend(): os.system(f'{editor} -R "{tmp_path}"') finally: os.unlink(tmp_path) # --------------------------------------------------------------------------- # Global run log screen # --------------------------------------------------------------------------- class GlobalHistoryScreen(ModalScreen[None]): BINDINGS = [ Binding("escape", "dismiss", "Close", priority=True), Binding("v", "view_in_editor", "View in editor", priority=True), ] DEFAULT_CSS = """ GlobalHistoryScreen { align: center middle; } #ghist-box { width: 95%; height: 95%; border: thick $primary; background: $panel; padding: 1 2; } #ghist-table { height: 1fr; } #ghist-source { height: 1fr; } #ghist-merge { height: 1fr; } #ghist-error { height: auto; max-height: 4; color: red; } .sql-label { margin-top: 1; color: $text-muted; } """ def __init__(self, client: PipekitClient) -> None: super().__init__() self.client = client self._runs: dict[str, dict] = {} def compose(self) -> ComposeResult: with Vertical(id="ghist-box"): yield Label("[bold]All Runs[/bold]") yield DataTable(id="ghist-table", cursor_type="row") yield Label("Source Query", classes="sql-label") yield TextArea("", read_only=True, id="ghist-source", language="sql") yield Label("Merge SQL", classes="sql-label") yield TextArea("", read_only=True, id="ghist-merge", language="sql") yield Static("", id="ghist-error") yield Footer() def on_mount(self) -> None: table = self.query_one("#ghist-table", DataTable) table.add_columns("Run", "Module", "Status", "Rows", "Started", "Finished", "Error") try: runs = self.client.list_runs(limit=100) for r in runs: run_id = str(r.get("id", "")) self._runs[run_id] = r status = r.get("status", "") if status == "success": status_display = "[green]success[/green]" elif status == "error": status_display = "[red]error[/red]" else: status_display = f"[yellow]{status}[/yellow]" table.add_row( run_id, r.get("module_name", "") or str(r.get("module_id", "")), status_display, str(r.get("row_count", "") or "-"), r.get("started_at", "")[:19], (r.get("finished_at") or "")[:19], (r.get("error") or "")[:50], key=run_id, ) except Exception as e: self.notify(f"Error loading runs: {e}", severity="error") def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None: if event.row_key is None: return run = self._runs.get(str(event.row_key.value)) if not run: return self.query_one("#ghist-source", TextArea).load_text(run.get("source_query") or "") self.query_one("#ghist-merge", TextArea).load_text(run.get("merge_sql") or "") error = run.get("error") or "" self.query_one("#ghist-error", Static).update(f"[red]{error}[/red]" if error else "") def action_view_in_editor(self) -> None: import tempfile table = self.query_one("#ghist-table", DataTable) if table.cursor_row is None or table.row_count == 0: return row = table.get_row_at(table.cursor_row) run = self._runs.get(str(row[0])) if not run: return parts = [] if run.get("source_query"): parts.append("-- SOURCE QUERY:") parts.append(run["source_query"]) if run.get("merge_sql"): parts.append("\n-- MERGE SQL:") parts.append(run["merge_sql"]) if run.get("error"): parts.append(f"\n-- ERROR: {run['error']}") sql = "\n".join(parts) if not sql.strip(): return editor = os.environ.get("EDITOR", "nvim") with tempfile.NamedTemporaryFile(mode="w", suffix=".sql", delete=False) as f: f.write(sql) tmp_path = f.name try: with self.app.suspend(): os.system(f'{editor} -R "{tmp_path}"') finally: os.unlink(tmp_path) # --------------------------------------------------------------------------- # Run output screen # --------------------------------------------------------------------------- class RunScreen(ModalScreen[None]): BINDINGS = [Binding("escape", "dismiss", "Close", priority=True)] DEFAULT_CSS = """ RunScreen { align: center middle; } #run-box { width: 90%; height: 85%; border: thick $primary; background: $panel; padding: 1 2; } #run-log { height: 1fr; } """ def __init__(self, client: PipekitClient, module_id: int, module_name: str) -> None: super().__init__() self.client = client self.module_id = module_id self.module_name = module_name def compose(self) -> ComposeResult: with Vertical(id="run-box"): yield Label(f"[bold]Running: {self.module_name}[/bold]") yield RichLog(id="run-log", highlight=True, markup=True) def on_mount(self) -> None: self._run() @work(thread=True) def _run(self) -> None: log = self.query_one("#run-log", RichLog) self.app.call_from_thread(log.write, f"Starting sync: {self.module_name}...") try: import json for line in self.client.run_module_stream(self.module_id): if line.startswith("__DONE__"): result = json.loads(line[8:]) status = result.get("status", "unknown") row_count = result.get("row_count", 0) error = result.get("error", "") if status == "success": self.app.call_from_thread(log.write, f"\n[green]Success[/green] — {row_count} rows") else: self.app.call_from_thread(log.write, f"\n[red]Error[/red]: {error}") elif line.startswith("__ERROR__"): self.app.call_from_thread(log.write, f"\n[red]Error[/red]: {line[9:]}") else: self.app.call_from_thread(log.write, line) except Exception as e: self.app.call_from_thread(log.write, f"\n[red]Error[/red]: {e}") # --------------------------------------------------------------------------- # Module edit screen # --------------------------------------------------------------------------- class ModuleEditScreen(ModalScreen[dict | None]): BINDINGS = [Binding("escape", "dismiss", "Close", priority=True)] DEFAULT_CSS = """ ModuleEditScreen { align: center middle; } #edit-box { width: 85%; height: 90%; border: thick $primary; background: $panel; padding: 1 2; overflow-y: auto; } #edit-box Input, #edit-box Select { margin-bottom: 1; } .label { margin-top: 1; color: $text-muted; } #edit-query { height: 12; } #edit-buttons { height: 3; dock: bottom; } """ def __init__(self, client: PipekitClient, module: dict) -> None: super().__init__() self.client = client self.module = module def compose(self) -> ComposeResult: m = self.module conns = self.client.list_connections() conn_opts = [(c["name"], c["id"]) for c in conns] col_aliases = _parse_column_aliases(m.get("source_query", "")) col_opts = [(c, c) for c in col_aliases] with VerticalScroll(id="edit-box"): yield Label(f"[bold]Edit Module: {m['name']}[/bold]") yield Label("Module Name", classes="label") yield Input(value=m["name"], id="edit-name") yield Label("Source Connection", classes="label") yield Select(conn_opts, id="edit-source-conn", value=m["source_connection_id"], prompt="Select source...") yield Label("Destination Connection", classes="label") yield Select(conn_opts, id="edit-dest-conn", value=m["dest_connection_id"], prompt="Select destination...") yield Label("Destination Table (schema.table)", classes="label") yield Input(value=m["dest_table"], id="edit-dest-table") yield Label("Merge Strategy", classes="label") strat = m["merge_strategy"] with RadioSet(id="edit-strategy"): yield RadioButton("Full Refresh", value=(strat == "full"), id="estrat-full") yield RadioButton("Incremental", value=(strat == "incremental"), id="estrat-incr") yield RadioButton("Append", value=(strat == "append"), id="estrat-append") yield RadioButton("Upsert", value=(strat == "upsert"), id="estrat-upsert") mk = m.get("merge_key") yield Label("Merge Key", classes="label") mk_select = Select(col_opts, id="edit-merge-key", prompt="Select column...", allow_blank=True) yield mk_select wm = m.get("watermark_column") yield Label("Watermark Column (optional, defaults to merge key)", classes="label") wm_select = Select(col_opts, id="edit-ts-col", prompt="Select column...", allow_blank=True) yield wm_select yield Label("Enabled", classes="label") with RadioSet(id="edit-enabled"): yield RadioButton("Yes", value=bool(m["enabled"]), id="een-yes") yield RadioButton("No", value=not bool(m["enabled"]), id="een-no") with Horizontal(id="edit-buttons"): yield Button("Save", variant="primary", id="btn-save-edit") yield Button("Cancel", id="btn-cancel-edit") def on_mount(self) -> None: m = self.module col_set = set(_parse_column_aliases(m.get("source_query", ""))) mk = m.get("merge_key") if mk and mk in col_set: self.query_one("#edit-merge-key", Select).value = mk wm = m.get("watermark_column") if wm and wm in col_set: self.query_one("#edit-ts-col", Select).value = wm def on_button_pressed(self, event: Button.Pressed) -> None: if event.button.id == "btn-cancel-edit": self.dismiss(None) elif event.button.id == "btn-save-edit": self._save() def _save(self) -> None: strat_idx = self.query_one("#edit-strategy", RadioSet).pressed_index strategy = ["full", "incremental", "append", "upsert"][strat_idx] enabled_idx = self.query_one("#edit-enabled", RadioSet).pressed_index enabled = 1 if enabled_idx == 0 else 0 mk = self.query_one("#edit-merge-key", Select).value wm = self.query_one("#edit-ts-col", Select).value src_conn = self.query_one("#edit-source-conn", Select).value dst_conn = self.query_one("#edit-dest-conn", Select).value data = { "name": self.query_one("#edit-name", Input).value.strip(), "source_connection_id": src_conn if isinstance(src_conn, int) else None, "dest_connection_id": dst_conn if isinstance(dst_conn, int) else None, "dest_table": self.query_one("#edit-dest-table", Input).value.strip(), "merge_strategy": strategy, "merge_key": mk if isinstance(mk, str) else None, "watermark_column": wm if isinstance(wm, str) else None, "enabled": enabled, } try: updated = self.client.update_module(self.module["id"], data) self.dismiss(updated) except Exception as e: self.notify(f"Error: {e}", severity="error") # --------------------------------------------------------------------------- # Module detail screen # --------------------------------------------------------------------------- class ModuleDetailScreen(ModalScreen[None]): BINDINGS = [ Binding("escape", "dismiss", "Close", priority=True), Binding("q", "view_next_source", "Next Source SQL"), Binding("m", "view_merge", "Merge SQL"), Binding("h", "view_hooks", "Hooks"), Binding("b", "view_base_query", "Base Query"), Binding("e", "edit_source_query", "Edit Base Query"), Binding("s", "edit_settings", "Settings"), Binding("r", "run_module", "Run"), Binding("l", "show_log", "History"), ] DEFAULT_CSS = """ ModuleDetailScreen { align: center middle; } #detail-box { width: 90%; height: 90%; border: thick $primary; background: $panel; padding: 1 2; } #detail-info { margin-bottom: 1; } .sql-label { margin-top: 1; color: $text-muted; } #col-table { height: 1fr; } """ def __init__(self, client: PipekitClient, module: dict) -> None: super().__init__() self.client = client self.module = module self._preview = None self._last_run = None def compose(self) -> ComposeResult: m = self.module wm_display = m.get('watermark_column') or ('(merge key)' if m.get('merge_key') else '-') with Vertical(id="detail-box"): yield Label(f"[bold]{m['name']}[/bold]") yield Static( f"[bold]Strategy:[/bold] {m['merge_strategy']}\n" f"[bold]Merge Key:[/bold] {m.get('merge_key') or '-'}\n" f"[bold]Watermark:[/bold] {wm_display}\n" f"[bold]Dest Table:[/bold] {m['dest_table']}\n" f"[bold]Staging:[/bold] pipekit_staging.{m['name']}\n" f"[bold]Enabled:[/bold] {'yes' if m['enabled'] else 'no'}\n" f"[bold]Updated:[/bold] {m.get('updated_at', '-')[:19]}", id="detail-info", ) yield Label("Columns", classes="sql-label") yield DataTable(id="col-table", cursor_type="row") yield Footer() def on_mount(self) -> None: table = self.query_one("#col-table", DataTable) table.add_columns("#", "Source", "Alias", "Trimmed") columns = _parse_select_columns(self.module.get("source_query", "")) for i, col in enumerate(columns, 1): table.add_row(str(i), col["source"], col["alias"], col["trimmed"]) self._load_preview() @work(thread=True) def _load_preview(self) -> None: try: self._preview = self.client.preview_module(self.module["id"]) except Exception: pass try: runs = self.client.module_history(self.module["id"]) if runs: self._last_run = runs[0] except Exception: pass def _view_in_editor(self, sql: str) -> None: import tempfile editor = os.environ.get("EDITOR", "nvim") with tempfile.NamedTemporaryFile(mode="w", suffix=".sql", delete=False) as f: f.write(sql) tmp_path = f.name try: with self.app.suspend(): os.system(f'{editor} -R "{tmp_path}"') finally: os.unlink(tmp_path) def action_view_next_source(self) -> None: if self._preview: self._view_in_editor(self._preview["source_query"]) else: self.notify("Preview not loaded yet — try again", severity="warning") def action_view_merge(self) -> None: if self._last_run and self._last_run.get("merge_sql"): self._view_in_editor(self._last_run["merge_sql"]) elif self._preview: self._view_in_editor(self._preview["merge_sql"]) else: self.notify("No merge SQL available", severity="warning") def action_view_hooks(self) -> None: try: hooks = self.client.list_hooks(self.module["id"]) if hooks: parts = [] for h in hooks: parts.append(f"-- Hook #{h['id']} (run on: {h['run_on']}, order: {h['run_order']})") parts.append(h["sql"]) parts.append("") self._view_in_editor("\n".join(parts)) else: self.notify("No hooks configured", severity="warning") except Exception as e: self.notify(f"Error: {e}", severity="error") def action_view_base_query(self) -> None: self._view_in_editor(self.module["source_query"]) def action_edit_source_query(self) -> None: import tempfile query = self.module["source_query"] editor = os.environ.get("EDITOR", "nvim") with tempfile.NamedTemporaryFile(mode="w", suffix=".sql", delete=False) as f: f.write(query) tmp_path = f.name try: with self.app.suspend(): os.system(f'{editor} "{tmp_path}"') with open(tmp_path) as f: new_query = f.read() if new_query != query: self.client.update_module(self.module["id"], {"source_query": new_query}) self.module["source_query"] = new_query table = self.query_one("#col-table", DataTable) table.clear() for i, col in enumerate(_parse_select_columns(new_query), 1): table.add_row(str(i), col["source"], col["alias"], col["trimmed"]) self.notify("Source query updated") self._load_preview() finally: os.unlink(tmp_path) def action_run_module(self) -> None: self.app.push_screen(RunScreen(self.client, self.module["id"], self.module["name"])) def action_edit_settings(self) -> None: def _on_result(updated: dict | None) -> None: if updated: self.module = updated wm = updated.get('watermark_column') or ('(merge key)' if updated.get('merge_key') else '-') self.query_one("#detail-info", Static).update( f"[bold]Strategy:[/bold] {updated['merge_strategy']}\n" f"[bold]Merge Key:[/bold] {updated.get('merge_key') or '-'}\n" f"[bold]Watermark:[/bold] {wm}\n" f"[bold]Dest Table:[/bold] {updated['dest_table']}\n" f"[bold]Staging:[/bold] pipekit_staging.{updated['name']}\n" f"[bold]Enabled:[/bold] {'yes' if updated['enabled'] else 'no'}\n" f"[bold]Updated:[/bold] {updated.get('updated_at', '-')[:19]}" ) self.notify("Settings saved") self._load_preview() self.app.push_screen(ModuleEditScreen(self.client, self.module), callback=_on_result) def action_show_log(self) -> None: self.app.push_screen(HistoryScreen(self.client, self.module["id"], self.module["name"])) # --------------------------------------------------------------------------- # Main app # --------------------------------------------------------------------------- class PipekitTUI(App): TITLE = "Pipekit" CSS = """ #main-area { height: 1fr; } #left-pane { width: 45; border-right: tall $accent; } #module-tree { height: 1fr; } #right-pane { width: 1fr; padding: 1 2; } #detail-title { text-style: bold; margin-bottom: 1; } #status-bar { dock: bottom; height: 1; background: $accent; color: $text; padding: 0 1; } """ BINDINGS = [ Binding("r", "run_selected", "Run"), Binding("i", "inspect_selected", "Inspect"), Binding("l", "log_selected", "Log"), Binding("L", "global_log", "All Runs"), Binding("n", "new_module", "New Module"), Binding("c", "manage_connections", "Connections"), Binding("g", "cursor_top", "Top", show=False), Binding("shift+g", "cursor_bottom", "Bottom", show=False), Binding("j", "cursor_down", "Down", show=False), Binding("k", "cursor_up", "Up", show=False), Binding("slash", "search", "Search", show=False), Binding("f5", "refresh", "Refresh"), Binding("q", "quit", "Quit"), ] def __init__(self, client: PipekitClient) -> None: super().__init__() self.client = client self._modules: list[dict] = [] self._mod_by_node: dict[int, dict] = {} def compose(self) -> ComposeResult: yield Header() with Horizontal(id="main-area"): with Vertical(id="left-pane"): yield Tree("Modules", id="module-tree") with Vertical(id="right-pane"): yield Static("Select a module", id="detail-title") yield Static("", id="detail-info") yield Static("", id="status-bar") yield Footer() def on_mount(self) -> None: self._load_modules() def _load_modules(self) -> None: try: self._modules = self.client.list_modules() except Exception as e: self.notify(f"API error: {e}", severity="error") self._modules = [] self._mod_by_node.clear() tree = self.query_one("#module-tree", Tree) tree.clear() tree.root.expand() # Group by source connection by_conn: dict[int, list[dict]] = {} conn_names: dict[int, str] = {} for m in self._modules: cid = m["source_connection_id"] by_conn.setdefault(cid, []).append(m) if cid not in conn_names: try: c = self.client.get_connection(cid) conn_names[cid] = c["name"] except Exception: conn_names[cid] = f"Connection {cid}" for cid, mods in by_conn.items(): branch = tree.root.add( f"[bold]{conn_names.get(cid, cid)}[/bold] ({len(mods)})", expand=True ) for m in mods: if m.get("running"): icon = "\u25b6" # running elif m["enabled"]: icon = "\u2714" # enabled else: icon = "\u25cb" # disabled node = branch.add_leaf( f"{icon} {m['name']} [dim]{m['merge_strategy']}[/dim] [dim]{m['dest_table']}[/dim]" ) self._mod_by_node[id(node)] = m self.query_one("#status-bar", Static).update( f" {len(self._modules)} modules" ) def on_tree_node_highlighted(self, event: Tree.NodeHighlighted) -> None: mod = self._mod_by_node.get(id(event.node)) if mod: self._show_detail(mod) def _show_detail(self, mod: dict) -> None: self.query_one("#detail-title", Static).update(f"[bold]{mod['name']}[/bold]") detail = ( f"[bold]Strategy:[/bold] {mod['merge_strategy']}\n" f"[bold]Dest:[/bold] {mod['dest_table']}\n" f"[bold]Key:[/bold] {mod.get('merge_key') or '-'}\n" f"[bold]Watermark:[/bold] {mod.get('watermark_column') or '-'}\n" f"[bold]Enabled:[/bold] {'yes' if mod['enabled'] else 'no'}\n" f"\n[dim]r=Run i=Inspect n=New c=Connections[/dim]" ) self.query_one("#detail-info", Static).update(detail) def _get_selected_module(self) -> dict | None: tree = self.query_one("#module-tree", Tree) node = tree.cursor_node if node: return self._mod_by_node.get(id(node)) return None def action_inspect_selected(self) -> None: mod = self._get_selected_module() if mod: self.push_screen(ModuleDetailScreen(self.client, mod)) def action_log_selected(self) -> None: mod = self._get_selected_module() if mod: self.push_screen(HistoryScreen(self.client, mod["id"], mod["name"])) def action_run_selected(self) -> None: mod = self._get_selected_module() if mod: self.push_screen(RunScreen(self.client, mod["id"], mod["name"])) def action_new_module(self) -> None: self.push_screen(NewModuleScreen(self.client), callback=lambda _: self._load_modules()) def action_global_log(self) -> None: self.push_screen(GlobalHistoryScreen(self.client)) def action_manage_connections(self) -> None: self.push_screen(ConnectionScreen(self.client)) def action_refresh(self) -> None: self._load_modules() self.notify("Refreshed") def action_cursor_down(self) -> None: self.query_one("#module-tree", Tree).action_cursor_down() def action_cursor_up(self) -> None: self.query_one("#module-tree", Tree).action_cursor_up() def action_cursor_top(self) -> None: self.query_one("#module-tree", Tree).scroll_home() def action_cursor_bottom(self) -> None: self.query_one("#module-tree", Tree).scroll_end() def action_search(self) -> None: self.push_screen(SearchScreen(), callback=self._apply_search) def _apply_search(self, query: str | None) -> None: if not query: return query = query.lower() tree = self.query_one("#module-tree", Tree) for node_id, mod in self._mod_by_node.items(): if query in mod["name"].lower() or query in mod["dest_table"].lower(): for branch in tree.root.children: for leaf in branch.children: if id(leaf) == node_id: tree.select_node(leaf) leaf.scroll_visible() self._show_detail(mod) return self.notify(f"No match for '{query}'", severity="warning") class SearchScreen(ModalScreen[str | None]): BINDINGS = [Binding("escape", "cancel", "Close")] DEFAULT_CSS = """ SearchScreen { align: center middle; } #search-box { width: 50%; height: 5; border: thick $primary; background: $panel; padding: 0 1; } """ def compose(self) -> ComposeResult: with Vertical(id="search-box"): yield Label("Search:") yield Input(placeholder="type to search...", id="search-input") def on_mount(self) -> None: self.query_one("#search-input", Input).focus() def on_input_submitted(self, event: Input.Submitted) -> None: self.dismiss(event.value) def action_cancel(self) -> None: self.dismiss(None) # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- if __name__ == "__main__": client = PipekitClient() app = PipekitTUI(client) app.run()