How do I batch SQL statements with Go's database/sql package?
In Java I would do it like this:
// Create a prepared statement
String sql = "INSERT INTO my_table VALUES(?)";
PreparedStatement pstmt = connection.prepareStatement(sql);

// Insert 10 rows of data
for (int i = 0; i < 10; i++) {
    pstmt.setString(1, "" + i);
    pstmt.addBatch();
}

// Execute the batch
int[] updateCounts = pstmt.executeBatch();
How can I achieve the same thing in Go?
Andrew C (54 votes):
Since the db.Exec function is variadic, one option (which really does make only a single network round trip) is to construct the statement yourself, collect the arguments in a slice, and pass them in expanded.
Example code:
func BulkInsert(unsavedRows []*ExampleRowStruct) error {
    valueStrings := make([]string, 0, len(unsavedRows))
    valueArgs := make([]interface{}, 0, len(unsavedRows)*3)
    for _, post := range unsavedRows {
        valueStrings = append(valueStrings, "(?, ?, ?)")
        valueArgs = append(valueArgs, post.Column1)
        valueArgs = append(valueArgs, post.Column2)
        valueArgs = append(valueArgs, post.Column3)
    }
    stmt := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s", strings.Join(valueStrings, ","))
    _, err := db.Exec(stmt, valueArgs...)
    return err
}
In a simple test I ran, this solution was about 4x faster at inserting 10,000 rows than the Begin, Prepare, Commit approach presented in another answer - although the actual improvement will depend heavily on your particular setup, network latency, and so on.
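(That other answer is not reproduced here; purely for comparison, a minimal sketch of the Begin/Prepare/Commit pattern it describes, assuming the same ExampleRowStruct and a *sql.DB as above, might look like this:)

func BulkInsertPrepared(db *sql.DB, unsavedRows []*ExampleRowStruct) error {
    tx, err := db.Begin()
    if err != nil {
        return err
    }
    // Rollback is a no-op once Commit has succeeded.
    defer tx.Rollback()

    stmt, err := tx.Prepare("INSERT INTO my_sample_table (column1, column2, column3) VALUES (?, ?, ?)")
    if err != nil {
        return err
    }
    defer stmt.Close()

    for _, post := range unsavedRows {
        // Each Exec is a separate round trip to the server, which is why this
        // tends to be slower than the single multi-row INSERT above.
        if _, err := stmt.Exec(post.Column1, post.Column2, post.Column3); err != nil {
            return err
        }
    }
    return tx.Commit()
}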
Expanding on Avi Flax's answer, I needed an ON CONFLICT DO UPDATE clause in my INSERT.
The solution to that is to COPY into a temporary table (set to be dropped at the end of the transaction), then INSERT from the temporary table into the permanent one.
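(Avi Flax's answer is not reproduced here. For context, the plain COPY pattern it describes, sketched with assumed names - lib/pq's CopyIn helper plus the fstore_data table and fdataKey type used below - loads rows straight into the target table with no conflict handling:)

func copyInsert(db *sql.DB, items map[fdataKey][]byte) error {
    txn, err := db.Begin()
    if err != nil {
        return err
    }
    // Rollback is a no-op once Commit has succeeded.
    defer txn.Rollback()

    // pq.CopyIn builds a COPY ... FROM STDIN statement for the given table and columns.
    stmt, err := txn.Prepare(pq.CopyIn("fstore_data", "map", "key", "data"))
    if err != nil {
        return err
    }
    for key, val := range items {
        if _, err := stmt.Exec(string(key.Map), string(key.Key), string(val)); err != nil {
            return err
        }
    }
    // An Exec with no arguments flushes the buffered COPY data.
    if _, err := stmt.Exec(); err != nil {
        return err
    }
    if err := stmt.Close(); err != nil {
        return err
    }
    return txn.Commit()
}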
Here is the code I settled on:
func (fdata *FDataStore) saveToDBBulk(items map[fdataKey][]byte) (err error) {
    tx, err := fdata.db.Begin()
    if err != nil {
        return errors.Wrap(err, "begin transaction")
    }
    txOK := false
    defer func() {
        if !txOK {
            tx.Rollback()
        }
    }()

    // The ON COMMIT DROP clause at the end makes sure that the table
    // is cleaned up at the end of the transaction.
    // While the "for{..} state machine" goroutine in charge of delayed
    // saving ensures this function is not running twice at any given time.
    _, err = tx.Exec(sqlFDataMakeTempTable)
    // CREATE TEMPORARY TABLE fstore_data_load
    // (map text NOT NULL, key text NOT NULL, data json)
    // ON COMMIT DROP
    if err != nil {
        return errors.Wrap(err, "create temporary table")
    }

    stmt, err := tx.Prepare(pq.CopyIn(_sqlFDataTempTableName, "map", "key", "data"))
    if err != nil {
        return errors.Wrap(err, "prepare COPY statement")
    }
    for key, val := range items {
        _, err = stmt.Exec(string(key.Map), string(key.Key), string(val))
        if err != nil {
            return errors.Wrap(err, "loading COPY data")
        }
    }
    _, err = stmt.Exec()
    if err != nil {
        return errors.Wrap(err, "flush COPY data")
    }
    err = stmt.Close()
    if err != nil {
        return errors.Wrap(err, "close COPY stmt")
    }

    _, err = tx.Exec(sqlFDataSetFromTemp)
    // INSERT INTO fstore_data (map, key, data)
    // SELECT map, key, data FROM fstore_data_load
    // ON CONFLICT DO UPDATE SET data = EXCLUDED.data
    if err != nil {
        return errors.Wrap(err, "move from temporary to real table")
    }

    err = tx.Commit()
    if err != nil {
        return errors.Wrap(err, "commit transaction")
    }
    txOK = true
    return nil
}
Adapting Andrew's solution for PostgreSQL, which does not support the ? placeholder, the following works:
func BulkInsert(unsavedRows []*ExampleRowStruct) error {
    valueStrings := make([]string, 0, len(unsavedRows))
    valueArgs := make([]interface{}, 0, len(unsavedRows)*3)
    for i, post := range unsavedRows {
        valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d)", i*3+1, i*3+2, i*3+3))
        valueArgs = append(valueArgs, post.Column1)
        valueArgs = append(valueArgs, post.Column2)
        valueArgs = append(valueArgs, post.Column3)
    }
    stmt := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s", strings.Join(valueStrings, ","))
    _, err := db.Exec(stmt, valueArgs...)
    return err
}
Batching is not possible through the interfaces available in database/sql. A particular database driver may support it separately, however. For example, https://github.com/ziutek/mymysql appears to support batching for MySQL.