sun*_*nny 3 c# sql-server bcp sqldatareader csvhelper
我开发了一个 CSV 批处理编写器。但与 BCP 相比,这个过程似乎相当缓慢。我唯一的要求是将没有标识或主键列的大表导出到多个小型 CSV 文件中,并使用相应的批处理 ID 命名它们。
BCP 的问题是它只会写入单个大文件。
我当前的流程所做的是:读取数据并使用 CSV 编写器写入内存流;我会不断检查内存流是否大于特定的批处理大小,一旦超过,就将内存流异步复制写入文本文件。
在不发生内存不足异常的情况下,我可以导出批量大小为 250MB 的文件。
但这个过程比 BCP 导出要多花 5 倍的时间。
有没有比我正在做的更好的方法来实现批量导出到 CSV。
请指教。
Mit*_*tch 10
我想到了几个选择:
\n如果源查询能够在 SQL Server 中轻松地进行批处理(例如,可以关闭的聚集索引),则 FETCH 和 OFFSET 基本上是免费的。
\n如果表是堆,FETCH/OFFSET 并不是一个真正的选择,但您可能会考虑添加聚集索引,因为没有太多好的理由反对这样做(尽管对于 100 GB 的表来说这样做会很昂贵:)
\nbcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 0 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch1.csv -S Server -U sa -P Password -w\nbcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 20000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch2.csv -S Server -U sa -P Password -w\nbcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 40000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch3.csv -S Server -U sa -P Password -w\nbcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 60000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch4.csv -S Server -U sa -P Password -w\nRun Code Online (Sandbox Code Playgroud)\n根据使用约 1.2 GB 的表进行的测量,C# CSV SQL 导出的 naïve 实现(如下)在同一表和系统上实现了 BCP 75% 的性能。(它还有一个好处是可以正确处理有关嵌入逗号、引号和 CRLF 的 CSV 格式)。
\nstatic void Main(string[] args)\n{\n var con = new SqlConnection(@"Server=(local);Database=Demo;User Id=sa;Password=bar;");\n con.Open();\n\n var sqr = new SqlCommand("SELECT * FROM dbo.Table", con);\n\n using (var reader = sqr.ExecuteReader())\n using (var tw = File.CreateText("out.csv"))\n {\n while (reader.Read())\n {\n for (int i = 0; i < reader.FieldCount; i++)\n {\n if (i != 0)\n {\n tw.Write(\',\');\n }\n\n var val = FormatValue(reader[i]);\n if (val == null)\n {\n // no-op\n }\n else if (val.IndexOfAny(new[] { \'"\', \',\', \'\\r\', \'\\n\' }) >= 0)\n {\n tw.Write(\'"\');\n tw.Write(val.Replace("\\"", "\\"\\""));\n tw.Write(\'"\');\n }\n else\n {\n tw.Write(val);\n }\n }\n tw.Write("\\r\\n");\n }\n }\n}\n\nprivate static string FormatValue(object v)\n{\n if (v == null)\n {\n return null;\n }\n if (v is DateTime dt)\n {\n return dt.ToString("O");\n }\n if (v is DateTimeOffset dto)\n {\n return dto.ToString("O");\n }\n if (v is byte[] ba)\n {\n var sb = new StringBuilder(2 + ba.Length * 2);\n sb.Append("0x");\n for (int i = 0; i < ba.Length; i++)\n {\n sb.Append(ba[i].ToString("X2"));\n }\n return sb.ToString();\n }\n return v.ToString();\n}\nRun Code Online (Sandbox Code Playgroud)\n性能似乎受到 GC 处理如此多字符串分配的限制 - 因此,如果需要更高的性能,则将相同的内容转换为非 CLR 语言(例如 C++)可能会与 BCP 的性能相匹配。
\nSSIS 可以在一个包中执行所有步骤。确切的步骤可能最好留给另一个问题,但基本上相当于合成“文件编号”列并使用平面文件目标。 这方面的坏例子
\n如果您使用 SSIS(直接使用或使用导出数据向导),您将获得一个可以拆分的符合 RFC 4180 的 CSV 文件。分割此类文件的示例工具是:
\nclass Program\n{\n static void Main(string[] args)\n {\n int n = 0;\n using (var src = File.OpenRead("rfc4180_in.csv"))\n using (var dst = new CsvRfc4180SplittingWriteStream(() => File.Create($"rfc4180_out{n++}.csv"), 100 /* mb per chunk */ * 1024 * 1024))\n {\n src.CopyTo(dst);\n }\n }\n}\n\n/// <summary>\n/// Abstract class which uses ParseDataGetCutPoint to split the files into streams at least \n/// cutAfterPosition bytes long.\n/// </summary>\nabstract class SplittingWriteStream : Stream\n{\n private long _TotalPosition;\n private long CurrentStreamPos;\n private readonly long CutAfterPosition;\n private readonly Func<Stream> StreamCtor;\n private Stream CurrentStream;\n\n public SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)\n {\n if (cutAfterPosition < 0L)\n {\n throw new ArgumentOutOfRangeException(nameof(cutAfterPosition));\n }\n this.CutAfterPosition = cutAfterPosition;\n\n this.StreamCtor = createStream ?? throw new ArgumentNullException(nameof(createStream));\n this.CurrentStream = createStream();\n }\n\n protected override void Dispose(bool disposing) => CurrentStream.Dispose();\n\n public override void Flush() => CurrentStream.Flush();\n\n public override void Write(byte[] buffer, int offset, int count)\n {\n // ignore count to always exceed cutAfterPosition\n var cutPoint = ParseDataGetCutPoint(buffer, offset, count, getCutPoint: CurrentStreamPos > CutAfterPosition);\n if (cutPoint < 0)\n {\n CurrentStream.Write(buffer, offset, count);\n }\n else\n {\n if (cutPoint > 0)\n {\n CurrentStream.Write(buffer, offset, cutPoint);\n }\n\n try\n {\n CurrentStream.Dispose();\n }\n finally\n {\n CurrentStream = null;\n CurrentStreamPos = 0L;\n CurrentStream = StreamCtor();\n }\n\n if (cutPoint != count)\n {\n CurrentStream.Write(buffer, offset + cutPoint, count - cutPoint);\n }\n }\n\n CurrentStreamPos += count;\n _TotalPosition += count;\n }\n\n protected abstract int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool 
getCutPoint);\n\n #region Stream Write-only stubs\n\n public override bool CanRead => false;\n public override bool CanSeek => false;\n public override bool CanWrite => true;\n public override long Length => throw new NotSupportedException();\n public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();\n public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();\n public override void SetLength(long value) => throw new NotSupportedException();\n\n public override long Position\n {\n get => _TotalPosition;\n set => throw new NotSupportedException();\n }\n\n #endregion\n}\n\nclass CsvRfc4180SplittingWriteStream : SplittingWriteStream\n{\n public CsvRfc4180SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)\n : base(createStream, cutAfterPosition)\n {\n }\n\n bool inQuotedString;\n bool lastWasQuote;\n protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)\n {\n int? cutPoint = null;\n for (int n = 0; n < count; n++)\n {\n var i = n + offset;\n StepState(buffer[i]);\n\n // check for CRLF if desired and not escaped\n if (getCutPoint && !inQuotedString && cutPoint == null\n && buffer[i] == \'\\r\' && n + 1 < count && buffer[i + 1] == \'\\n\')\n {\n cutPoint = n;\n }\n }\n\n return cutPoint ?? -1;\n }\n\n private void StepState(byte v)\n {\n var isQuote = v == \'"\';\n if (lastWasQuote)\n {\n lastWasQuote = false;\n\n if (isQuote)\n {\n // Double quotes:\n // nop\n // Inside quoted string == literal escape\n // Outside quoted string == empty string\n }\n else\n {\n // quote with non-quote following == toggle quoted string\n inQuotedString ^= true;\n }\n }\n else\n {\n lastWasQuote = isQuote;\n }\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n如果需要 BCP,并且它对 CSV 的(不良)处理是可以容忍的,那么它可以写入命名管道流以进行动态拆分。
\nclass Program\n{\n static void Main(string[] args)\n {\n Thread copyThread;\n var pipeId = $"bcp_{Guid.NewGuid():n}";\n // bcp requires read/write pipe\n using (var np = new NamedPipeServerStream(pipeId))\n {\n copyThread = new Thread(_1 =>\n {\n np.WaitForConnection();\n int n = 0;\n // Use CrlfUtf16leSplittingWriteStream with -w (UTF 16 Little Endian)\n // Use CrlfUtf8SplittingWriteStream other (UTF 8 / ANSII / ASCII / OEM)\n using (var dst = new CrlfUtf16leSplittingWriteStream(() => File.Create($"rfc4180_out{n++}.csv"), 100 /* mb per chunk */ * 1024 * 1024))\n {\n np.CopyTo(dst);\n }\n });\n copyThread.Name = "Write thread";\n copyThread.IsBackground = true;\n copyThread.Start();\n\n var bcp = Process.Start(\n @"C:\\Program Files\\Microsoft SQL Server\\Client SDK\\ODBC\\170\\Tools\\Binn\\bcp.exe",\n $@"FWDB.Rx.RxBatches out \\\\.\\pipe\\{pipeId} -S (local) -U sa -P abc -w -t,");\n bcp.WaitForExit();\n }\n copyThread.Join();\n }\n}\n\nclass CrlfUtf16leSplittingWriteStream : SplittingWriteStream\n{\n public CrlfUtf16leSplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)\n : base(createStream, cutAfterPosition)\n {\n }\n\n protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)\n {\n if (getCutPoint)\n {\n for (int n = 0; n < count - 3 /* CR 00 LF 00 */; n++)\n {\n var i = n + offset;\n if (buffer[i] == \'\\r\' && buffer[i + 1] == 0\n && buffer[i + 2] == \'\\n\' && buffer[i + 3] == 0)\n {\n // split after CRLF\n return n + 4;\n }\n }\n }\n\n return -1;\n }\n}\n\nclass CrlfUtf8SplittingWriteStream : SplittingWriteStream\n{\n public CrlfUtf8SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)\n : base(createStream, cutAfterPosition)\n {\n }\n\n protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)\n {\n if (getCutPoint)\n {\n for (int n = 0; n < count - 1 /* CR LF */; n++)\n {\n var i = n + offset;\n if (buffer[i] == \'\\r\' && buffer[i + 1] 
== \'\\n\')\n {\n // split after CRLF\n return n + 2;\n }\n }\n }\n\n return -1;\n }\n}\n\n/// <summary>\n/// Abstract class which uses ParseDataGetCutPoint to split the files into streams at least \n/// cutAfterPosition bytes long.\n/// </summary>\nabstract class SplittingWriteStream : Stream\n{\n private long _TotalPosition;\n private long CurrentStreamPos;\n private readonly long CutAfterPosition;\n private readonly Func<Stream> StreamCtor;\n private Stream CurrentStream;\n\n public SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)\n {\n if (cutAfterPosition < 0L)\n {\n throw new ArgumentOutOfRangeException(nameof(cutAfterPosition));\n }\n this.CutAfterPosition = cutAfterPosition;\n\n this.StreamCtor = createStream ?? throw new ArgumentNullException(nameof(createStream));\n this.CurrentStream = createStream();\n }\n\n protected override void Dispose(bool disposing) => CurrentStream.Dispose();\n\n public override void Flush() => CurrentStream.Flush();\n\n public override void Write(byte[] buffer, int offset, int count)\n {\n // ignore count to always exceed cutAfterPosition\n var cutPoint = ParseDataGetCutPoint(buffer, offset, count, getCutPoint: CurrentStreamPos > CutAfterPosition);\n if (cutPoint < 0)\n {\n CurrentStream.Write(buffer, offset, count);\n }\n else\n {\n if (cutPoint > 0)\n {\n CurrentStream.Write(buffer, offset, cutPoint);\n }\n\n try\n {\n CurrentStream.Dispose();\n }\n finally\n {\n CurrentStream = null;\n CurrentStreamPos = 0L;\n CurrentStream = StreamCtor();\n }\n\n if (cutPoint != count)\n {\n CurrentStream.Write(buffer, offset + cutPoint, count - cutPoint);\n }\n }\n\n CurrentStreamPos += count;\n _TotalPosition += count;\n }\n\n protected abstract int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint);\n\n #region Stream Write-only stubs\n\n public override bool CanRead => false;\n public override bool CanSeek => false;\n public override bool CanWrite => true;\n public override long Length => 
throw new NotSupportedException();\n public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();\n public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();\n public override void SetLength(long value) => throw new NotSupportedException();\n\n public override long Position\n {\n get => _TotalPosition;\n set => throw new NotSupportedException();\n }\n\n #endregion\n}\n\nclass CsvRfc4180SplittingWriteStream : SplittingWriteStream\n{\n public CsvRfc4180SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)\n : base(createStream, cutAfterPosition)\n {\n }\n\n bool inQuotedString;\n bool lastWasQuote;\n protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)\n {\n int? cutPoint = null;\n for (int n = 0; n < count; n++)\n {\n var i = n + offset;\n StepState(buffer[i]);\n\n // check for CRLF if desired and not escaped\n if (getCutPoint && !inQuotedString && cutPoint == null\n && buffer[i] == \'\\r\' && n + 1 < count && buffer[i + 1] == \'\\n\')\n {\n cutPoint = n;\n }\n }\n\n return cutPoint ?? -1;\n }\n\n private void StepState(byte v)\n {\n var isQuote = v == \'"\';\n if (lastWasQuote)\n {\n lastWasQuote = false;\n\n if (isQuote)\n {\n // Double quotes:\n // nop\n // Inside quoted string == literal escape\n // Outside quoted string == empty string\n }\n else\n {\n // quote with non-quote following == toggle quoted string\n inQuotedString ^= true;\n }\n }\n else\n {\n lastWasQuote = isQuote;\n }\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n
| 归档时间: |
|
| 查看次数: |
2385 次 |
| 最近记录: |