C#: export a large SQL Server table in batches/chunks using a DataReader and a CSV writer

sun*_*nny 3 c# sql-server bcp sqldatareader csvhelper

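I have developed a batched CSV writer, but compared with BCP the process seems rather slow. My only requirement is to export large tables that have no identity or primary key column into multiple small CSV files, each named after its batch ID.

The problem with BCP is that it only writes one single large file.

What my current process does: it reads the data and writes it into a memory stream with a CSV writer. I keep checking whether the memory stream has grown beyond a specific batch size, and when it has, I asynchronously copy the memory stream into a text file.

I can export files with a batch size of 250 MB without running into out-of-memory exceptions.

But this process takes 5 times longer than a BCP export.

Is there a better way to implement a batched export to CSV than what I am doing?

Please advise.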

Mit*_*tch 10

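A few options come to mind:

Use FETCH/OFFSET

If the source query can easily be batched in SQL Server (for example, there is a clustered index you can key off), FETCH and OFFSET are basically free.

If the table is a heap, FETCH/OFFSET is not really an option, but you might consider adding a clustered index, since there aren't many good reasons not to (although doing so on a 100 GB table would be expensive :)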

```
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 0 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch1.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 20000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch2.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 40000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch3.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 60000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch4.csv -S Server -U sa -P Password -w
```
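The commands above follow a fixed pattern, so for a larger table they could be generated rather than typed. A minimal sketch, assuming the total row count is known beforehand (for example from `SELECT COUNT_BIG(*)`) and that the ORDER BY key is unique, since paging with OFFSET/FETCH over a non-unique key is not guaranteed to be deterministic. `BcpBatchCommands.Build` and its parameters are hypothetical names, not part of bcp or any library.

```csharp
using System;
using System.Collections.Generic;

static class BcpBatchCommands
{
    // Hypothetical helper: builds one bcp "queryout" command line per OFFSET/FETCH window.
    // table, orderBy and connectionArgs are caller-supplied placeholders.
    public static List<string> Build(string table, string orderBy, long totalRows,
                                     int batchSize, string connectionArgs)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

        var commands = new List<string>();
        int fileNo = 1;
        for (long offset = 0; offset < totalRows; offset += batchSize, fileNo++)
        {
            // orderBy should be unique (ideally the clustered key) so windows don't overlap
            string query = $"SELECT * FROM {table} ORDER BY {orderBy} " +
                           $"OFFSET {offset} ROWS FETCH NEXT {batchSize} ROWS ONLY";
            commands.Add($"bcp \"{query}\" queryout fetch{fileNo}.csv {connectionArgs} -w");
        }
        return commands;
    }
}
```

For 80,000 rows in batches of 20,000 this yields exactly the four commands shown above; each one writes its own numbered file.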

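Use SqlDataReader

Based on measurements with a table of about 1.2 GB, a naïve implementation of a C# CSV SQL export (below) reached 75% of BCP's performance on the same table and system. (It also has the benefit of handling CSV formatting correctly with respect to embedded commas, quotes and CRLFs.)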

```csharp
using System;
using System.Data.SqlClient; // Microsoft.Data.SqlClient in newer projects
using System.Globalization;
using System.IO;
using System.Text;

class Program
{
    // Characters that force a field to be quoted; allocated once instead of per field
    static readonly char[] CharsNeedingQuotes = { '"', ',', '\r', '\n' };

    static void Main(string[] args)
    {
        using (var con = new SqlConnection(@"Server=(local);Database=Demo;User Id=sa;Password=bar;"))
        using (var sqr = new SqlCommand("SELECT * FROM dbo.[Table]", con)) // placeholder table name
        {
            con.Open();

            using (var reader = sqr.ExecuteReader())
            using (var tw = File.CreateText("out.csv"))
            {
                while (reader.Read())
                {
                    for (int i = 0; i < reader.FieldCount; i++)
                    {
                        if (i != 0)
                        {
                            tw.Write(',');
                        }

                        var val = FormatValue(reader[i]);
                        if (val == null)
                        {
                            // NULL: leave the field empty
                        }
                        else if (val.IndexOfAny(CharsNeedingQuotes) >= 0)
                        {
                            // Quote the field and double any embedded quotes
                            tw.Write('"');
                            tw.Write(val.Replace("\"", "\"\""));
                            tw.Write('"');
                        }
                        else
                        {
                            tw.Write(val);
                        }
                    }
                    tw.Write("\r\n");
                }
            }
        }
    }

    private static string FormatValue(object v)
    {
        // SqlDataReader returns DBNull.Value, not null, for SQL NULL
        if (v == null || v is DBNull)
        {
            return null;
        }
        if (v is DateTime dt)
        {
            return dt.ToString("O"); // ISO 8601 round-trip format
        }
        if (v is DateTimeOffset dto)
        {
            return dto.ToString("O");
        }
        if (v is byte[] ba)
        {
            // Binary data as a 0x-prefixed hex string
            var sb = new StringBuilder(2 + ba.Length * 2);
            sb.Append("0x");
            for (int i = 0; i < ba.Length; i++)
            {
                sb.Append(ba[i].ToString("X2"));
            }
            return sb.ToString();
        }
        // Invariant culture so numbers don't pick up the current locale's separators
        return Convert.ToString(v, CultureInfo.InvariantCulture);
    }
}
```

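Performance appears to be limited by the GC having to deal with so many string allocations, so if more performance is needed, porting the same logic to a non-CLR language (e.g. C++) would probably match BCP's performance.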
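The reader loop above writes a single `out.csv`, while the question needs one file per batch. A minimal sketch of combining the two, assuming a row-count rollover (instead of the byte-size check the question uses) and a caller-supplied factory that opens the writer for each 1-based batch number. `ChunkedCsvExport` and its parameters are hypothetical names, and the sketch omits the `FormatValue` special cases for dates and binary data. Because each batch is written straight to its own writer rather than buffered in a `MemoryStream`, memory use should not grow with the batch size.

```csharp
using System;
using System.Data;
using System.Globalization;
using System.IO;

static class ChunkedCsvExport
{
    // Characters that force a field to be quoted (RFC 4180)
    static readonly char[] MustQuote = { '"', ',', '\r', '\n' };

    // Streams rows from any IDataReader (e.g. SqlDataReader) as CSV, opening a new writer
    // via createWriter(batchNumber) every rowsPerFile rows. Returns the number of batches.
    public static int Export(IDataReader reader, int rowsPerFile, Func<int, TextWriter> createWriter)
    {
        if (rowsPerFile <= 0) throw new ArgumentOutOfRangeException(nameof(rowsPerFile));

        TextWriter writer = null;
        int batch = 0;
        int rowsInBatch = 0;
        try
        {
            while (reader.Read())
            {
                // Roll over only when a row is actually read, so no empty trailing file appears
                if (writer == null || rowsInBatch == rowsPerFile)
                {
                    writer?.Dispose();
                    writer = createWriter(++batch);
                    rowsInBatch = 0;
                }

                for (int i = 0; i < reader.FieldCount; i++)
                {
                    if (i != 0) writer.Write(',');
                    if (!reader.IsDBNull(i)) // NULL -> empty field
                    {
                        WriteField(writer, Convert.ToString(reader.GetValue(i), CultureInfo.InvariantCulture));
                    }
                }
                writer.Write("\r\n");
                rowsInBatch++;
            }
        }
        finally
        {
            writer?.Dispose();
        }
        return batch;
    }

    static void WriteField(TextWriter w, string val)
    {
        if (val.IndexOfAny(MustQuote) < 0)
        {
            w.Write(val);
            return;
        }
        w.Write('"');
        w.Write(val.Replace("\"", "\"\"")); // double any embedded quotes
        w.Write('"');
    }
}
```

With SQL Server, `reader` would be the `SqlDataReader` returned by `ExecuteReader()` and the factory could be something like `n => File.CreateText($"batch_{n}.csv")`.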


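Use SSIS

SSIS can perform all of the steps in a single package. The exact steps are probably best left to another question, but it basically amounts to synthesizing a "file number" column and using a Flat File Destination. Bad example of this

Use SSIS to generate one large CSV, then split it

If you use SSIS (directly or via the Export Data wizard), you get an RFC 4180-compliant CSV file that can be split. An example tool for splitting such a file: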

```csharp
using System;
using System.IO;

class Program
{
    static void Main(string[] args)
    {
        int n = 0;
        using (var src = File.OpenRead("rfc4180_in.csv"))
        using (var dst = new CsvRfc4180SplittingWriteStream(() => File.Create($"rfc4180_out{n++}.csv"), 100 /* mb per chunk */ * 1024 * 1024))
        {
            src.CopyTo(dst);
        }
    }
}

/// <summary>
/// Abstract write-only stream which uses ParseDataGetCutPoint to split the output into
/// streams that are at least cutAfterPosition bytes long.
/// </summary>
abstract class SplittingWriteStream : Stream
{
    private long _TotalPosition;
    private long CurrentStreamPos;
    private readonly long CutAfterPosition;
    private readonly Func<Stream> StreamCtor;
    private Stream CurrentStream;

    public SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
    {
        if (cutAfterPosition < 0L)
        {
            throw new ArgumentOutOfRangeException(nameof(cutAfterPosition));
        }
        this.CutAfterPosition = cutAfterPosition;

        this.StreamCtor = createStream ?? throw new ArgumentNullException(nameof(createStream));
        this.CurrentStream = createStream();
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            CurrentStream?.Dispose();
        }
        base.Dispose(disposing);
    }

    public override void Flush() => CurrentStream.Flush();

    public override void Write(byte[] buffer, int offset, int count)
    {
        // Only ask for a cut point once the current chunk is past cutAfterPosition
        var cutPoint = ParseDataGetCutPoint(buffer, offset, count, getCutPoint: CurrentStreamPos > CutAfterPosition);
        if (cutPoint < 0)
        {
            CurrentStream.Write(buffer, offset, count);
            CurrentStreamPos += count;
        }
        else
        {
            // cutPoint is relative to offset: the bytes before it end the current chunk
            if (cutPoint > 0)
            {
                CurrentStream.Write(buffer, offset, cutPoint);
            }

            try
            {
                CurrentStream.Dispose();
            }
            finally
            {
                CurrentStream = null;
                CurrentStreamPos = 0L;
                CurrentStream = StreamCtor();
            }

            // ...and the remaining bytes start the next chunk
            if (cutPoint != count)
            {
                CurrentStream.Write(buffer, offset + cutPoint, count - cutPoint);
            }
            CurrentStreamPos = count - cutPoint;
        }

        _TotalPosition += count;
    }

    // Returns the offset (relative to offset) at which to start a new stream, or -1
    protected abstract int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint);

    #region Stream write-only stubs

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();

    public override long Position
    {
        get => _TotalPosition;
        set => throw new NotSupportedException();
    }

    #endregion
}

class CsvRfc4180SplittingWriteStream : SplittingWriteStream
{
    public CsvRfc4180SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    // Quote state lives in fields so it carries over between Write calls
    bool inQuotedString;
    bool lastWasQuote;

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        int? cutPoint = null;
        for (int n = 0; n < count; n++)
        {
            var i = n + offset;
            StepState(buffer[i]);

            // Look for a CRLF outside a quoted string, if a cut was requested
            if (getCutPoint && !inQuotedString && cutPoint == null
                && buffer[i] == '\r' && n + 1 < count && buffer[i + 1] == '\n')
            {
                // Cut after the CRLF so every chunk ends with a complete record
                cutPoint = n + 2;
            }
        }

        return cutPoint ?? -1;
    }

    private void StepState(byte v)
    {
        var isQuote = v == '"';
        if (lastWasQuote)
        {
            lastWasQuote = false;

            if (isQuote)
            {
                // Double quotes:
                //  nop
                //  Inside quoted string == literal escape
                //  Outside quoted string == empty string
            }
            else
            {
                // quote with non-quote following == toggle quoted string
                inQuotedString ^= true;
            }
        }
        else
        {
            lastWasQuote = isQuote;
        }
    }
}
```
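The splitter above only cuts at a CRLF that lies outside a quoted field. Assuming well-formed RFC 4180 input, the same decision can also be made with a plain parity flag: every `"` flips an "inside quotes" flag, and an escaped `""` flips it twice, so wherever a CRLF can legally appear the flag is set exactly when that CRLF is inside a quoted field. A standalone sketch over a single byte array, assuming an ASCII-compatible encoding such as UTF-8; `CsvCutPoint.FindRecordBoundary` is a hypothetical name, not part of the code above. In a streaming splitter the flag would have to persist across `Write` calls, as `inQuotedString` does.

```csharp
using System;

static class CsvCutPoint
{
    // Returns the index just past the first CRLF at or after minPos that lies outside any
    // quoted field, or -1 if there is none. Assumes well-formed RFC 4180 input in an
    // ASCII-compatible encoding, where '"', '\r' and '\n' are single bytes.
    public static int FindRecordBoundary(byte[] data, int minPos)
    {
        bool inQuotes = false; // flips on every '"'; an escaped "" flips it back
        for (int i = 0; i + 1 < data.Length; i++)
        {
            if (data[i] == (byte)'"')
            {
                inQuotes = !inQuotes;
            }
            else if (!inQuotes && i >= minPos && data[i] == (byte)'\r' && data[i + 1] == (byte)'\n')
            {
                return i + 2; // cut after the CRLF so the next chunk starts a new record
            }
        }
        return -1;
    }
}
```

For the input `a,"x<CRLF>y"<CRLF>b,c<CRLF>` the first CRLF is skipped because it sits inside quotes, and the cut lands after the second one.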

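Use BCP, then split on the fly

If BCP is required and its (poor) handling of CSV is tolerable, it can write to a named pipe stream so that the output can be split on the fly. Because bcp does not quote fields, the splitter can only cut at raw CRLFs, so data containing embedded line breaks could be split mid-record.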

```csharp
using System;
using System.Diagnostics;
using System.IO;
using System.IO.Pipes;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        var pipeId = $"bcp_{Guid.NewGuid():n}";
        // bcp requires a read/write pipe (NamedPipeServerStream defaults to PipeDirection.InOut)
        using (var np = new NamedPipeServerStream(pipeId))
        {
            var copyThread = new Thread(_ =>
            {
                np.WaitForConnection();
                int n = 0;
                // Use CrlfUtf16leSplittingWriteStream with -w (UTF-16 little endian)
                // Use CrlfUtf8SplittingWriteStream otherwise (UTF-8 / ANSI / ASCII / OEM)
                using (var dst = new CrlfUtf16leSplittingWriteStream(() => File.Create($"rfc4180_out{n++}.csv"), 100 /* mb per chunk */ * 1024 * 1024))
                {
                    np.CopyTo(dst);
                }
            });
            copyThread.Name = "Write thread";
            copyThread.IsBackground = true;
            copyThread.Start();

            var bcp = Process.Start(
                @"C:\Program Files\Microsoft SQL Server\Client SDK\ODBC\170\Tools\Binn\bcp.exe",
                $@"FWDB.Rx.RxBatches out \\.\pipe\{pipeId} -S (local) -U sa -P abc -w -t,");
            bcp.WaitForExit();

            // Let the copy thread drain the pipe before the pipe is disposed; disposing it
            // first can cut off the tail of the output. (If bcp never connects to the pipe,
            // this Join blocks indefinitely.)
            copyThread.Join();
        }
    }
}

// Splits after a CRLF encoded as UTF-16LE (0D 00 0A 00), i.e. bcp -w output
class CrlfUtf16leSplittingWriteStream : SplittingWriteStream
{
    public CrlfUtf16leSplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        if (getCutPoint)
        {
            for (int n = 0; n < count - 3 /* CR 00 LF 00 */; n++)
            {
                var i = n + offset;
                if (buffer[i] == '\r' && buffer[i + 1] == 0
                    && buffer[i + 2] == '\n' && buffer[i + 3] == 0)
                {
                    // split after CRLF
                    return n + 4;
                }
            }
        }

        return -1;
    }
}

// Splits after a single-byte CRLF (UTF-8 / ANSI / ASCII / OEM output)
class CrlfUtf8SplittingWriteStream : SplittingWriteStream
{
    public CrlfUtf8SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        if (getCutPoint)
        {
            for (int n = 0; n < count - 1 /* CR LF */; n++)
            {
                var i = n + offset;
                if (buffer[i] == '\r' && buffer[i + 1] == '\n')
                {
                    // split after CRLF
                    return n + 2;
                }
            }
        }

        return -1;
    }
}

// SplittingWriteStream: the same abstract base class as in the RFC 4180 splitting example above.
```