feat: -b bulk copy into SQL Server dest via SQLServerBulkCopy

Adds an opt-in -b flag (migration mode, SQL Server dest only) that streams the
source ResultSet straight into SQL Server over the TDS bulk-load protocol
instead of 250-row INSERT...VALUES round trips. A BulkSource adapter
(ISQLServerBulkData) maps PG source types to JDBC types we control: string-ish
types (text/varchar/char/bpchar/json/jsonb/uuid/numeric) go through NVARCHAR via
getString so SQL Server converts losslessly — notably numeric, since PG reports
unconstrained numeric as scale 0 which made a typed DECIMAL path round
(123.45 -> 123). Default stays the INSERT path, so nothing regresses.

Validated against live PG->SQL Server: int4/text/jsonb/numeric/date plus nulls,
unicode, quotes, and numeric precision (123.45, 0.123456) all correct.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Paul Trowbridge 2026-06-17 22:18:45 -04:00
parent d9fd651c72
commit ce76e93a77

View File

@ -6,6 +6,10 @@ import java.nio.file.Path ;
import java.nio.file.Paths;
import java.time.*;
import java.io.IOException;
import com.microsoft.sqlserver.jdbc.SQLServerBulkCopy;
import com.microsoft.sqlserver.jdbc.SQLServerBulkCopyOptions;
import com.microsoft.sqlserver.jdbc.ISQLServerBulkData;
import com.microsoft.sqlserver.jdbc.SQLServerException;
public class jrunner {
//static final String QUERY = "SELECT * from rlarp.osm LIMIT 100";
@ -24,6 +28,7 @@ public class jrunner {
String dt = "";
Boolean trim = true;
Boolean clear = true;
Boolean bulk = false;
Integer r = 0;
Integer t = 0;
String sql = "";
@ -57,6 +62,7 @@ public class jrunner {
msg = msg + nl + "-dt fully qualified name of destination table";
msg = msg + nl + "-t trim text";
msg = msg + nl + "-c clear target table";
msg = msg + nl + "-b bulk copy into destination (SQL Server dest only)";
msg = msg + nl + "-f output format (csv, tsv, table, json) - default: csv";
msg = msg + nl + "--help info";
msg = msg + nl + "";
@ -125,6 +131,9 @@ public class jrunner {
case "-c":
clear = true;
break;
case "-b":
bulk = true;
break;
case "-f":
outputFormat = args[i+1].toLowerCase();
break;
@ -300,6 +309,25 @@ public class jrunner {
e.printStackTrace();
System.exit(0);
}
} else if (bulk && dcu.toLowerCase().startsWith("jdbc:sqlserver:")) {
//-------------------------------bulk copy (SQL Server dest)-------------------------------------------------
// Stream the source ResultSet straight into SQL Server over the TDS
// bulk-load protocol no per-row INSERT round trips. Source type
// names map to JDBC types via BulkSource (string-ish -> NVARCHAR).
System.out.println("------------bulk copy-------------------------------------");
try {
SQLServerBulkCopy bulkCopy = new SQLServerBulkCopy(dcon);
bulkCopy.setDestinationTableName(dt);
SQLServerBulkCopyOptions options = new SQLServerBulkCopyOptions();
options.setBatchSize(10000);
options.setBulkCopyTimeout(0);
bulkCopy.setBulkCopyOptions(options);
bulkCopy.writeToServer(new BulkSource(rs, cols, dtn, trim));
bulkCopy.close();
} catch (Exception e) {
e.printStackTrace();
System.exit(0);
}
} else {
System.out.println("------------row count-------------------------------------");
//-------------------------------build & execute sql-------------------------------------------------------------
@ -478,6 +506,128 @@ public class jrunner {
System.out.println();
}
// Adapts a source ResultSet to SQLServerBulkCopy. Maps source type names to
// JDBC types we control: string-ish PG types (text/varchar/char/bpchar/json/
// jsonb/uuid/...) are declared NVARCHAR and read via getString mirroring the
// INSERT path's "quote it" choices and sidestepping unsupported types (jsonb
// reports as OTHER, which writeToServer(ResultSet) can't handle directly).
static class BulkSource implements ISQLServerBulkData {
private final ResultSet rs;
private final ResultSetMetaData md;
private final int cols;
private final boolean trim;
private final int[] jdbcType;
private final boolean[] asString;
BulkSource(ResultSet rs, int cols, String[] dtn, boolean trim) throws SQLException {
this.rs = rs;
this.cols = cols;
this.trim = trim;
this.md = rs.getMetaData();
this.jdbcType = new int[cols + 1];
this.asString = new boolean[cols + 1];
for (int i = 1; i <= cols; i++) {
String t = (dtn[i] == null ? "" : dtn[i].toUpperCase());
switch (t) {
case "INT2": case "SMALLINT":
jdbcType[i] = Types.SMALLINT; break;
case "INT4": case "INT": case "INTEGER": case "SERIAL":
jdbcType[i] = Types.INTEGER; break;
case "INT8": case "BIGINT": case "BIGSERIAL":
jdbcType[i] = Types.BIGINT; break;
// numeric/decimal: PG reports unconstrained numeric as
// scale 0, which makes bulk copy round (123.45 -> 123). Send
// the exact text instead and let SQL Server convert it into
// the dest column losslessly.
case "FLOAT4": case "REAL":
jdbcType[i] = Types.REAL; break;
case "FLOAT8": case "DOUBLE": case "DOUBLE PRECISION":
jdbcType[i] = Types.DOUBLE; break;
case "BOOL": case "BOOLEAN": case "BIT":
jdbcType[i] = Types.BIT; break;
case "DATE":
jdbcType[i] = Types.DATE; break;
case "TIME":
jdbcType[i] = Types.TIME; break;
case "TIMESTAMP": case "TIMESTAMPTZ":
case "DATETIME": case "DATETIME2": case "SMALLDATETIME":
jdbcType[i] = Types.TIMESTAMP; break;
case "BYTEA": case "BINARY": case "VARBINARY":
jdbcType[i] = Types.VARBINARY; break;
default:
jdbcType[i] = Types.LONGNVARCHAR;
asString[i] = true;
break;
}
}
}
public java.util.Set<Integer> getColumnOrdinals() {
java.util.Set<Integer> ords = new java.util.TreeSet<Integer>();
for (int i = 1; i <= cols; i++) { ords.add(i); }
return ords;
}
public String getColumnName(int column) {
try { return md.getColumnName(column); }
catch (SQLException e) { return "col" + column; }
}
public int getColumnType(int column) { return jdbcType[column]; }
public int getPrecision(int column) {
if (asString[column]) { return 0; } // LONGNVARCHAR -> nvarchar(max)
try {
int p = md.getPrecision(column);
return (p > 0 ? p : 38);
} catch (SQLException e) { return 38; }
}
public int getScale(int column) {
if (jdbcType[column] == Types.DECIMAL) {
try { return md.getScale(column); }
catch (SQLException e) { return 0; }
}
return 0;
}
public boolean next() throws SQLServerException {
try { return rs.next(); }
catch (SQLException e) { throw new RuntimeException(e); }
}
public Object[] getRowData() throws SQLServerException {
Object[] row = new Object[cols];
try {
for (int i = 1; i <= cols; i++) {
Object v;
switch (jdbcType[i]) {
case Types.SMALLINT:
case Types.INTEGER:
case Types.BIGINT:
case Types.REAL:
case Types.DOUBLE: v = rs.getObject(i); break;
case Types.DECIMAL: v = rs.getBigDecimal(i); break;
case Types.BIT: v = rs.getBoolean(i); break;
case Types.DATE: v = rs.getDate(i); break;
case Types.TIME: v = rs.getTime(i); break;
case Types.TIMESTAMP:v = rs.getTimestamp(i); break;
case Types.VARBINARY:v = rs.getBytes(i); break;
default: {
String s = rs.getString(i);
if (s != null && trim) { s = s.trim(); }
v = s;
break;
}
}
if (rs.wasNull()) { v = null; }
row[i - 1] = v;
}
} catch (SQLException e) { throw new RuntimeException(e); }
return row;
}
}
private static void outputQueryResults(ResultSet rs, int cols, String[] dtn, String format) throws SQLException {
switch (format) {
case "csv":