Pav*_* Zv 3 sql-server service-broker
情况:我使用Service Broker开发数据推送方式。
现在我考虑一个场景:
在Broker数据库中安装 Service Broker 的脚本:
-- installation
use master
go
if exists ( select * from sys.databases where name = 'Broker' )
begin
alter database [Broker] set restricted_user with rollback immediate;
drop database [Broker];
end
go
create database [Broker]
go
alter database [Broker] set enable_broker with rollback immediate;
alter database [Broker] set read_committed_snapshot on;
alter database [Broker] set allow_snapshot_isolation on;
alter database [Broker] set recovery full;
go
use [Broker]
go
create message type datachanges_messagetype
validation = none;
go
create contract datachanges_contract ( datachanges_messagetype sent by initiator );
go
create queue dbo.datachanges_initiatorqueue
with status = on
, retention = off
, poison_message_handling ( status = on )
on [default];
go
create queue dbo.datachanges_targetqueue
with status = on
, retention = off
, poison_message_handling ( status = on )
on [default];
go
create service datachanges_initiatorservice
on queue datachanges_initiatorqueue
( datachanges_contract );
go
create service datachanges_targetservice
on queue datachanges_targetqueue
( datachanges_contract );
go
-- conversation additional table
create table dbo.[SessionConversationsSPID] (
spid int not null
, handle uniqueidentifier not null
, primary key ( spid )
, unique ( handle )
)
go
-- SP which is used to send data from triggers
create procedure dbo.trackChanges_send
@json nvarchar(max)
as
begin
set nocount on;
if ( @json is null or @json = '' )
begin
raiserror( 'DWH Service Broker: An attempt to send empty message occurred', 16, 1);
return;
end
declare @handle uniqueidentifier = null
, @counter int = 1
, @error int;
begin transaction
while ( 1 = 1 )
begin
select @handle = handle
from dbo.[SessionConversationsSPID]
where spid = @@SPID;
if @handle is null
begin
begin dialog conversation @handle
from service datachanges_initiatorservice
to service 'datachanges_targetservice'
on contract datachanges_contract
with encryption = off;
insert into dbo.[SessionConversationsSPID] ( spid, handle )
values ( @@SPID, @handle );
end;
send on conversation @handle
message type datachanges_messagetype( @json );
set @error = @@error;
if @error = 0
break;
set @counter += 1;
if @counter > 5
begin
declare @mes varchar(max) = 'db - ' + @db + '. schema - ' + @sch;
raiserror( N'DWH Service Broker: Failed to SEND on a conversation for more than 10 times. Source: %s. Error: %i.', 16, 2, @mes, @error );
break;
end
delete from dbo.[SessionConversationsSPID]
where handle = @handle;
set @handle = null;
end
commit;
end
go
-- And dialogs creation to mitigate hot spot problem on sys.sysdesend table.
-- Described here: https://docs.microsoft.com/en-us/previous-versions/sql/sql-server-2008/dd576261
declare @i int, @spid int, @handle uniqueidentifier
select @i = 0, @spid = 50;
while (@i < 150*3000) -- 450000 dialogs
begin
set @i = @i + 1
begin dialog @handle
from service datachanges_initiatorservice
to service 'datachanges_targetservice'
on contract datachanges_contract
with encryption = off;
if ((@i % 150) = 0)
begin
set @spid += 1;
insert into dbo.SessionConversationsSPID ( spid, handle ) values (@spid, @handle)
end
end
Run Code Online (Sandbox Code Playgroud)
用户数据库中的典型触发器代码:
create trigger [<SCHEMA>].[<TABLE>_TR_I]
on [<SCHEMA>].[<TABLE>]
with execute as caller
after insert
as
begin
set xact_abort off;
set nocount on;
declare @rc int = ( select count(*) from inserted );
if ( @rc = 0 )
begin
return;
end
begin try
declare @db_name sysname = db_name();
declare @json nvarchar(max);
set @json = (
select getutcdate() as get_date, ''I'' as tr_operation, current_transaction_id() as cur_tran_id, ''<TABLE>'' as table_name, @@servername as server_name, @db_name as db_name, ''<SCHEMA>'' as tenant_schemaname
, *
from inserted
for json auto, include_null_values
);
exec dbo.trackChanges_send
@json = @json;
end try
begin catch
declare @error_message nvarchar(max);
set @error_message = ''['' + isnull( cast( error_number() as nvarchar( max ) ), '''' ) +''] ''
+ isnull( cast( error_severity() as nvarchar( max ) ), '''' )
+'' State: ''+ isnull( cast( error_state() as nvarchar( max ) ), '''' )
+'' Trigger: '' + ''[<SCHEMA>].[<TABLE>_TR_I]''
+'' Line: '' + isnull( cast( error_line() as nvarchar( max ) ), '''' )
+'' Msg: '' + isnull( cast( error_message() as nvarchar( max ) ), '''' );
raiserror( ''DWH Service Broker: An error has been occured while sending data changes. Error: %s'', 0, 0, @error_message ) with log;
return;
end catch
end
go
Run Code Online (Sandbox Code Playgroud)
所以,我的问题是:
使用DBCC页我看到这个页面属于sys.queue_messages_597577167这是一个包装dbo.datachanges_targetqueue。那一刻的等待会话总数约为 450,因此它可能是一个瓶颈。
在那段时间触发器执行的时间很长(超过 10 秒,通常不到 1 秒)。它发生在随机时间,所以我在这里看不到任何依赖
declare @i int = 0;
while ( 1 = 1 )
begin
declare @mb varbinary( max );
receive top ( 1000 ) @mb = message_body from dbo.datachanges_targetqueue
set @i = @@rowcount;
if @i = 0
break;
end
Run Code Online (Sandbox Code Playgroud)
由于触发器活动,定期执行也可能被阻止。我不明白为什么。
可以使用一个队列和约 800000 个触发器吗?:) 我的意思是也许我需要考虑一些门槛。
使用“我的”方法(一个数据库是一个发送者和一个目标)或使用“每个数据库都是发送者和一个目标”的优点/缺点是什么?
可以使用一个队列和约 800000 个触发器吗?:) 我的意思是也许我需要考虑一些门槛。
不,不是。您必须确保您的触发器始终是短期运行的,否则您的吞吐量将受到影响。
将 800000 个触发器写入单个队列并不是一个好主意。队列由常规表支持,在某种程度上,页面热点将成为您的瓶颈。和:
发送到同一数据库引擎实例中的服务的消息直接放入与这些服务关联的队列中。
如果您的目标服务位于远程 SQL Server 实例上,则消息将被写入并提交到每个数据库的传输队列。但是对于同一实例上的目标队列,消息直接进入目标队列。
我认为最重要的是直接写入目标队列在这里不是正确的解决方案。想象一下在峰值事务吞吐量时有一个空的目标队列。该队列的后备表根本没有足够的页面来展开页面锁存以容纳此场景中所需的大量并发写入器。
如果你所有的表都在同一个数据库中,那么传输队列可能成为瓶颈。但是传输队列的结构与普通队列不同。传输队列有一个聚集索引:
select i.name index_name, i.type, c.name, c.column_id, t.name type_name, c.max_length, ic.key_ordinal
from
sys.indexes i
join sys.index_columns ic
on ic.object_id = i.object_id
join sys.columns c
on c.object_id = ic.object_id
and c.column_id = ic.column_id
join sys.types t
on t.system_type_id = c.system_type_id
and t.user_type_id =c.user_type_id
where c.object_id = object_id('sys.sysxmitqueue')
Run Code Online (Sandbox Code Playgroud)
输出
index_name type name column_id type_name max_length key_ordinal
----------- ---- ------------ ----------- --------------------- ---------- -----------
clst 1 dlgid 1 uniqueidentifier 16 1
clst 1 finitiator 2 bit 1 2
clst 1 msgseqnum 8 bigint 8 3
Run Code Online (Sandbox Code Playgroud)
因此,您不会在传输队列中出现热页争用,并且您将拥有与对话对话 (dlgid) 一样多的插入点。
一个普通队列有两个索引,一个聚集索引
(状态,conversation_group_id,优先级,conversation_handle,queuing_order)
和一个非聚集索引
(状态、优先级、queuing_order、conversation_group_id、conversation_handle、service_id)
您可以通过此查询看到
select q.name queue_name, i.name index_name, i.index_id, ic.index_id, i.type, c.name column_name, c.column_id, t.name type_name, c.max_length, ic.key_ordinal
from
SYS.SERVICE_QUEUES q
join sys.internal_tables it
ON it.parent_object_id = q.object_id
join sys.indexes i
on i.object_id = it.object_id
join sys.index_columns ic
on ic.object_id = i.object_id
and ic.index_id = i.index_id
join sys.columns c
on c.object_id = ic.object_id
and c.column_id = ic.column_id
join sys.types t
on t.system_type_id = c.system_type_id
and t.user_type_id =c.user_type_id
order by q.object_id, i.index_id, ic.key_ordinal
Run Code Online (Sandbox Code Playgroud)
因此,最好将目标服务移动到远程 SQL 实例。这将卸载和写入和读取目标队列,并且可能会减少瓶颈。您的触发器只需要将消息放在传输队列上,这就是您最初认为正在发生的事情。
您可以使用扩展事件会话来观察路由和传输队列的使用情况,例如:
CREATE EVENT SESSION [ServiceBrokerRouting] ON SERVER
ADD EVENT sqlserver.broker_dialog_transmission_body_dequeue,
ADD EVENT sqlserver.broker_dialog_transmission_queue_enqueue,
ADD EVENT sqlserver.broker_forwarded_message_sent,
ADD EVENT sqlserver.broker_message_classify,
ADD EVENT sqlserver.broker_remote_message_acknowledgement
Run Code Online (Sandbox Code Playgroud)
同样在您当前的设计和远程服务选项中,您可以从索引结构中看到重用正确号码的对话对话如何优化解决方案。太少了,您会遇到锁定和页面争用问题。太多了,你有创建和管理它们的开销,你不能做消息批处理。看起来您已经阅读了 Reusing Conversations,并且正在使用每个会话的对话模式,Remus 推荐用于此模式。查看页面闩锁争用在哪个索引上以及它是叶页还是非叶页会很有趣。但无论如何,具有并发 SEND 和 RECEIVE 的队列表通常没有足够的页面来分散页面闩锁争用。
因此,设计替代方案是让触发器删除 N 个中间队列上的更改,然后在将消息转发到单个目标队列的那些上设置激活过程。您可能仍然在目标队列上等待,但它们不会在您的触发器期间。此外,在您的中间到最终队列激活过程中,您可以批量发送和管理对话,并减少对话对话 (N),因此接收方实际上可以每次调用获取 1000 条消息。对 RECEIVE 的单个调用只能从单个对话中获取消息。因此,如果您有数千个交错的对话,您将始终只获取单行。
或者干脆有 N 个目标队列,让你的读者阅读所有这些队列。
没有什么根本原因不能让它工作,但这不会很简单。规模巨大,Service Broker 很复杂。您还应该在这里考虑第 3 方解决方案。 Qlik (Attunity)有一个基于日志的 SQL Server CDC 解决方案,可以从 SQL Server 事务日志中收集所有更改,无需触发器或队列。还有其他几种基于SQL Server Change Data Capture 的解决方案。Change Data Capture 将为您暂存所有更改,您只需从外部程序中使用它们。 更改跟踪是最轻量级的解决方案,但不捕获行的中间版本。因此,您知道哪些行发生了更改以及更改是插入、更新还是删除,但您只有要查询的行的当前版本。但是这些选项中的每一个都将是昂贵的、棘手的,并且需要大量测试。