Drivers: add list_schemas() to base, PG, DB2, MSSQL

Base provides a no-op default; drivers opt in by overriding. MSSQL
scopes the lookup to a linked server / database when those qualifiers
are supplied.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Paul Trowbridge 2026-05-02 20:56:10 -04:00
parent f18ea55a12
commit ff19ae9b81
4 changed files with 37 additions and 0 deletions

View File

@ -98,6 +98,14 @@ class Driver(abc.ABC):
def list_tables(self, conn: dict, **qualifiers) -> list[RemoteTable]: def list_tables(self, conn: dict, **qualifiers) -> list[RemoteTable]:
"""Fetch tables/views matching the qualifiers.""" """Fetch tables/views matching the qualifiers."""
def list_schemas(self, conn: dict, **qualifiers) -> list[str]:
"""Return schema/library names available on the source.
Default returns []; drivers opt in by overriding. ``qualifiers`` lets
drivers like MSSQL scope the lookup to a database/linked-server.
"""
return []
@abc.abstractmethod @abc.abstractmethod
def get_columns(self, conn: dict, table: str, **qualifiers) -> list[RemoteColumn]: def get_columns(self, conn: dict, table: str, **qualifiers) -> list[RemoteColumn]:
"""Fetch column metadata for one table.""" """Fetch column metadata for one table."""

View File

@ -44,6 +44,12 @@ class DB2Driver(Driver):
help="e.g. RLDBF12"), help="e.g. RLDBF12"),
] ]
def list_schemas(self, conn, **_) -> list[str]:
result = self.query(
conn, "SELECT SCHEMA_NAME FROM QSYS2.SYSSCHEMAS "
"ORDER BY SCHEMA_NAME")
return [r[0].strip() for r in result.rows if r and r[0]]
def list_tables(self, conn, *, schema: str) -> list[RemoteTable]: def list_tables(self, conn, *, schema: str) -> list[RemoteTable]:
validate_identifier(schema, "schema") validate_identifier(schema, "schema")
sql = ( sql = (

View File

@ -49,6 +49,20 @@ class MSSQLDriver(Driver):
required=False, default="dbo"), required=False, default="dbo"),
] ]
def list_schemas(
self, conn, *, linked_server: str | None = None,
database: str | None = None, **_,
) -> list[str]:
self._validate(linked_server, database, None)
prefix = self._info_schema_prefix(linked_server, database)
sql = (
f"SELECT DISTINCT TABLE_SCHEMA FROM {prefix}INFORMATION_SCHEMA.TABLES "
f"WHERE TABLE_TYPE IN ('BASE TABLE','VIEW') "
f"ORDER BY TABLE_SCHEMA"
)
result = self.query(conn, sql)
return [r[0].strip() for r in result.rows if r and r[0]]
def list_tables( def list_tables(
self, conn, *, linked_server: str | None = None, self, conn, *, linked_server: str | None = None,
database: str | None = None, schema: str | None = None, database: str | None = None, schema: str | None = None,

View File

@ -41,6 +41,15 @@ class PGDriver(Driver):
required=False, default="public"), required=False, default="public"),
] ]
def list_schemas(self, conn, **_) -> list[str]:
result = self.query(
conn,
"SELECT schema_name FROM information_schema.schemata "
"WHERE schema_name NOT IN ('pg_catalog','information_schema') "
"AND schema_name NOT LIKE 'pg\\_%' ESCAPE '\\' "
"ORDER BY schema_name")
return [r[0].strip() for r in result.rows if r and r[0]]
def list_tables(self, conn, *, schema: str | None = None) -> list[RemoteTable]: def list_tables(self, conn, *, schema: str | None = None) -> list[RemoteTable]:
if schema: if schema:
validate_identifier(schema, "schema") validate_identifier(schema, "schema")