将数据保存在内存中

Dar*_*der 5 .net c# memory collections multithreading

我有一个 HTTP 处理程序,我把每个请求都存储到内存中的一个并发队列集合里。经过一段时间后,我再把该集合批量插入数据库。

这是一个坏主意吗?因为数量很大,这似乎是IMO更好的方法.

我确实看到了一些差异(命中数与数据库中存储的元素数量不一致),这是线程造成的:刷新并发集合时,我先锁定它,批量插入其内容,然后清空集合,最后释放集合上的锁。

有更好的做法吗?或者你做过类似的事情?

Mik*_*han 1

我已经用下面的代码做了与您描述的几乎完全相同的事情。它是线程安全的,并且提供了一个 Flush 方法,您可以调用它来刷新所有挂起的写入。一旦待写入对象的数量达到阈值,它就会把队列(在我的例子中是 List)交给另一个线程去保存。请注意,它使用 ManualResetEvent 来处理最后的数据刷新(一次最多只能等待 64 个重置事件,因此如果有超过 64 个后台线程在等待写入,就需要手动等待;不过除非您的数据库真的很慢,否则这种情况几乎不会发生)。这段代码曾用于处理流入其中的数千万条记录(从内存写入 2000 万行大约需要 5 分钟,不过它与数据库运行在同一台服务器上,因此没有网络跃点……使用 SqlBulkCopy 对象和 IDataReader 时,SQL Server 每秒当然可以处理数千行),因此它应该能够应对您的请求负载(当然,这取决于您要写入的内容和您的数据库,但我认为这段代码可以胜任!)。

另外,为了便于批量写入,我创建了 IDataReader 的最小实现来流式传输数据。您需要按照您的要求执行此操作才能使用下面的代码。

/// <summary>
/// Buffers records in memory and hands them to the thread pool in batches for bulk loading
/// into a database table.
/// </summary>
/// <remarks>
/// <see cref="QueueForImport"/> and <see cref="Flush"/> may be called from multiple threads.
/// The staging list is guarded by one lock and the list of outstanding background saves by
/// another, because the background workers also update the latter.
/// </remarks>
/// <typeparam name="T">The record type being staged.</typeparam>
public class DataImporter<T>
{

    /// <summary>
    /// Creates an importer for a given destination table.
    /// </summary>
    /// <param name="tableName">Table name passed to the bulk-load call.</param>
    /// <param name="readerName">Name passed to DataReaderFactory.GetReader to pick the reader implementation.</param>
    public DataImporter(string tableName, string readerName)
    {
        _tableName = tableName;
        _readerName = readerName;
    }

    /// <summary>
    /// This is the size of our bulk staging list.
    /// </summary>
    /// <remarks>
    /// Note that the SqlBulkCopy object has a batch size property, which may not be the same as this value,
    /// so records may not be going into the database in sizes of this staging value.
    /// </remarks>
    private int _bulkStagingListSize = 20000;

    // One event per background save that has been queued and has not finished yet.
    // Always accessed under _tasksLock: worker threads remove entries concurrently.
    private List<ManualResetEvent> _tasksWaiting = new List<ManualResetEvent>();
    private readonly object _tasksLock = new object();

    private string _tableName = String.Empty;
    private string _readerName = String.Empty;

    /// <summary>
    /// Adds a record to the staging list and, once the list grows past the staging size,
    /// queues the whole list for a background save and starts a fresh list.
    /// </summary>
    /// <param name="record">The record to stage.</param>
    public void QueueForImport(T record)
    {
        lock (_listLock)
        {
            _items.Add(record);
            if (_items.Count > _bulkStagingListSize)
            {
                SaveItems(_items);
                _items = new List<T>();
            }
        }
    }

    /// <summary>
    /// This method should be called at the end of the queueing work to ensure to clear down our list.
    /// It queues whatever is still staged and blocks until every background save that was
    /// outstanding at that moment has finished.
    /// </summary>
    public void Flush()
    {
        ManualResetEvent[] pending;
        lock (_listLock)
        {
            // Nothing staged means nothing to send; avoids a pointless database round trip.
            if (_items.Count > 0)
            {
                SaveItems(_items);
                _items = new List<T>();
            }

            lock (_tasksLock)
            {
                pending = _tasksWaiting.ToArray();
            }
        }

        // Wait on each handle individually, outside the list lock. This sidesteps both the
        // 64-handle limit of WaitHandle.WaitAll and its failure on an empty array, and it does
        // not block producers while the database catches up.
        foreach (ManualResetEvent pendingSave in pending)
        {
            pendingSave.WaitOne();
        }
    }

    /// <summary>
    /// Wraps the given list in a data reader and queues a thread-pool work item that bulk-loads it.
    /// </summary>
    /// <param name="items">The batch to save; the caller must not reuse this list afterwards.</param>
    private void SaveItems(List<T> items)
    {
        // Build the reader first: if the factory throws, no event has been registered yet,
        // so Flush cannot end up waiting on a handle that nobody will ever signal.
        IDataReader reader = DataReaderFactory.GetReader<T>(_readerName, items);

        ManualResetEvent evt = new ManualResetEvent(false);
        lock (_tasksLock)
        {
            _tasksWaiting.Add(evt);
        }

        Tuple<ManualResetEvent, IDataReader> stateInfo = new Tuple<ManualResetEvent, IDataReader>(evt, reader);
        ThreadPool.QueueUserWorkItem(new WaitCallback(saveData), stateInfo);
    }

    /// <summary>
    /// Thread-pool callback: bulk-loads one batch and then signals its completion event.
    /// </summary>
    /// <param name="info">A Tuple of the completion event and the reader for the batch.</param>
    private void saveData(object info)
    {
        Tuple<ManualResetEvent, IDataReader> stateInfo = (Tuple<ManualResetEvent, IDataReader>)info;
        try
        {
            using (new ActivityTimer("Saving bulk data to " + _tableName))
            using (IDataReader r = stateInfo.Item2)
            {
                Database.DataImportStagingDatabase.BulkLoadData(r, _tableName);
            }
        }
        catch (Exception ex)
        {
            // The batch is lost at this point; at least leave a trace instead of failing silently.
            // An unhandled exception on a thread-pool thread would take the process down, so it
            // is deliberately not rethrown.
            System.Diagnostics.Trace.TraceError("Bulk load into {0} failed: {1}", _tableName, ex);
        }
        finally
        {
            lock (_tasksLock)
            {
                _tasksWaiting.Remove(stateInfo.Item1);
            }
            // Not disposed here: a concurrent Flush may still be about to wait on this handle.
            stateInfo.Item1.Set();
        }
    }

    private object _listLock = new object();

    private List<T> _items = new List<T>();
}
Run Code Online (Sandbox Code Playgroud)

上面代码中用到的 DataReaderFactory 只是选择正确的 IDataReader 实现来用于流式传输,其定义如下:

/// <summary>
/// Maps a reader name to the matching IDataReader implementation for a list of records.
/// </summary>
internal static class DataReaderFactory
{
    /// <summary>
    /// Creates the data reader registered under <paramref name="typeName"/>.
    /// </summary>
    /// <typeparam name="T">Element type of the list being wrapped.</typeparam>
    /// <param name="typeName">Name of the reader implementation to create.</param>
    /// <param name="items">Records to stream; cast to the list type the chosen reader expects.</param>
    /// <returns>The reader, or null when <paramref name="typeName"/> is not a known name.</returns>
    internal static IDataReader GetReader<T>(string typeName, List<T> items)
    {
        if (typeName == "ProductRecordDataReader")
        {
            return new ProductRecordDataReader(items as List<ProductRecord>) as IDataReader;
        }

        if (typeName == "RetailerPriceRecordDataReader")
        {
            return new RetailerPriceRecordDataReader(items as List<RetailerPriceRecord>) as IDataReader;
        }

        // Unknown name: callers receive null.
        return null;
    }
}
Run Code Online (Sandbox Code Playgroud)

我在本例中使用的数据读取器实现(尽管此代码适用于任何数据读取器)如下:

/// <summary>
/// This class creates a data reader for ProductRecord data.  This is used to stream the records
/// to the SqlBulkCopy object.
/// </summary>
public class ProductRecordDataReader:IDataReader
{
    // Single source of truth for the column layout: the index in this array is the ordinal.
    // FieldCount, GetName and GetOrdinal all derive from it so they cannot drift apart.
    private static readonly string[] ColumnNames = new string[]
    {
        "ProductSku",
        "UPC",
        "EAN",
        "ISBN",
        "ProductName",
        "ShortDescription",
        "LongDescription",
        "DFFCategoryNumber",
        "DFFManufacturerNumber",
        "ManufacturerPartNumber",
        "ManufacturerModelNumber",
        "ProductImageUrl",
        "LowestPrice",
        "HighestPrice"
    };

    /// <summary>
    /// Creates a reader over a private copy of the given products, so later changes to the
    /// caller's list do not affect what is streamed.
    /// </summary>
    /// <param name="products">The records to stream.</param>
    public ProductRecordDataReader(List<ProductRecord> products)
    {
        _products = products.ToList();
    }

    List<ProductRecord> _products;

    // Index of the row exposed by GetValue; set by Read.
    int currentRow;
    // Index of the next row Read will move to.
    int rowCounter = 0;
    // Set by Close/Dispose; reported through IsClosed only.
    bool _isClosed;

    #region IDataReader Members

    /// <summary>Marks the reader as closed. There are no resources to release.</summary>
    public void Close()
    {
        _isClosed = true;
    }

    /// <summary>
    /// Advances to the next product.
    /// </summary>
    /// <returns>true while there is a row to expose; false once the list is exhausted.</returns>
    public bool Read()
    {
        if (rowCounter >= _products.Count)
        {
            return false;
        }

        currentRow = rowCounter;
        rowCounter++;
        return true;
    }

    public int RecordsAffected
    {
        get { throw new NotImplementedException(); }
    }

    public bool NextResult()
    {
        throw new NotImplementedException();
    }

    public int Depth
    {
        get { throw new NotImplementedException(); }
    }

    public DataTable GetSchemaTable()
    {
        throw new NotImplementedException();
    }

    /// <summary>true once Close or Dispose has been called.</summary>
    public bool IsClosed
    {
        get { return _isClosed; }
    }

    #endregion

    #region IDisposable Members

    public void Dispose()
    {
        Close();
    }

    #endregion

    #region IDataRecord Members

    /// <summary>Number of columns exposed per row.</summary>
    public int FieldCount
    {
        get
        {
            return ColumnNames.Length;
        }
    }

    /// <summary>
    /// Returns the column name for an ordinal, or null when the ordinal is out of range.
    /// </summary>
    public string GetName(int i)
    {
        if (i < 0 || i >= ColumnNames.Length)
        {
            return null;
        }

        return ColumnNames[i];
    }

    /// <summary>
    /// Returns the ordinal for a column name (exact, case-sensitive match), or -1 when unknown.
    /// </summary>
    public int GetOrdinal(string name)
    {
        return Array.IndexOf(ColumnNames, name);
    }

    /// <summary>
    /// Returns the value of column <paramref name="i"/> for the current row, or null when the
    /// ordinal is out of range.
    /// </summary>
    public object GetValue(int i)
    {
        // Checked before touching the list so an unknown ordinal yields null even when
        // there is no current row.
        if (i < 0 || i >= ColumnNames.Length)
        {
            return null;
        }

        ProductRecord row = _products[currentRow];
        switch (i)
        {
            case 0:
                return row.ProductSku;
            case 1:
                return row.UPC;
            case 2:
                return row.EAN;
            case 3:
                return row.ISBN;
            case 4:
                return row.ProductName;
            case 5:
                return row.ShortDescription;
            case 6:
                return row.LongDescription;
            case 7:
                return row.DFFCategoryNumber;
            case 8:
                return row.DFFManufacturerNumber;
            case 9:
                return row.ManufacturerPartNumber;
            case 10:
                return row.ManufacturerModelNumber;
            case 11:
                return row.ProductImageUrl;
            case 12:
                return row.LowestPrice;
            case 13:
                return row.HighestPrice;
            default:
                return null;
        }
    }

    /// <summary>
    /// Copies the current row's values into <paramref name="values"/>, up to the smaller of
    /// the array length and <see cref="FieldCount"/>.
    /// </summary>
    /// <returns>The number of values copied.</returns>
    public int GetValues(object[] values)
    {
        if (values == null)
        {
            throw new ArgumentNullException("values");
        }

        int count = Math.Min(values.Length, ColumnNames.Length);
        for (int ordinal = 0; ordinal < count; ordinal++)
        {
            values[ordinal] = GetValue(ordinal);
        }

        return count;
    }

    /// <summary>
    /// true when <see cref="GetValue"/> yields null or DBNull for the column.
    /// </summary>
    public bool IsDBNull(int i)
    {
        object value = GetValue(i);
        return value == null || value is DBNull;
    }

    /// <summary>Value of the named column for the current row; see <see cref="GetValue"/>.</summary>
    public object this[string name]
    {
        get { return GetValue(GetOrdinal(name)); }
    }

    /// <summary>Value of the column at the given ordinal; see <see cref="GetValue"/>.</summary>
    public object this[int i]
    {
        get { return GetValue(i); }
    }

    // The typed accessors below are intentionally left unimplemented, as in the original:
    // values are only exposed through GetValue and the members built on it.

    public bool GetBoolean(int i)
    {
        throw new NotImplementedException();
    }

    public byte GetByte(int i)
    {
        throw new NotImplementedException();
    }

    public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
    {
        throw new NotImplementedException();
    }

    public char GetChar(int i)
    {
        throw new NotImplementedException();
    }

    public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
    {
        throw new NotImplementedException();
    }

    public IDataReader GetData(int i)
    {
        throw new NotImplementedException();
    }

    public string GetDataTypeName(int i)
    {
        throw new NotImplementedException();
    }

    public DateTime GetDateTime(int i)
    {
        throw new NotImplementedException();
    }

    public decimal GetDecimal(int i)
    {
        throw new NotImplementedException();
    }

    public double GetDouble(int i)
    {
        throw new NotImplementedException();
    }

    public Type GetFieldType(int i)
    {
        throw new NotImplementedException();
    }

    public float GetFloat(int i)
    {
        throw new NotImplementedException();
    }

    public Guid GetGuid(int i)
    {
        throw new NotImplementedException();
    }

    public short GetInt16(int i)
    {
        throw new NotImplementedException();
    }

    public int GetInt32(int i)
    {
        throw new NotImplementedException();
    }

    public long GetInt64(int i)
    {
        throw new NotImplementedException();
    }

    public string GetString(int i)
    {
        throw new NotImplementedException();
    }

    #endregion
}
Run Code Online (Sandbox Code Playgroud)

最后批量加载数据方法如下:

    /// <summary>
    /// Streams every row from <paramref name="reader"/> into <paramref name="tableName"/>
    /// using SqlBulkCopy, sending rows to the server in batches of 10000.
    /// </summary>
    /// <param name="reader">Source of the rows to load.</param>
    /// <param name="tableName">Destination table name.</param>
    /// <exception cref="ArgumentNullException"><paramref name="reader"/> is null.</exception>
    public void BulkLoadData(IDataReader reader, string tableName)
    {
        // Fail before opening a connection rather than inside WriteToServer.
        if (reader == null)
        {
            throw new ArgumentNullException("reader");
        }

        using (SqlConnection cnn = new SqlConnection(cnnString))
        using (SqlBulkCopy copy = new SqlBulkCopy(cnn))
        {
            copy.DestinationTableName = tableName;
            copy.BatchSize = 10000;
            cnn.Open();
            copy.WriteToServer(reader);
        }
    }
Run Code Online (Sandbox Code Playgroud)

然而,话虽如此,我建议您不要在 asp.net 中使用此代码,因为有人在另一个答案中指出了原因(特别是 IIS 中工作进程的回收)。我建议您使用一个非常轻量级的队列,首先将请求数据发送到另一个不会重新启动的服务(我们使用 ZeroMQ 从我正在编写的 ASP.NET 应用程序中传输请求和记录数据......非常高效)。

Mike。