当InstanceContextMode是具有Net.Tcp绑定的WCF服务的PerCall时,Multipleurrency的ConcurrencyMode是否具有相关性?

sof*_*eda 10 wcf multithreading

我一直认为将InstanceContextMode设置为PerCall会使并发模式无关紧要,即使使用会话感知绑定(如net.tcp)也是如此.这就是MSDN所说的 http://msdn.microsoft.com/en-us/library/ms731193.aspx "在PerCallinstancing中,并发性是不相关的,因为每个消息都由新的InstanceContext处理,因此,永远不会超过一个线程在InstanceContext中处于活动状态."


但今天我正在阅读Juval Lowy的书"编程WCF服务",他在第8章写道

如果每个呼叫服务具有传输级会话,则是否允许并发处理呼叫是服务并发模式的产物.如果使用ConcurrencyMode.Single配置服务,则不会对挂起的调用进行并发处理,并且一次调度一个调用.[...]我认为这是一个有缺陷的设计.如果使用ConcurrencyMode.Multiple配置服务,则允许并发处理.呼叫在到达时分派,每个呼叫到新实例,并同时执行.这里一个有趣的观察是,为了输出,使用ConcurrencyMode.Multiple配置每个呼叫服务是个好主意 - 实例本身仍然是线程安全的(因此您不会产生同步责任) ,但您将允许来自同一客户端的并发呼叫.


这与我的理解和MSDN所说的相矛盾.哪个是对的 ?在我的情况下,我有一个WCF Net.Tcp服务使用我的许多客户端应用程序创建一个新的代理对象,进行调用,然后立即关闭代理.该服务具有PerCall InstanceContextMode.如果我将InstanceContextMode更改为Multiple而没有比percall更糟糕的线程安全行为,我是否会获得更高的吞吐量?

Ern*_*ieL 8

阅读洛伊声明的关键词是" 为了吞吐量 ".Lowy指出,当使用ConcurrencyMode.Single时,WCF将盲目地实现锁定以强制序列化到服务实例.锁是昂贵的,这是不必要的,因为PerCall已经保证第二个线程永远不会尝试调用相同的服务实例.

在行为方面: ConcurrencyMode对PerCall服务实例无关紧要.

在性能方面: ConcurrencyMode.Multiple的PerCall服务应该稍​​快一些,因为它不会创建和获取ConcurrencyMode.Single正在使用的(不需要的)线程锁.

我写了一个快速的基准测试程序,看看我是否可以衡量单一与多重对PerCall服务的性能影响:基准测试没有显示出任何有意义的差异.

如果您想尝试自己运行,我会粘贴在下面的代码中.

我试过的测试用例:

  • 600个线程调用服务500次
  • 200个线程调用服务1000次
  • 8个线程调用服务10000次
  • 1个线程调用服务10000次

我在运行Service 2008 R2的4 CPU VM上运行它.除1线程外,所有情况都受CPU限制.

结果:所有运行均在彼此的约5%之内. 有时ConcurrencyMode.Multiple更快.有时ConcurrencyMode.Single更快.也许正确的统计分析可以挑选一个胜利者.在我看来,他们足够接近无所谓.

这是典型的输出:

在net.pipe上启动单个服务:// localhost/base ... Type = SingleService ThreadCount = 600 ThreadCallCount = 500 运行时:45156759 ticks 12615 msec

在net.pipe上启动多个服务:// localhost/base ... Type = MultipleService ThreadCount = 600 ThreadCallCount = 500 runtime:48731273 ticks 13613 msec

在net.pipe上启动单一服务:// localhost/base ... Type = SingleService ThreadCount = 600 ThreadCallCount = 500 runtime:48701509 ticks 13605 msec

在net.pipe:// localhost/base上启动多个服务... Type = MultipleService ThreadCount = 600 ThreadCallCount = 500 运行时:48590336 ticks 13574 msec

基准代码:

常见警告:这是基准代码,采用不适合生产用途的捷径.

using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceModel;
using System.ServiceModel.Description;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace WCFTest
{
    [ServiceContract]
    public interface ISimple
    {
        [OperationContract()]
        void Put();
    }

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall, ConcurrencyMode = ConcurrencyMode.Single)]
    public class SingleService : ISimple
    {
        public void Put()
        {
            //Console.WriteLine("put got " + i);
            return;
        }
    }

    [ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall, ConcurrencyMode = ConcurrencyMode.Multiple)]
    public class MultipleService : ISimple
    {
        public void Put()
        {
            //Console.WriteLine("put got " + i);
            return;
        }
    }

    public class ThreadParms
    {
        public int ManagedThreadId { get; set; }
        public ServiceEndpoint ServiceEndpoint { get; set; }
    }

    public class BenchmarkService
    {
        public readonly int ThreadCount;
        public readonly int ThreadCallCount;
        public readonly Type ServiceType; 

        int _completed = 0;
        System.Diagnostics.Stopwatch _stopWatch;
        EventWaitHandle _waitHandle;
        bool _done;

        public BenchmarkService(Type serviceType, int threadCount, int threadCallCount)
        {
            this.ServiceType = serviceType;
            this.ThreadCount = threadCount;
            this.ThreadCallCount = threadCallCount;

            _done = false;
        }

        public void Run(string baseAddress)
        {
            if (_done)
                throw new InvalidOperationException("Can't run twice");

            ServiceHost host = new ServiceHost(ServiceType, new Uri(baseAddress));
            host.Open();

            Console.WriteLine("Starting " + ServiceType.Name + " on " + baseAddress + "...");

            _waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset);
            _completed = 0;
            _stopWatch = System.Diagnostics.Stopwatch.StartNew();

            ServiceEndpoint endpoint = host.Description.Endpoints.Find(typeof(ISimple));

            for (int i = 1; i <= ThreadCount; i++)
            {
                // ServiceEndpoint is NOT thread safe. Make a copy for each thread.
                ServiceEndpoint temp = new ServiceEndpoint(endpoint.Contract, endpoint.Binding, endpoint.Address);
                ThreadPool.QueueUserWorkItem(new WaitCallback(CallServiceManyTimes),
                    new ThreadParms() { ManagedThreadId = i, ServiceEndpoint = temp });
            }

            _waitHandle.WaitOne();
            host.Shutdown();

            _done = true;

            //Console.WriteLine("All DONE.");
            Console.WriteLine("    Type=" + ServiceType.Name + "  ThreadCount=" + ThreadCount + "  ThreadCallCount=" + ThreadCallCount);
            Console.WriteLine("    runtime: " + _stopWatch.ElapsedTicks + " ticks   " + _stopWatch.ElapsedMilliseconds + " msec");
        }

        public void CallServiceManyTimes(object threadParams)
        {
            ThreadParms p = (ThreadParms)threadParams;

            ChannelFactory<ISimple> factory = new ChannelFactory<ISimple>(p.ServiceEndpoint);
            ISimple proxy = factory.CreateChannel();

            for (int i = 1; i < ThreadCallCount; i++)
            {
                proxy.Put();
            }

            ((ICommunicationObject)proxy).Shutdown();
            factory.Shutdown();

            int currentCompleted = Interlocked.Increment(ref _completed);

            if (currentCompleted == ThreadCount)
            {
                _stopWatch.Stop();
                _waitHandle.Set();
            }
        }
    }


    class Program
    {
        static void Main(string[] args)
        {
            BenchmarkService benchmark;
            int threadCount = 600;
            int threadCalls = 500;
            string baseAddress = "net.pipe://localhost/base";

            for (int i = 0; i <= 4; i++)
            {
                benchmark = new BenchmarkService(typeof(SingleService), threadCount, threadCalls);
                benchmark.Run(baseAddress);

                benchmark = new BenchmarkService(typeof(MultipleService), threadCount, threadCalls);
                benchmark.Run(baseAddress);
            }

            baseAddress = "http://localhost/base";

            for (int i = 0; i <= 4; i++)
            {
                benchmark = new BenchmarkService(typeof(SingleService), threadCount, threadCalls);
                benchmark.Run(baseAddress);

                benchmark = new BenchmarkService(typeof(MultipleService), threadCount, threadCalls);
                benchmark.Run(baseAddress);
            }

            Console.WriteLine("Press ENTER to close.");
            Console.ReadLine();

        }
    }

    public static class Extensions
    {
        static public void Shutdown(this ICommunicationObject obj)
        {
            try
            {
                if (obj != null)
                    obj.Close();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Shutdown exception: {0}", ex.Message);
                obj.Abort();
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)