我有一个流媒体服务器,合同看起来像这样:
[ServiceContract]
public interface IStreamingService
{
[OperationContract(Action = "StreamingMessageRequest", ReplyAction = "StreamingMessageReply")]
Message GetStreamingData(Message query);
}
Run Code Online (Sandbox Code Playgroud)
这是一个基本的实现,删除了一些东西(如错误处理)以简化操作:
[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
[ErrorBehavior(typeof(StreamingServiceErrorHandler))]
public class StreamingService : IStreamingService
{
public StreamingService()
{
}
public Message GetStreamingData(Message query)
{
var dataQuery = query.GetBody<DataQuery>();
// Hook up events to let us know if the client disconnects so that we can stop the query...
EventHandler closeAction = (sender, ea) =>
{
dataQuery.Stop();
};
OperationContext.Current.Channel.Faulted += closeAction;
OperationContext.Current.Channel.Closed += closeAction;
Message streamingMessage = Message.CreateMessage(
MessageVersion.Soap12WSAddressing10,
"QueryMessageReply",
new StreamingBodyWriter(QueryMethod(dataQuery));
return streamingMessage;
}
public IEnumerable<object> QueryMethod (DataQuery query)
{
// Returns a potentially infinite stream of objects in response to the query
}
}
Run Code Online (Sandbox Code Playgroud)
此实现使用自定义BodyWriter来传递QueryMethod的结果:
public class StreamingBodyWriter : BodyWriter
{
public StreamingBodyWriter(IEnumerable items)
: base(false) // False should be passed here to avoid buffering the message
{
Items = items;
}
internal IEnumerable Items { get; private set; }
private void SerializeObject(XmlDictionaryWriter writer, object item)
{
// Serialize the object to the stream
}
protected override void OnWriteBodyContents(XmlDictionaryWriter writer)
{
foreach (object item in Items)
{
SerializeObject(writer, item);
}
}
}
Run Code Online (Sandbox Code Playgroud)
客户端连接并开始读取数据流.像这样的东西:
public IEnumerable<T> GetStreamingData<T>(Message queryMessage)
{
Message reply = _Server.GetStreamingData(queryMessage);
// Get a chunckable reader for the streaming reply
XmlReader reader = reply.GetReaderAtBodyContents();
// Read the stream of objects, deserializing each one as appropriate
while (!reader.EOF)
{
object item = DeserializeObject(reader);
if (item == null)
continue;
// Yield each item as it's deserialized, resulting in a [potentially] never-ending stream of objects
yield return (T)item;
}
}
Run Code Online (Sandbox Code Playgroud)
这非常有效,我将对象流返回给客户端.非常好.问题是当客户端断开中间流(优雅或不正常)时.在任何一种情况下,除了服务的错误处理程序选择的错误之外,服务器不会收到此断开连接的通知.
正如您所看到的,我已尝试在服务方法中挂钩Faulted和Closed通道事件,但在客户端断开连接时它们不会触发.
我怀疑这些事件不会触发,因为通过流合同,服务已经从操作方法(GetStreamingData)返回.这个方法返回了一个带有自定义正文编写器的Message,并且这个正文写入(在服务通道堆栈中较低)从计算线程中获取结果(通过IEnumerable)并将它们流入回复消息.由于操作方法已经返回,我猜这些通道事件不会起火.
绑定是net.tcp绑定(非双工)的自定义版本,只有两个绑定元素:BinaryMessageEncodingBindingElement和TcpTransportBindingElement.没有任何安全保障.基本上只是带有二进制消息的原始net.tcp.
问题是当客户端断开连接时,我希望服务器停止生成结果的后台计算线程并将它们提供给BodyWriter正在读取的IEnumerable.身体作家已停止(显然),但线程继续存在.
那么在哪里可以挂钩来发现客户端是否已断开连接?
更新:
有两种主要的断开情况:明确断开连接,由于客户代理的处置或客户端的流程终止; 被动断开,通常是由于网络故障(例如:ISP断开连接,或网络电缆被拉扯).
在第一种情况下,当服务器尝试沿流发送新数据时,会有一个明确的连接异常.没问题.
网络管道损坏的第二种情况更麻烦.通常这种情况是基于发送超时检测到的,但由于这是一个流接口,发送超时会逐渐增加(因为可以想象流数据传输可以持续那么久).因此,当客户端由于网络管道断开而断开连接时,该服务会继续沿着确实存在的连接发送数据.
在这一点上,我不知道解决第二个选项的好方法.建议?
对于处理这些类型的场景,Faulted 事件确实是最好的选择,但它仅在启用可靠会话时才会触发。它在标准 netTcpBinding 中默认启用,但在本例中不会启用,因为您正在使用绑定的自定义精简版本。尝试将其添加到您的自定义绑定中:)
归档时间: |
|
查看次数: |
1776 次 |
最近记录: |