feat: -b bulk copy into Postgres dest via COPY FROM STDIN
Extends -b to Postgres destinations: stream the source ResultSet into PG with COPY <dt> FROM STDIN (FORMAT csv) via the JDBC CopyManager, instead of batched INSERTs. COPY is text-based so the server parses each field into the column type — no per-type quoting needed. Every non-null value is CSV-quoted (so empty string stays distinct from NULL, which is an empty unquoted field); rows are flushed in 1000-row buffers with a 10k-row progress counter. Validated DB2->PG: numeric precision (123.4567), jsonb, unicode, embedded quotes, NULL vs empty-string all correct. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
parent
a61e018932
commit
6fe2bea089
@ -10,6 +10,9 @@ import com.microsoft.sqlserver.jdbc.SQLServerBulkCopy;
|
|||||||
import com.microsoft.sqlserver.jdbc.SQLServerBulkCopyOptions;
|
import com.microsoft.sqlserver.jdbc.SQLServerBulkCopyOptions;
|
||||||
import com.microsoft.sqlserver.jdbc.ISQLServerBulkData;
|
import com.microsoft.sqlserver.jdbc.ISQLServerBulkData;
|
||||||
import com.microsoft.sqlserver.jdbc.SQLServerException;
|
import com.microsoft.sqlserver.jdbc.SQLServerException;
|
||||||
|
import org.postgresql.PGConnection;
|
||||||
|
import org.postgresql.copy.CopyManager;
|
||||||
|
import org.postgresql.copy.CopyIn;
|
||||||
|
|
||||||
public class jrunner {
|
public class jrunner {
|
||||||
//static final String QUERY = "SELECT * from rlarp.osm LIMIT 100";
|
//static final String QUERY = "SELECT * from rlarp.osm LIMIT 100";
|
||||||
@ -333,6 +336,48 @@ public class jrunner {
|
|||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
System.exit(0);
|
System.exit(0);
|
||||||
}
|
}
|
||||||
|
} else if (bulk && dcu.toLowerCase().startsWith("jdbc:postgresql:")) {
|
||||||
|
//-------------------------------bulk copy (COPY, Postgres dest)--------------------------------------------
|
||||||
|
// Stream the source ResultSet into Postgres via COPY ... FROM STDIN.
|
||||||
|
// COPY is text-based: each field is sent as CSV text and the server
|
||||||
|
// parses it into the column type, so there's no per-type quoting.
|
||||||
|
// Non-null values are always CSV-quoted; NULL is an empty unquoted
|
||||||
|
// field; column order must match the dest (positional, as always).
|
||||||
|
System.out.println("------------bulk copy (COPY)------------------------------");
|
||||||
|
try {
|
||||||
|
CopyManager cm = ((PGConnection) dcon).getCopyAPI();
|
||||||
|
CopyIn cin = cm.copyIn("COPY " + dt + " FROM STDIN WITH (FORMAT csv)");
|
||||||
|
StringBuilder buf = new StringBuilder();
|
||||||
|
long rows = 0;
|
||||||
|
while (rs.next()) {
|
||||||
|
for (int i = 1; i <= cols; i++) {
|
||||||
|
if (i > 1) { buf.append(','); }
|
||||||
|
String val = rs.getString(i);
|
||||||
|
if (!rs.wasNull() && val != null) {
|
||||||
|
if (trim) { val = val.trim(); }
|
||||||
|
buf.append('"').append(val.replace("\"", "\"\"")).append('"');
|
||||||
|
}
|
||||||
|
// else: empty field -> NULL
|
||||||
|
}
|
||||||
|
buf.append('\n');
|
||||||
|
rows++;
|
||||||
|
if (rows % 1000 == 0) {
|
||||||
|
byte[] b = buf.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
|
||||||
|
cin.writeToCopy(b, 0, b.length);
|
||||||
|
buf.setLength(0);
|
||||||
|
if (rows % 10000 == 0) { System.out.print("\r" + rows); System.out.flush(); }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (buf.length() > 0) {
|
||||||
|
byte[] b = buf.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
|
||||||
|
cin.writeToCopy(b, 0, b.length);
|
||||||
|
}
|
||||||
|
cin.endCopy();
|
||||||
|
System.out.print("\r" + rows);
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
System.exit(0);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
System.out.println("------------row count-------------------------------------");
|
System.out.println("------------row count-------------------------------------");
|
||||||
//-------------------------------build & execute sql-------------------------------------------------------------
|
//-------------------------------build & execute sql-------------------------------------------------------------
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user