Sql Server Service Broker

Gop*_*opi 4 sql database sql-server service-broker

目前我们正在使用服务代理来回发送消息,这是正常的.但我们希望使用RELATED_CONVERSATION_GROUP对这些消息进行分组.我们希望用我们自己的数据库,从我们的数据库坚持UUID作为RELATED_CONVERSATION_GROUP = @uuid,但即使我们使用相同的UUID每次conversion_group_id每次我们收到的排队时间来不同.

你们知道我创建经纪人或接听电话的方式有什么问题,我提供了下面的经纪人创建代码和接收电话代码.谢谢

下面是代码"Service Broker创建代码"

CREATE PROCEDURE dbo.OnDataInserted

@EntityType NVARCHAR(100),
@MessageID BIGINT,
@uuid uniqueidentifier,
@message_body nvarchar(max)
AS

BEGIN

SET NOCOUNT ON;

 DECLARE @conversation UNIQUEIDENTIFIER

BEGIN DIALOG CONVERSATION @conversation
FROM SERVICE DataInsertSndService
TO SERVICE 'DataInsertRcvService'
ON CONTRACT DataInsertContract
WITH RELATED_CONVERSATION_GROUP = @uuid;

SEND ON CONVERSATION @conversation
MESSAGE TYPE DataInserted
(CAST(@message_body))
Run Code Online (Sandbox Code Playgroud)

下面是代码"接收代码"

WHILE 0 < @@TRANCOUNT ROLLBACK; SET NOCOUNT ON

BEGIN TRANSACTION;

DECLARE 
@cID as uniqueidentifier, 
@conversationHandle as uniqueidentifier,
@conversationGroupId as uniqueidentifier,
@tempConversationGroupId as uniqueidentifier,
@message_body VARBINARY(MAX)

RAISERROR ('Awaiting Message ...', 16, 1) WITH NOWAIT

;WAITFOR (RECEIVE TOP (1) 
@cID = Substring(CAST(message_body as nvarchar(max)),4,36), 
@conversationHandle = [conversation_handle],
@conversationGroupId = [conversation_group_id],
@message_body = message_body
FROM DataInsertRcvQueue)

RAISERROR ('Message Received', 16, 1) WITH NOWAIT
Select @tempConversationGroupId = conversationGroupID from ConversationGroupMapper where cID = @cID; 
declare @temp as nvarchar(max);
Set @temp = CAST(@tempConversationGroupId as nvarchar(max));
if @temp  <> ''
BEGIN
    MOVE CONVERSATION @conversationHandle TO @tempConversationGroupId;

RAISERROR ('Moved to Existing Conversation Group' , 16, 1) WITH NOWAIT
END
    else
BEGIN
insert into ConversationGroupMapper values (@cID,@conversationGroupId);

RAISERROR ('New Conversation Group' , 16, 1) WITH NOWAIT
END

WAITFOR DELAY '000:00:10'

COMMIT

RAISERROR ('Committed' , 16, 1) WITH NOWAIT
Run Code Online (Sandbox Code Playgroud)

我们的情况是我们需要在循环中接收来自此Service Broker队列的项目,阻止WAITFOR,并通过不可靠的网络将它们移交给另一个系统.从队列接收的项目将发往与该远程系统的许多连接之一.如果项目未成功传递到其他系统,则应回滚该单个项目的事务,并将该项目返回到队列.我们在成功交付时提交事务,解锁由后续循环迭代拾取的消息序列.

一系列相关项目的延迟不应影响无关序列的传递.单个项目一旦可用就会立即发送到队列中并立即转发.项目应该是单文件转发的,尽管在一个序列中的交付顺序并不严格.

从一次接收一条消息的循环中,从我们的打开连接列表中选择一个新的或现有的TcpClient,并且消息和开放连接通过异步IO回调链传递,直到传输完成.然后我们完成数据库事务,我们从Service Broker Queue接收了Item.

如何使用Service Broker和会话组来协助这种情况?

Rem*_*anu 8

会话组仅是本地概念,专门用于锁定:相关会话属于一个组,这样当您在一个会话上处理消息时,另一个线程无法处理相关消息.没有关于两个端点交换的会话组的信息,因此在您的示例中,所有发起方端点最终都属于一个会话组,但目标端点每个都是一个不同的会话组(每个组只有一个会话).系统行为的原因是因为会话组旨在解决诸如旅行预订服务之类的问题:当它收到"预订旅行"的消息时,它必须预订航班,酒店和汽车出租.它必须发送三条消息,每条消息一条("航班","酒店","汽车"),然后响应将异步返回.当它们回来时,处理必须确保它们不会被单独的线程同时处理,每个线程都会尝试更新"trip"记录状态.在消息传递中,这个问题被称为"消息关联问题".

但是,由于性能原因,通常会在SSB中部署会话组:它们允许更大的RECEIVE结果.目标端点可以通过使用一起移动到一个组中,MOVE CONVERSATION但实际上有一个更简单的技巧:反转会话的方向.让目的地开始对话(分组),并且目的地开始的对话上发送"更新".

一些说明:

  • 不要使用BEGIN/SEND/END的"即发即忘"模式.您将来无法诊断任何问题,请参阅Fire and Forget:对军方有利,但对Service Broker对话无效.
  • 永远不要在生产代码中使用WITH CLEANUP.它适用于灾难恢复等管理性最后操作.如果您滥用它,则拒绝SSB正确跟踪消息以进行正确的重试传递的机会(如果消息在目标上反弹,无论出于何种原因,它将永远丢失).
  • SSB不保证跨会话的订单,仅在一个会话中.为每个INSERT事件启动新会话并不保证在目标上保留插入操作的顺序.