使用 SqlBulkCopy 时提供流作为二进制列的数据源

i-o*_*one 3 c# sql-server ado.net sqlbulkcopy idatareader

如果需要以流式方式从SqlServer 读取数据,可以使用一些功能来实现。例如使用SqlDataReaderwith CommandBehavior.SequentialAccess,特别是当需要访问二进制列数据时,有以下方法GetStream(int)

var cmd = new SqlCommand();
cmd.Connection = connection;
cmd.CommandText = @"select 0x0123456789 as Data";

using (var dr = cmd.ExecuteReader(CommandBehavior.SequentialAccess))
{
    dr.Read();

    var stream = dr.GetStream(0);
    // access stream
}
Run Code Online (Sandbox Code Playgroud)

但是,当需要使用向SqlServer 提供数据时SqlBulkCopy,特别是需要将流作为二进制列的数据源提供时,如何以相反的方向流式传输数据呢?

我尝试跟随

var cmd2 = new SqlCommand();
cmd2.Connection = connection;
cmd2.CommandText = @"create table #Test (ID int, Data varbinary(max))";
cmd2.ExecuteNonQuery();

using (SqlBulkCopy sbc = new SqlBulkCopy(connection, SqlBulkCopyOptions.TableLock, null))
{
    sbc.DestinationTableName = "#Test";
    sbc.EnableStreaming = true;

    sbc.ColumnMappings.Add(0, "ID");
    sbc.ColumnMappings.Add(1, "Data");

    sbc.WriteToServer(new TestDataReader());
}
Run Code Online (Sandbox Code Playgroud)

其中TestDataReader实现IDataReader如下:

class TestDataReader : IDataReader
{
    public int FieldCount { get { return 2; } }
    int rowCount = 1;
    public bool Read() { return (rowCount++) < 3; }
    public bool IsDBNull(int i) { return false; }

    public object GetValue(int i)
    {
        switch (i)
        {
            case 0: return rowCount;
            case 1: return new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 };
            default: throw new Exception();
        }
    }

    //the rest members of IDataReader
}
Run Code Online (Sandbox Code Playgroud)

它按预期工作。

然而变化

case 1: return new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 };
Run Code Online (Sandbox Code Playgroud)

case 1: return new MemoryStream(new byte[] { 0x01, 0x23, 0x45, 0x67, 0x89 });
Run Code Online (Sandbox Code Playgroud)

System.InvalidOperationException导致消息异常

数据源中的 MemoryStream 类型的给定值无法转换为指定目标列的 varbinary 类型。

有没有一种方法可以提供Streamfrom IDataReader(或可能DbDataReader) toSqlBulkCopy作为二进制列的数据源,而无需首先将其所有数据复制到内存(字节数组)中?

Evk*_*Evk 5

不确定这是否在任何地方都有记录,但如果对SqlBulkCopy源代码进行简短的检查,您可能会发现它以不同的方式处理不同的数据读取器。首先,SqlBulkCopy确实支持流式处理和GetStream,但您可能会注意到该IDataReader接口不包含GetStream方法。因此,当您IDataReader向 - 提供自定义实现时SqlBulkCopy,它不会将二进制列视为流式传输,并且不会接受Stream类型的值。

另一方面-DbDataReader 确实有这个方法。如果您提供继承类SqlBulkCopy的实例DbDataReader- 它将以流式方式处理所有二进制列并调用DbDataReader.GetStream.

因此,要解决您的问题 - 继承如下DbDataReader

class TestDataReader : DbDataReader
{
    public override bool IsDBNull(int ordinal) {
        return false;
    }

    public override int FieldCount { get; } = 2;
    int rowCount = 1;

    public override bool HasRows { get; } = true;
    public override bool IsClosed { get; } = false;

    public override bool Read()
    {
        return (rowCount++) < 3;
    }

    public override object GetValue(int ordinal) {
        switch (ordinal) {
            // do not return anything for binary column here - it will not be called
            case 0:
                return rowCount;
            default:
                throw new Exception();
        }
    }

    public override Stream GetStream(int ordinal) {
        // instead - return your stream here
        if (ordinal == 1)
            return new MemoryStream(new byte[] {0x01, 0x23, 0x45, 0x67, 0x89});
        throw new Exception();
    }
    // bunch of irrelevant stuff

}
Run Code Online (Sandbox Code Playgroud)