Reb*_*oon 9 c# sql t-sql sql-server performance
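I receive streamed data from an external source (via Lightstreamer) in my C# application. The application receives the data from a listener and stores it in a queue (ConcurrentQueue). Every 0.5 seconds the queue is drained with TryDequeue into a DataTable, and the DataTable is then copied into the SQL database using SqlBulkCopy. The SQL database processes the newly arrived data from the staging table into the final table. I currently receive about 300,000 rows per day (this may increase sharply over the next few weeks), and my goal is to stay under 1 second from the moment the data is received until it is available in the final SQL table. At the moment, the maximum number of rows per second I have to process is about 50.
Unfortunately, as more and more data arrives, my logic is getting slower (still far below 1 second, but I want to keep improving it). The main bottleneck (so far) is processing the staging data (on the SQL database) into the final table. To improve performance, I would like to switch the staging table to a memory-optimized table. The final table is already memory-optimized, so the two should work well together.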
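For reference, the original design described above (draining the ConcurrentQueue into a DataTable before handing it to SqlBulkCopy) might look roughly like the sketch below. The PriceTick type, the maxRows cap and the column layout are assumptions for illustration (the columns mirror the table type shown later in this post), and the SqlBulkCopy call itself is left out so the snippet runs without a database.

```csharp
using System.Collections.Concurrent;
using System.Data;

// Hypothetical row type standing in for the Lightstreamer price DTO.
public sealed class PriceTick
{
    public int InstrumentId { get; set; }
    public long TimeStampMs { get; set; }
    public decimal Bid { get; set; }
    public decimal Ask { get; set; }
}

public static class StagingBatch
{
    // Moves at most maxRows queued ticks into a new DataTable.
    // Anything beyond the cap stays in the queue for the next cycle.
    public static DataTable DrainToDataTable(ConcurrentQueue<PriceTick> queue, int maxRows)
    {
        var table = new DataTable("CityIndexIntradayLivePricesStaging");
        table.Columns.Add("CityIndexInstrumentID", typeof(int));
        table.Columns.Add("CityIndexTimeStamp", typeof(long));
        table.Columns.Add("BidPrice", typeof(decimal));
        table.Columns.Add("AskPrice", typeof(decimal));

        // The cap is checked first, so no tick is dequeued once it is reached.
        while (table.Rows.Count < maxRows && queue.TryDequeue(out PriceTick tick))
        {
            table.Rows.Add(tick.InstrumentId, tick.TimeStampMs, tick.Bid, tick.Ask);
        }
        return table;
    }
}
```

The returned table could then be passed to SqlBulkCopy.WriteToServer; capping the batch size is only one possible way to keep a single cycle short.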
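My questions:
Edit (with solution):
After the comments/answers and some performance evaluation, I decided to drop the bulk insert. Instead, I use a SqlCommand to hand over an IEnumerable with my data as a table-valued parameter to a natively compiled stored procedure, which stores the data directly in my memory-optimized final table (and also copies it to the "staging" table, which now serves as an archive). Performance improved significantly (even without parallelizing the inserts, which I will look into at a later stage).
Here are the relevant parts of the code:
Memory-optimized user-defined table type (used to hand the data over from C# to the SQL stored procedure):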
CREATE TYPE [Staging].[CityIndexIntradayLivePrices] AS TABLE(
[CityIndexInstrumentID] [int] NOT NULL,
[CityIndexTimeStamp] [bigint] NOT NULL,
[BidPrice] [numeric](18, 8) NOT NULL,
[AskPrice] [numeric](18, 8) NOT NULL,
INDEX [IndexCityIndexIntradayLivePrices] NONCLUSTERED
(
[CityIndexInstrumentID] ASC,
[CityIndexTimeStamp] ASC,
[BidPrice] ASC,
[AskPrice] ASC
)
)
WITH ( MEMORY_OPTIMIZED = ON )
Natively compiled stored procedure that inserts the data into the final table and into staging (which serves as an archive in this case):
create procedure [Staging].[spProcessCityIndexIntradayLivePricesStaging]
(
@ProcessingID int,
@CityIndexIntradayLivePrices Staging.CityIndexIntradayLivePrices readonly
)
with native_compilation, schemabinding, execute as owner
as
begin atomic
with (transaction isolation level=snapshot, language=N'us_english')
-- store prices
insert into TimeSeries.CityIndexIntradayLivePrices
(
ObjectID,
PerDateTime,
BidPrice,
AskPrice,
ProcessingID
)
select Objects.ObjectID,
CityIndexTimeStamp,
CityIndexIntradayLivePricesStaging.BidPrice,
CityIndexIntradayLivePricesStaging.AskPrice,
@ProcessingID
from @CityIndexIntradayLivePrices CityIndexIntradayLivePricesStaging,
Objects.Objects
where Objects.CityIndexInstrumentID = CityIndexIntradayLivePricesStaging.CityIndexInstrumentID
-- store data in staging table
insert into Staging.CityIndexIntradayLivePricesStaging
(
ImportProcessingID,
CityIndexInstrumentID,
CityIndexTimeStamp,
BidPrice,
AskPrice
)
select @ProcessingID,
CityIndexInstrumentID,
CityIndexTimeStamp,
BidPrice,
AskPrice
from @CityIndexIntradayLivePrices
end
The IEnumerable is populated from the queue:
private static IEnumerable<SqlDataRecord> CreateSqlDataRecords()
{
// set columns (the order matters: it must match the column order of the table-valued parameter)
SqlMetaData MetaDataCol1;
SqlMetaData MetaDataCol2;
SqlMetaData MetaDataCol3;
SqlMetaData MetaDataCol4;
MetaDataCol1 = new SqlMetaData("CityIndexInstrumentID", SqlDbType.Int);
MetaDataCol2 = new SqlMetaData("CityIndexTimeStamp", SqlDbType.BigInt);
MetaDataCol3 = new SqlMetaData("BidPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale
MetaDataCol4 = new SqlMetaData("AskPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale
// define sql data record with the columns
SqlDataRecord DataRecord = new SqlDataRecord(new SqlMetaData[] { MetaDataCol1, MetaDataCol2, MetaDataCol3, MetaDataCol4 });
// remove each price row from queue and add it to the sql data record
LightstreamerAPI.PriceDTO PriceDTO; // assigned by TryDequeue, so no instance needs to be allocated here
while (IntradayQuotesQueue.TryDequeue(out PriceDTO))
{
DataRecord.SetInt32(0, PriceDTO.MarketID); // city index market id
DataRecord.SetInt64(1, Convert.ToInt64((PriceDTO.TickDate.Replace(@"\/Date(", "")).Replace(@")\/", ""))); // verbatim strings (@) keep the backslash literal instead of treating it as the start of an escape sequence
DataRecord.SetDecimal(2, PriceDTO.Bid); // bid price
DataRecord.SetDecimal(3, PriceDTO.Offer); // ask price
yield return DataRecord;
}
}
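The chained Replace calls above silently accept malformed input and only fail later inside Convert.ToInt64. A stricter variant could be sketched as follows. It assumes TickDate always has the form \/Date(<digits>)\/ and that the number is a Unix timestamp in milliseconds (the usual meaning of this JSON date format, but not confirmed by the source). The TickDateParser class and its method names are hypothetical.

```csharp
using System;
using System.Globalization;

public static class TickDateParser
{
    private const string Prefix = @"\/Date(";
    private const string Suffix = @")\/";

    // Extracts the numeric part of a string such as \/Date(1455000000000)\/.
    // Throws FormatException if the wrapper or the number is malformed.
    public static long ParseEpochMilliseconds(string tickDate)
    {
        if (tickDate == null
            || !tickDate.StartsWith(Prefix, StringComparison.Ordinal)
            || !tickDate.EndsWith(Suffix, StringComparison.Ordinal))
        {
            throw new FormatException("Unexpected TickDate format: " + tickDate);
        }

        string digits = tickDate.Substring(
            Prefix.Length,
            tickDate.Length - Prefix.Length - Suffix.Length);

        return long.Parse(digits, NumberStyles.AllowLeadingSign, CultureInfo.InvariantCulture);
    }

    // Assumption: the value is milliseconds since 1970-01-01 UTC.
    public static DateTimeOffset ToUtc(long epochMilliseconds)
    {
        return DateTimeOffset.FromUnixTimeMilliseconds(epochMilliseconds);
    }
}
```

With such a helper, the SetInt64 call would become DataRecord.SetInt64(1, TickDateParser.ParseEpochMilliseconds(PriceDTO.TickDate)).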
The data is processed every 0.5 seconds:
public static void ChildThreadIntradayQuotesHandler(Int32 CityIndexInterfaceProcessingID)
{
try
{
// open new sql connection
using (SqlConnection TimeSeriesDatabaseSQLConnection = new SqlConnection("Data Source=XXX;Initial Catalog=XXX;Integrated Security=SSPI;MultipleActiveResultSets=false"))
{
// open connection
TimeSeriesDatabaseSQLConnection.Open();
// endless loop to keep thread alive
while(true)
{
// ensure queue has rows to process (otherwise no need to continue)
if(IntradayQuotesQueue.Count > 0)
{
// define stored procedure for sql command (the command is disposed after each batch)
using (SqlCommand InsertCommand = new SqlCommand("Staging.spProcessCityIndexIntradayLivePricesStaging", TimeSeriesDatabaseSQLConnection))
{
// set command type to stored procedure
InsertCommand.CommandType = CommandType.StoredProcedure;
// define sql parameters (table-valued parameter gets data from CreateSqlDataRecords())
SqlParameter ParameterCityIndexIntradayLivePrices = InsertCommand.Parameters.AddWithValue("@CityIndexIntradayLivePrices", CreateSqlDataRecords()); // table-valued parameter
SqlParameter ParameterProcessingID = InsertCommand.Parameters.AddWithValue("@ProcessingID", CityIndexInterfaceProcessingID); // processing id parameter
// set sql db type to structured for the table-valued parameter (structured = special data type for specifying structured data contained in table-valued parameters)
ParameterCityIndexIntradayLivePrices.SqlDbType = SqlDbType.Structured;
// execute stored procedure
InsertCommand.ExecuteNonQuery();
}
}
// wait 0.5 seconds
Thread.Sleep(500);
}
}
}
catch (Exception e)
{
// handle error (standard error messages and update processing)
ThreadErrorHandling(CityIndexInterfaceProcessingID, "ChildThreadIntradayQuotesHandler (handler stopped now)", e);
}
}
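As a variation on the parameter setup above, the command could also be built with explicitly typed parameters instead of AddWithValue, so that the parameter types do not have to be inferred from the values. This is only a sketch: the TvpCommandFactory class is hypothetical, and the namespaces assume the .NET Framework System.Data.SqlClient provider (with the Microsoft.Data.SqlClient package, SqlDataRecord lives in Microsoft.Data.SqlClient.Server instead).

```csharp
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using Microsoft.SqlServer.Server;

public static class TvpCommandFactory
{
    // Builds (but does not execute) the stored procedure call with
    // explicitly typed parameters. The caller owns and disposes the command.
    public static SqlCommand Build(
        SqlConnection connection,
        int processingId,
        IEnumerable<SqlDataRecord> rows)
    {
        var command = new SqlCommand(
            "Staging.spProcessCityIndexIntradayLivePricesStaging", connection)
        {
            CommandType = CommandType.StoredProcedure
        };

        command.Parameters.Add("@ProcessingID", SqlDbType.Int).Value = processingId;

        // Structured marks the parameter as table-valued; TypeName names the
        // user-defined table type created earlier in this post.
        SqlParameter tvp = command.Parameters.Add(
            "@CityIndexIntradayLivePrices", SqlDbType.Structured);
        tvp.TypeName = "Staging.CityIndexIntradayLivePrices";
        tvp.Value = rows;

        return command;
    }
}
```

Inside the loop it could be used as: using (SqlCommand cmd = TvpCommandFactory.Build(TimeSeriesDatabaseSQLConnection, CityIndexInterfaceProcessingID, CreateSqlDataRecords())) { cmd.ExecuteNonQuery(); }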
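Use SQL Server 2016 (it is not RTM yet, but it is already much better than 2014 when it comes to memory-optimized tables). Then either use a memory-optimized table variable, or simply issue a large number of natively compiled stored procedure calls within one transaction, each performing a single insert, depending on which is faster in your scenario (this varies). A few things to watch out for:
For parallelizing the inserts, use async/await and/or Parallel.ForEach.
A table-valued parameter can be passed as a DataTable, but that is not the most efficient way; passing an IEnumerable<SqlDataRecord> is. You can use an iterator method to generate the values, so that only a constant amount of memory is allocated.
You will have to experiment a bit to find the best way of passing the data; it depends heavily on the size of the data and on how you obtain it.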
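To illustrate the async/await and iterator points, here is a minimal sketch of a flush loop that drains the queue lazily through an iterator and awaits the write instead of blocking a thread. The AsyncFlushLoop class and the flushAsync delegate are hypothetical; in a real application the delegate would wrap something like SqlCommand.ExecuteNonQueryAsync with the rows passed as a table-valued parameter.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncFlushLoop
{
    // Periodically hands the queued items to flushAsync until cancelled.
    public static async Task RunAsync<T>(
        ConcurrentQueue<T> queue,
        Func<IEnumerable<T>, Task> flushAsync,
        TimeSpan interval,
        CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            if (!queue.IsEmpty)
            {
                // Drain is lazy: items are dequeued only while the
                // consumer enumerates, so no intermediate list is built.
                await flushAsync(Drain(queue));
            }

            try
            {
                await Task.Delay(interval, token);
            }
            catch (OperationCanceledException)
            {
                break; // cancellation requested while waiting
            }
        }
    }

    // Iterator method: yields one item at a time from the queue.
    private static IEnumerable<T> Drain<T>(ConcurrentQueue<T> queue)
    {
        while (queue.TryDequeue(out T item))
        {
            yield return item;
        }
    }
}
```

Whether this is actually faster than the Thread.Sleep loop depends on the workload, so it is worth measuring both.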