Golang如何使用package database.sql批处理sql语句

bar*_*rdh 31 sql go

如何使用Golang的database.sql包批处理sql语句?

在Java中,我会这样做:

// Create a prepared statement
String sql = "INSERT INTO my_table VALUES(?)";
PreparedStatement pstmt = connection.prepareStatement(sql);

// Insert 10 rows of data
for (int i=0; i<10; i++) {
    pstmt.setString(1, ""+i);
    pstmt.addBatch();
}

// Execute the batch
int [] updateCounts = pstmt.executeBatch();
Run Code Online (Sandbox Code Playgroud)

我如何在Golang中实现同样的目标?

And*_*w C 54

由于db.Exec函数是可变参数,因此一个选项(实际上只进行单个网络往返)是自己构造语句并爆炸参数并将其传递.

示例代码:

func BulkInsert(unsavedRows []*ExampleRowStruct) error {
    valueStrings := make([]string, 0, len(unsavedRows))
    valueArgs := make([]interface{}, 0, len(unsavedRows) * 3)
    for _, post := range unsavedRows {
        valueStrings = append(valueStrings, "(?, ?, ?)")
        valueArgs = append(valueArgs, post.Column1)
        valueArgs = append(valueArgs, post.Column2)
        valueArgs = append(valueArgs, post.Column3)
    }
    stmt := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s", strings.Join(valueStrings, ","))
    _, err := db.Exec(stmt, valueArgs...)
    return err
}
Run Code Online (Sandbox Code Playgroud)

在我运行的一个简单的测试中,这个解决方案在插入10,000行时比在另一个答案中显示的Begin,Prepare,Commit快大约4倍 - 尽管实际的改进将在很大程度上取决于您的个人设置,网络延迟等.

  • 以防万一有人遇到这个答案并且没有意识到(像我一样)语法 `(?, ?, ?)` 是 `MySql` 特定的,需要更改为 `($1, $2, $3), ... ($ n, $n+1 $n+2)` 代表 `Postgresql`。 (5认同)
  • @Xeoncross 按照我在这里写的方式,您是正确的,它们是不同的交易。但我相信你可以很容易地做到`db.Begin(); &lt;上面的代码,但带有 tx.Exec&gt;; db.Commit()` 将其作为一个事务运行。此外,这对 SQL 注入*是*安全的,因为它使用了`?` 占位符。 (4认同)
  • 我用这个答案构建了一些东西,发现在撰写本文时,MySQL中的占位符似乎有2 ^ 16-1(65,535)的限制.我最终在循环中运行了多个插入(一次插入大约10,000行)只是为了安全起见. (4认同)

Avi*_*lax 12

如果你正在使用PostgreSQL,那么pq支持批量导入.


Rik*_*ing 6

扩展Avi Flax的答案,我在INSERT中需要一个ON CONFLICT DO UPDATE子句.

解决方案是将COPY复制到临时表(设置为在事务结束时删除),然后从临时表INSERT到永久表.

这是我确定的代码:

func (fdata *FDataStore) saveToDBBulk(items map[fdataKey][]byte) (err error) {
    tx, err := fdata.db.Begin()
    if err != nil {
        return errors.Wrap(err, "begin transaction")
    }
    txOK := false
    defer func() {
        if !txOK {
            tx.Rollback()
        }
    }()

    // The ON COMMIT DROP clause at the end makes sure that the table
    // is cleaned up at the end of the transaction.
    // While the "for{..} state machine" goroutine in charge of delayed
    // saving ensures this function is not running twice at any given time.
    _, err = tx.Exec(sqlFDataMakeTempTable)
    // CREATE TEMPORARY TABLE fstore_data_load
    // (map text NOT NULL, key text NOT NULL, data json)
    // ON COMMIT DROP
    if err != nil {
        return errors.Wrap(err, "create temporary table")
    }

    stmt, err := tx.Prepare(pq.CopyIn(_sqlFDataTempTableName, "map", "key", "data"))
    for key, val := range items {
        _, err = stmt.Exec(string(key.Map), string(key.Key), string(val))
        if err != nil {
            return errors.Wrap(err, "loading COPY data")
        }
    }

    _, err = stmt.Exec()
    if err != nil {
        return errors.Wrap(err, "flush COPY data")
    }
    err = stmt.Close()
    if err != nil {
        return errors.Wrap(err, "close COPY stmt")
    }

    _, err = tx.Exec(sqlFDataSetFromTemp)
    // INSERT INTO fstore_data (map, key, data)
    // SELECT map, key, data FROM fstore_data_load
    // ON CONFLICT DO UPDATE SET data = EXCLUDED.data
    if err != nil {
        return errors.Wrap(err, "move from temporary to real table")
    }

    err = tx.Commit()
    if err != nil {
        return errors.Wrap(err, "commit transaction")
    }
    txOK = true
    return nil
}
Run Code Online (Sandbox Code Playgroud)


Mas*_*arl 6

针对不支持占位符的PostgreSQL 改编Andrew的解决方案,可以实现?以下工作:

func BulkInsert(unsavedRows []*ExampleRowStruct) error {
    valueStrings := make([]string, 0, len(unsavedRows))
    valueArgs := make([]interface{}, 0, len(unsavedRows) * 3)
    i := 0
    for _, post := range unsavedRows {
        valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d)", i*3+1, i*3+2, i*3+3))
        valueArgs = append(valueArgs, post.Column1)
        valueArgs = append(valueArgs, post.Column2)
        valueArgs = append(valueArgs, post.Column3)
        i++
    }
    stmt := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s", strings.Join(valueStrings, ","))
    _, err := db.Exec(stmt, valueArgs...)
    return err
}
Run Code Online (Sandbox Code Playgroud)

  • 我没有看到风险,字符串插值只使用递增的 `i` 参数(valueStrings 将是 `($1, $2, $3),($4, $5, $6),...`)。`ExampleRowStruct`s 只传递给 db.Exec,数据库驱动程序负责替换占位符。 (3认同)

Mat*_*att 1

无法通过database/sql 中可用的接口进行批处理。但是,特定的数据库驱动程序可能会单独支持它。例如https://github.com/ziutek/mymysql似乎支持 MySQL 批处理。