使用CLR和GZIP压缩行集

got*_*tqn 8 .net c# t-sql sqlclr sql-server-2012

我想压缩一些大型表,这些表存放的是很少读取或根本不会读取的历史数据。我首先尝试了内置的压缩方式(行压缩、页压缩、列存储、列存储存档),但它们都无法压缩行外值(varchar(max)、nvarchar(max)),所以最终尝试使用 CLR 解决方案。

“SQL Server Compressed Rowset”示例解决方案使用用户定义的 CLR 类型,压缩特定查询返回的整个行集。

例如:

-- Archive table: each row stores one compressed rowset; [Date] defaults to the current UTC time.
CREATE TABLE Archive
(
    [Date] DATETIME2 DEFAULT (GETUTCDATE()),
    [Data] [dbo].[CompressedRowset]
);

-- Compress everything returned by the query and store it as a single archive row.
INSERT INTO Archive ([Data])
SELECT [dbo].[CompressQueryResults]('SELECT * FROM [dbo].[A]');
Run Code Online (Sandbox Code Playgroud)

它工作正常,但我遇到了以下问题:

  • 当我尝试压缩大型结果行集时,我收到以下错误:

    消息0,级别11,状态0,行0当前命令发生严重错误.结果(如果有的话)应该被丢弃.

    此外,以下语句可以正常执行:

    -- Returning the compressed rowset directly to the client.
    SELECT [dbo].[CompressQueryResults]('SELECT * FROM [dbo].[LargeA]');
    
    Run Code Online (Sandbox Code Playgroud)

    但以下语句会失败:

    -- Storing the compressed rowset in the table.
    -- The column list is required because Archive also has a [Date] column (filled by its default).
    INSERT INTO Archive([Data])
    SELECT [dbo].[CompressQueryResults] ('SELECT * FROM [dbo].[LargeA]')
    
    -- Assigning the compressed rowset to a variable of the user-defined type.
    DECLARE @A [dbo].[CompressedRowset]
    SELECT @A = [dbo].[CompressQueryResults] ('SELECT * FROM [dbo].[LargeA]')
    
    Run Code Online (Sandbox Code Playgroud)
  • 为了压缩行集,T-SQL 类型必须能够映射到 .NET 类型;遗憾的是,并非所有 SQL 类型都有对应的映射(参见“映射 CLR 参数数据”)。我已经扩展了下面的函数以处理更多类型,但像 geography 这样的类型应该如何处理呢?例如:

    // Maps a .NET type to the corresponding SqlDbType.
    // Throws NotImplementedException for any type that has no mapping.
    static SqlDbType ToSqlType(Type t){
        if (t == typeof(int)){
            return SqlDbType.Int;
        }
    
        // Excerpt: further type mappings are elided here.
        ...
    
        if (t == typeof(Byte[])){
            return SqlDbType.VarBinary;
        } else {
            // No mapping is known for this CLR type.
            throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion");
        }
    }
    
    Run Code Online (Sandbox Code Playgroud)

这是整体 .net代码:

using System;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
using Microsoft.SqlServer.Server;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO.Compression;
using System.Xml.Serialization;
using System.Xml;

/// <summary>
/// SQL CLR user-defined type that holds a query result as a <see cref="DataTable"/> and
/// persists it as a GZip-compressed, binary-serialized stream (see <c>Write</c> and <c>Read</c>).
/// An instance without a rowset represents SQL NULL.
/// </summary>
[Serializable]
[Microsoft.SqlServer.Server.SqlUserDefinedType
    (
        Format.UserDefined
        ,IsByteOrdered = false
        ,IsFixedLength = false
        ,MaxByteSize = -1
    )
]
public struct CompressedRowset : INullable, IBinarySerialize, IXmlSerializable
{
    // Largest explicit lengths SqlMetaData accepts; anything else must be declared as SqlMetaData.Max.
    const int MaxExplicitNVarCharLength = 4000;
    const int MaxExplicitVarBinaryLength = 8000;

    DataTable rowset;

    /// <summary>
    /// Gets or sets the wrapped rowset; <c>null</c> means the instance is SQL NULL.
    /// </summary>
    public DataTable Data
    {
        get { return this.rowset; }
        set { this.rowset = value; }
    }

    /// <summary>
    /// Returns the rowset serialized as XML, or the literal "NULL" for a NULL instance.
    /// </summary>
    public override string ToString()
    {
        // A NULL instance has no rowset to serialize; WriteXml would have nothing to write.
        if (this.IsNull)
        {
            return "NULL";
        }

        using (var sw = new StringWriter())
        using (var xw = new XmlTextWriter(sw))
        {
            WriteXml(xw);
            xw.Flush();
            return sw.ToString();
        }
    }

    /// <summary>
    /// True when no rowset is attached to this instance.
    /// </summary>
    public bool IsNull
    {
        get { return this.rowset == null; }
    }

    /// <summary>
    /// Gets an instance representing SQL NULL (no rowset attached).
    /// </summary>
    public static CompressedRowset Null
    {
        get { return new CompressedRowset(); }
    }

    /// <summary>
    /// Builds an instance from the XML representation produced by <see cref="ToString"/>.
    /// </summary>
    /// <param name="s">XML text of a serialized <see cref="DataTable"/>; a NULL string yields <see cref="Null"/>.</param>
    /// <returns>The parsed instance.</returns>
    public static CompressedRowset Parse(SqlString s)
    {
        // SqlString.Value throws for a NULL string, so map NULL input to a NULL instance.
        if (s.IsNull)
        {
            return Null;
        }

        using (var sr = new StringReader(s.Value))
        using (var xr = new XmlTextReader(sr))
        {
            var parsed = new CompressedRowset();
            parsed.ReadXml(xr);
            return parsed;
        }
    }


    #region "Stream Wrappers"
    /// <summary>
    /// Base class for forward-only streams layered over a BinaryReader/BinaryWriter.
    /// Seeking, length and position are not supported.
    /// </summary>
    abstract class WrapperStream : Stream
    {
        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return false; }
        }

        public override void Flush()
        {
            // Nothing is buffered in the wrapper itself.
        }

        // NotSupportedException is what the Stream contract specifies for unsupported members.
        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }
    }

    /// <summary>
    /// Write-only stream that forwards every write to a <see cref="BinaryWriter"/>.
    /// </summary>
    class BinaryWriterStream : WrapperStream
    {
        readonly BinaryWriter writer;

        public BinaryWriterStream(BinaryWriter br)
        {
            this.writer = br;
        }

        public override bool CanRead
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return true; }
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            writer.Write(buffer, offset, count);
        }
    }

    /// <summary>
    /// Read-only stream that pulls its bytes from a <see cref="BinaryReader"/>.
    /// </summary>
    class BinaryReaderStream : WrapperStream
    {
        readonly BinaryReader reader;

        public BinaryReaderStream(BinaryReader br)
        {
            this.reader = br;
        }

        public override bool CanRead
        {
            get { return true; }
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            return reader.Read(buffer, offset, count);
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }
    }
    #endregion

    #region "IBinarySerialize"
    /// <summary>
    /// Restores the rowset from its stored form: a GZip stream containing a
    /// binary-serialized <see cref="DataTable"/>.
    /// </summary>
    /// <param name="r">Reader positioned at the start of the stored value.</param>
    public void Read(System.IO.BinaryReader r)
    {
        using (var readerStream = new BinaryReaderStream(r))
        using (var gzip = new GZipStream(readerStream, CompressionMode.Decompress))
        {
            // NOTE(review): BinaryFormatter must only be fed trusted data; confirm that the
            // stored values can only originate from Write below.
            var formatter = new BinaryFormatter();
            this.rowset = (DataTable)formatter.Deserialize(gzip);
        }
    }

    /// <summary>
    /// Writes the rowset as a GZip-compressed, binary-serialized <see cref="DataTable"/>.
    /// Nothing is written for a NULL instance.
    /// </summary>
    /// <param name="w">Writer that receives the compressed bytes.</param>
    public void Write(System.IO.BinaryWriter w)
    {
        if (this.IsNull)
        {
            return;
        }

        // Binary remoting format makes the DataTable serialize as binary rather than embedded XML.
        rowset.RemotingFormat = SerializationFormat.Binary;
        var formatter = new BinaryFormatter();
        using (var writerStream = new BinaryWriterStream(w))
        using (var gzip = new GZipStream(writerStream, CompressionMode.Compress))
        {
            formatter.Serialize(gzip, rowset);
        }
    }

    #endregion

    /// <summary>
    /// Runs an arbitrary query on the context connection and returns its result set wrapped in a
    /// <see cref="CompressedRowset"/>. The whole result set is buffered in a <see cref="DataTable"/>,
    /// so large results use a correspondingly large amount of memory. Compression itself happens
    /// later, when the value is serialized through <c>Write</c>.
    /// </summary>
    /// <param name="query">T-SQL text to execute as-is; a NULL query yields a NULL result.</param>
    /// <returns>The buffered result set, or <see cref="Null"/> when <paramref name="query"/> is NULL.</returns>
    [SqlFunction(DataAccess = DataAccessKind.Read, SystemDataAccess = SystemDataAccessKind.Read, IsDeterministic = false, IsPrecise = false)]
    public static CompressedRowset CompressQueryResults(string query)
    {
        // Without command text ExecuteReader cannot run; treat NULL input as NULL output.
        if (query == null)
        {
            return Null;
        }

        // The query text is executed verbatim, so callers must only pass trusted SQL.
        using (var con = new SqlConnection("Context Connection=true"))
        using (var cmd = new SqlCommand(query, con))
        {
            con.Open();

            var table = new DataTable();
            using (var rdr = cmd.ExecuteReader())
            {
                table.Load(rdr);
            }

            // Configure the DataTable for binary serialization.
            table.RemotingFormat = SerializationFormat.Binary;

            var result = new CompressedRowset();
            result.rowset = table;
            return result;
        }
    }

    /// <summary>
    /// Partial mapping from .NET column types to <see cref="SqlDbType"/>.
    /// </summary>
    /// <param name="t">The .NET type of a <see cref="DataColumn"/>.</param>
    /// <returns>The SQL type used to describe that column in a result set.</returns>
    /// <exception cref="NotImplementedException">No mapping exists for <paramref name="t"/>.</exception>
    static SqlDbType ToSqlType(Type t)
    {
        if (t == typeof(int))
        {
            return SqlDbType.Int;
        }
        if (t == typeof(string))
        {
            return SqlDbType.NVarChar;
        }
        if (t == typeof(Boolean))
        {
            return SqlDbType.Bit;
        }
        if (t == typeof(decimal))
        {
            // NOTE(review): the column is later described without explicit precision/scale;
            // confirm that the SqlMetaData defaults are adequate for the archived data.
            return SqlDbType.Decimal;
        }
        if (t == typeof(float))
        {
            return SqlDbType.Real;
        }
        if (t == typeof(double))
        {
            return SqlDbType.Float;
        }
        if (t == typeof(DateTime))
        {
            return SqlDbType.DateTime;
        }
        if (t == typeof(DateTimeOffset))
        {
            return SqlDbType.DateTimeOffset;
        }
        if (t == typeof(TimeSpan))
        {
            return SqlDbType.Time;
        }
        if (t == typeof(Int64))
        {
            return SqlDbType.BigInt;
        }
        if (t == typeof(Int16))
        {
            return SqlDbType.SmallInt;
        }
        if (t == typeof(byte))
        {
            return SqlDbType.TinyInt;
        }
        if (t == typeof(Guid))
        {
            return SqlDbType.UniqueIdentifier;
        }
        if (t == typeof(Byte[]))
        {
            return SqlDbType.VarBinary;
        }

        // Anything else (for example CLR UDTs such as geography) has no mapping yet.
        throw new NotImplementedException("CLR Type " + t.Name + " Not supported for conversion");
    }

    /// <summary>
    /// Chooses the length to declare for a variable-length column: the column's own maximum when
    /// SqlMetaData accepts it as an explicit length, otherwise <c>SqlMetaData.Max</c>.
    /// </summary>
    static long ResolveMaxLength(SqlDbType sqlType, int columnMaxLength)
    {
        int limit = sqlType == SqlDbType.NVarChar
            ? MaxExplicitNVarCharLength
            : MaxExplicitVarBinaryLength;

        if (columnMaxLength < 1 || columnMaxLength > limit)
        {
            return SqlMetaData.Max;
        }
        return columnMaxLength;
    }

    /// <summary>
    /// Stored procedure that takes a compressed rowset and sends its rows back to the client as a
    /// result set (which can also be captured with INSERT ... EXEC).
    /// </summary>
    /// <param name="results">The rowset to expand; nothing is sent for a NULL value.</param>
    [Microsoft.SqlServer.Server.SqlProcedure]
    public static void UnCompressRowset(CompressedRowset results)
    {
        if (results.IsNull)
        {
            return;
        }

        DataTable table = results.rowset;

        // Describe every column of the DataTable for the outgoing result set.
        var fields = new SqlMetaData[table.Columns.Count];
        for (int i = 0; i < table.Columns.Count; i++)
        {
            DataColumn col = table.Columns[i];
            SqlDbType sqlType = ToSqlType(col.DataType);

            if (sqlType == SqlDbType.NVarChar || sqlType == SqlDbType.VarBinary)
            {
                // Variable-length types need a length; out-of-range values become MAX
                // instead of making the SqlMetaData constructor throw.
                fields[i] = new SqlMetaData(col.ColumnName, sqlType, ResolveMaxLength(sqlType, col.MaxLength));
            }
            else
            {
                fields[i] = new SqlMetaData(col.ColumnName, sqlType);
            }
        }

        // A single record instance is reused for every row that is sent.
        var record = new SqlDataRecord(fields);

        SqlContext.Pipe.SendResultsStart(record);
        foreach (DataRow row in table.Rows)
        {
            record.SetValues(row.ItemArray);
            SqlContext.Pipe.SendResultsRow(record);
        }
        SqlContext.Pipe.SendResultsEnd();
    }

    /// <summary>
    /// Required by <see cref="IXmlSerializable"/>; no schema is provided.
    /// </summary>
    public System.Xml.Schema.XmlSchema GetSchema()
    {
        return null;
    }

    /// <summary>
    /// Loads the rowset from the XML form of a serialized <see cref="DataTable"/>.
    /// </summary>
    /// <param name="reader">Reader positioned on the serialized table.</param>
    /// <exception cref="InvalidOperationException">A rowset has already been loaded into this instance.</exception>
    public void ReadXml(System.Xml.XmlReader reader)
    {
        if (rowset != null)
        {
            throw new InvalidOperationException("rowset already read");
        }

        var serializer = new XmlSerializer(typeof(DataTable));
        rowset = (DataTable)serializer.Deserialize(reader);
    }

    /// <summary>
    /// Writes the rowset as XML. Nothing is written for a NULL instance.
    /// </summary>
    /// <param name="writer">Writer that receives the XML.</param>
    public void WriteXml(System.Xml.XmlWriter writer)
    {
        // No rowset means nothing to serialize (and nothing to name below).
        if (rowset == null)
        {
            return;
        }

        // The table gets a default name when it has none before being serialized.
        if (String.IsNullOrEmpty(rowset.TableName))
        {
            rowset.TableName = "Rows";
        }

        var serializer = new XmlSerializer(typeof(DataTable));
        serializer.Serialize(writer, rowset);
    }
}
Run Code Online (Sandbox Code Playgroud)

这是SQL对象的创建:

-- User-defined type backed by the CompressedRowset struct in the [CompressedRowset] assembly.
CREATE TYPE [dbo].[CompressedRowset]
EXTERNAL NAME [CompressedRowset].[CompressedRowset];
GO

-- Scalar function: runs @query and returns its result set as a CompressedRowset value.
CREATE FUNCTION [dbo].[CompressQueryResults]
(
    @query [nvarchar](4000)
)
RETURNS [dbo].[CompressedRowset]
AS
EXTERNAL NAME [CompressedRowset].[CompressedRowset].[CompressQueryResults];
GO

-- Procedure: sends the rows held in a CompressedRowset value back as a result set.
CREATE PROCEDURE [dbo].[UnCompressRowset]
    @results [dbo].[CompressedRowset]
AS
EXTERNAL NAME [CompressedRowset].[CompressedRowset].[UnCompressRowset];
GO
Run Code Online (Sandbox Code Playgroud)

Dav*_*ett 1

对于最初的问题来说可能为时已晚,但这对于其他人来说可能值得考虑:在 SQL Server 2016 中提供了 COMPRESS 和 DECOMPRESS 函数(请参阅此处和此处),如果您要归档的数据在 [N]VARCHAR 和 VARBINARY 列中包含较大的值,这些函数可能会很有用。

您需要把这种做法内置到业务逻辑层中,或者在 SQL Server 中做如下安排:用一个视图代替原来的未压缩表,该视图建立在存放压缩值的支持表之上,通过 DECOMPRESS 派生出未压缩的数据,并使用 INSTEAD OF 触发器来更新支持表(这样,除了性能差异之外,该视图在选择/插入/更新/删除时的行为与原始表相同)。有点 hacky,但它会起作用......

对于较旧的 SQL 版本,您也可以编写一个 CLR 函数来完成这项工作。

这种方法显然不适用于由小字段组成的数据集,当然,这种压缩方式根本不会在小值上实现任何效果(事实上,它会使它们变得更大)。