Start a modern, modular successor to TheBigOne (side-by-side; TheBigOne untouched): - Sql.bas: stateless SQL-text generation from a 2D Variant array (header in row 1). Type inference from Excel cell VarType, dialect-aware identifier quoting/literals, VALUES batch + INSERT...SELECT, and CreateTableIfMissing. Successor to the SQLp_* helpers. - Db.cls: single connection object, late-bound ADO (no reference needed), SQLOLEDB by default (zero-install / non-admin friendly). Open/Exec/Query with structured LastError. Successor to the ADOp_* helpers. - TBFCLoad.bas: thin caller that loads the "Upload" tab into fanalysis.GS.TBFC (auto-create, replace-by-Source). Tested vs USMIDSQL01. - load_tbfc.py: equivalent Python loader kept as a power-user alternative. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
151 lines
5.2 KiB
Python
151 lines
5.2 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Load the "Upload" tab of a forecast workbook into fanalysis.GS.TBFC (SQL Server).

Usage: python load_tbfc.py "<path to .xlsx>"

- Reads the saved workbook from disk (Excel need not be open).
- Infers a SQL type per column from the Excel cell types:
      date cells    -> DATE
      numeric cells -> DECIMAL(18,2) (rounded half-up)
      else (text)   -> NVARCHAR(100)
- Creates fanalysis.GS.TBFC on first run if it doesn't exist.
- Replace-by-Source: deletes existing rows whose [Source] matches the
  Source value(s) in this upload, then inserts the new rows. Other
  scenarios already in the table are left untouched.
- Integrated (Windows) auth to USMIDSQL01; the whole load is one transaction.
|
|
"""
|
|
|
|
import sys
|
|
import datetime
|
|
from decimal import Decimal, ROUND_HALF_UP
|
|
|
|
import openpyxl
|
|
import pyodbc
|
|
|
|
# Target location: server, database, schema and table the load writes to.
SERVER = "USMIDSQL01"
DATABASE = "fanalysis"
SCHEMA = "GS"
TABLE = "TBFC"

# Worksheet layout of the upload tab.
SHEET = "Upload"
HEADER_ROW = 8    # data starts on HEADER_ROW + 1
FIRST_COL = 2     # column B
LAST_COL = 11     # column K

# Sizes used when generating SQL column types.
NVARCHAR_LEN = 100
AMOUNT_DP = 2     # decimal places for numeric columns

# Fully qualified, bracket-quoted three-part table name.
FQ_TABLE = ".".join(f"[{part}]" for part in (DATABASE, SCHEMA, TABLE))
|
|
|
|
|
|
def pick_driver():
    """Return the name of the installed ODBC driver to use for SQL Server.

    Selection order:
      1. the first entry of the fixed preference list that is installed;
      2. otherwise the last installed driver whose name mentions
         "SQL Server" (for example an older "ODBC Driver 13 for SQL Server");
      3. otherwise the last installed driver of any kind (kept so that
         setups which relied on this fallback keep working).

    Raises:
        RuntimeError: if pyodbc reports no installed ODBC drivers at all.
    """
    prefer = (
        "ODBC Driver 18 for SQL Server",
        "ODBC Driver 17 for SQL Server",
        "SQL Server Native Client 11.0",
        "SQL Server",
    )
    avail = pyodbc.drivers()
    installed = set(avail)
    for name in prefer:
        if name in installed:
            return name

    # Before falling back to an arbitrary driver (which could be an Access,
    # Excel or text driver), look for any other SQL Server driver.
    sql_server_like = [d for d in avail if "sql server" in d.lower()]
    if sql_server_like:
        return sql_server_like[-1]

    if avail:
        return avail[-1]
    raise RuntimeError("No ODBC driver for SQL Server found.")
|
|
|
|
|
|
def connect():
    """Open a pyodbc connection to SERVER/DATABASE using a trusted connection.

    The connection is opened with autocommit disabled, so the caller decides
    when to commit or roll back.
    """
    driver_name = pick_driver()
    # driver 18 encrypts by default; these options switch that off and accept
    # the server certificate as-is.
    tls_opts = "Encrypt=no;TrustServerCertificate=yes;" if "18" in driver_name else ""
    conn_str = (
        f"Driver={{{driver_name}}};Server={SERVER};Database={DATABASE};Trusted_Connection=yes;"
        + tls_opts
    )
    return pyodbc.connect(conn_str, autocommit=False)
|
|
|
|
|
|
def read_upload(path):
    """Read the header row and the non-blank data rows of the upload sheet.

    Only columns FIRST_COL..LAST_COL are read. The header is taken from
    HEADER_ROW and data from every row below it. Rows whose cells are all
    None or whitespace-only strings are skipped.

    Args:
        path: path of the workbook file to open.

    Returns:
        A ``(headers, rows)`` tuple: ``headers`` is the list of header cell
        values, ``rows`` is a list of lists with the same width as
        ``headers``.

    Raises:
        RuntimeError: if the workbook has no sheet named SHEET.
    """
    width = LAST_COL - FIRST_COL + 1

    def fit(cells):
        # Pad/trim so every returned row has exactly `width` entries.
        return (list(cells) + [None] * width)[:width]

    wb = openpyxl.load_workbook(path, read_only=True, data_only=True)
    try:
        if SHEET not in wb.sheetnames:
            raise RuntimeError(f'Sheet "{SHEET}" not found. Tabs: {wb.sheetnames}')
        ws = wb[SHEET]

        # Stream the sheet once instead of addressing cells one by one:
        # read-only worksheets are built for sequential iteration, and this
        # also avoids relying on ws.max_row, which can be None when the file
        # does not record its dimensions.
        row_iter = ws.iter_rows(
            min_row=HEADER_ROW,
            min_col=FIRST_COL,
            max_col=LAST_COL,
            values_only=True,
        )
        headers = fit(next(row_iter, ()))

        rows = []
        for raw in row_iter:
            vals = fit(raw)
            if all(v is None or (isinstance(v, str) and not v.strip()) for v in vals):
                continue  # skip blank rows
            rows.append(vals)
    finally:
        # Read-only workbooks keep the file open until closed explicitly;
        # close it on the error path too.
        wb.close()
    return headers, rows
|
|
|
|
|
|
def infer_types(headers, rows):
    """Infer one ``(sql_type, kind)`` pair per column from the row values.

    ``kind`` is one of ``"date"``, ``"num"`` or ``"text"``. None cells are
    ignored when classifying. A column is a date column when every remaining
    value is a date/datetime, a numeric column when every remaining value is
    an int or float (bools excluded), and a text column otherwise, including
    when the column has no values at all.
    """
    date_types = (datetime.datetime, datetime.date)
    inferred = []
    for col, _header in enumerate(headers):
        present = [record[col] for record in rows if record[col] is not None]
        has_values = bool(present)

        all_dates = has_values and all(isinstance(v, date_types) for v in present)
        # Equivalent to "every value is int/float and not bool".
        all_numbers = has_values and not any(
            isinstance(v, bool) or not isinstance(v, (int, float)) for v in present
        )

        if all_dates:
            inferred.append(("DATE", "date"))
        elif all_numbers:
            inferred.append((f"DECIMAL(18,{AMOUNT_DP})", "num"))
        else:
            inferred.append((f"NVARCHAR({NVARCHAR_LEN})", "text"))
    return inferred
|
|
|
|
|
|
def coerce(value, kind):
    """Convert one cell value to the Python value bound for its column kind.

    None is passed through unchanged. For ``"num"`` the value is converted to
    a Decimal rounded half-up to AMOUNT_DP places; for ``"date"`` a datetime
    is reduced to its date part and anything else is returned as-is; for any
    other kind the value is stringified, stripped and cut to NVARCHAR_LEN
    characters.
    """
    if value is None:
        return None

    if kind == "num":
        # Go through str() so the Decimal is built from the value's text form.
        step = Decimal(10) ** -AMOUNT_DP
        return Decimal(str(value)).quantize(step, rounding=ROUND_HALF_UP)

    if kind == "date":
        if isinstance(value, datetime.datetime):
            return value.date()
        return value

    text = str(value).strip()
    return text[:NVARCHAR_LEN]
|
|
|
|
|
|
def _quote_ident(name):
    """Return *name* as a bracket-delimited T-SQL identifier.

    A literal ']' inside the name is doubled so it cannot end the delimiter
    early.
    """
    return "[" + str(name).replace("]", "]]") + "]"


def main():
    """Load the workbook named on the command line into FQ_TABLE.

    Steps: read the upload sheet, infer column types, create the table if it
    is missing, delete existing rows whose [Source] is present in this
    upload, insert the new rows, and commit. The create/delete/insert run on
    one connection with autocommit off and are rolled back on any error.

    Returns:
        Process exit status: 0 on success, 1 when the sheet has no data
        rows, 2 when the workbook path argument is missing.

    Raises:
        RuntimeError: if the header row has no "Source" column.
        Exception: any error raised during the database work is re-raised
            after the rollback.
    """
    if len(sys.argv) < 2:
        print("usage: python load_tbfc.py <workbook.xlsx>", file=sys.stderr)
        return 2
    path = sys.argv[1]

    headers, rows = read_upload(path)
    if not rows:
        print("No data rows found on the Upload tab.", file=sys.stderr)
        return 1
    types = infer_types(headers, rows)

    if "Source" not in headers:
        raise RuntimeError(f'No "Source" column found. Headers: {headers}')
    src_idx = headers.index("Source")

    data = [[coerce(v, types[j][1]) for j, v in enumerate(row)] for row in rows]
    sources = sorted({row[src_idx] for row in data if row[src_idx] is not None})

    # Rows without a Source are inserted, but "[Source] IN (...)" never
    # matches NULL, so a later run cannot replace them.
    unsourced = sum(1 for row in data if row[src_idx] is None)
    if unsourced:
        print(f"warning: {unsourced} row(s) have a blank Source and will not be "
              "replaced by later loads.", file=sys.stderr)

    # Header text comes from the workbook, so quote it as an identifier
    # rather than pasting it between brackets verbatim.
    quoted = [_quote_ident(h) for h in headers]
    cols_ddl = ",\n ".join(f"{q} {t}" for q, (t, _) in zip(quoted, types))
    create_sql = (f"IF OBJECT_ID('{DATABASE}.{SCHEMA}.{TABLE}','U') IS NULL\n"
                  f"CREATE TABLE {FQ_TABLE} (\n {cols_ddl}\n);")
    col_list = ", ".join(quoted)
    insert_sql = f"INSERT INTO {FQ_TABLE} ({col_list}) VALUES ({', '.join('?' for _ in headers)})"

    deleted = 0
    cn = connect()
    try:
        # Cursor creation is inside the outer try so the connection is
        # closed even if it fails.
        cur = cn.cursor()
        try:
            cur.execute(create_sql)
            if sources:
                marks = ", ".join("?" for _ in sources)
                cur.execute(f"DELETE FROM {FQ_TABLE} WHERE [Source] IN ({marks})", *sources)
                deleted = cur.rowcount
            cur.fast_executemany = True
            cur.executemany(insert_sql, data)
            cn.commit()
        except Exception:
            cn.rollback()
            raise
        finally:
            cur.close()
    finally:
        cn.close()

    print("TBFC load complete.")
    # Source values may be Decimal or date objects when the column was not
    # inferred as text, so stringify them before joining.
    print(f" Source(s): {', '.join(str(s) for s in sources)}")
    print(f" Deleted: {deleted} existing row(s)")
    print(f" Inserted: {len(data)} row(s)")
    print(" Schema: " + ", ".join(f"{h} {t}" for h, (t, _) in zip(headers, types)))
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|