在特定线程上运行工作

Đra*_*nus 6 c# multithreading task

我想有一个特定的线程,在这个单独的线程中为任务和进程任务排队.应用程序将根据用户使用情况制作任务,并将其排入任务队列.然后单独的线程处理任务.即使队列为空,保持线程处于活动状态并使用它来处理排队任务也是至关重要的.

我已经尝试了几个TaskSchedulerwith的实现BlockingCollection并将并发限制为只有一个线程,但是当队列变空并且任务由其他线程处理时,似乎线程被释放.

你能否至少向我推荐一些如何实现这一目标的消息来源?

tl; dr尝试限制一个特定线程来处理动态添加到队列的任务.

EDIT1:

这是使用WCF和.NET framework 4.6的实验性Web应用程序.在WCF库中,我试图用一个线程处理任务来实现此行为.这个线程必须使用外部dll库初始化prolog,然后使用prolog.如果在进程中使用了其他线程,则库会抛出AccessViolationException.我做了一些研究,这很可能是因为该库中的线程管理不当.我实现了我到处都有锁的工作方式.我现在正在尝试重新实现并使其异步,因此我不会使用锁定来阻止主线程.

我不在我的电脑上,但是当我今天晚些时候回家时,我提供了一些代码.

Lua*_*aan 8

你的方法似乎很好,所以你可能只是犯了一些小愚蠢的错误.

实际上,制作一个简单的自定义很容易TaskScheduler.对于你的情况:

void Main()
{
  var cts = new CancellationTokenSource();
  var myTs = new SingleThreadTaskScheduler(cts.Token);

  myTs.Schedule(() => 
   { Print("Init start"); Thread.Sleep(1000); Print("Init done"); });
  myTs.Schedule(() => Print("Work 1"));   
  myTs.Schedule(() => Print("Work 2"));
  myTs.Schedule(() => Print("Work 3"));
  var lastOne = myTs.Schedule(() => Print("Work 4"));

  Print("Starting TS");
  myTs.Start();

  // Wait for all of them to complete...
  lastOne.GetAwaiter().GetResult();

  Thread.Sleep(1000);

  // And try to schedule another
  myTs.Schedule(() => Print("After emptied")).GetAwaiter().GetResult();

  // And shutdown; it's also pretty useful to have the 
  // TaskScheduler return a "complete task" to await
  myTs.Complete();

  Print("On main thread again");
}

void Print(string str)
{
  Console.WriteLine("{0}: {1}", Thread.CurrentThread.ManagedThreadId, str);
  Thread.Sleep(100);
}

public sealed class SingleThreadTaskScheduler : TaskScheduler
{
  [ThreadStatic]
  private static bool isExecuting;
  private readonly CancellationToken cancellationToken;

  private readonly BlockingCollection<Task> taskQueue;

  public SingleThreadTaskScheduler(CancellationToken cancellationToken)
  {
      this.cancellationToken = cancellationToken;
      this.taskQueue = new BlockingCollection<Task>();
  }

  public void Start()
  {
      new Thread(RunOnCurrentThread) { Name = "STTS Thread" }.Start();
  }

  // Just a helper for the sample code
  public Task Schedule(Action action)
  {
      return 
          Task.Factory.StartNew
              (
                  action, 
                  CancellationToken.None, 
                  TaskCreationOptions.None, 
                  this
              );
  }

  // You can have this public if you want - just make sure to hide it
  private void RunOnCurrentThread()
  {
      isExecuting = true;

      try
      {
          foreach (var task in taskQueue.GetConsumingEnumerable(cancellationToken))
          {
              TryExecuteTask(task);
          }
      }
      catch (OperationCanceledException)
      { }
      finally
      {
          isExecuting = false;
      }
  }

  // Signaling this allows the task scheduler to finish after all tasks complete
  public void Complete() { taskQueue.CompleteAdding(); }   
  protected override IEnumerable<Task> GetScheduledTasks() { return null; }

  protected override void QueueTask(Task task)
  {
      try
      {
          taskQueue.Add(task, cancellationToken);
      }
      catch (OperationCanceledException)
      { }
  }

  protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
  {
      // We'd need to remove the task from queue if it was already queued. 
      // That would be too hard.
      if (taskWasPreviouslyQueued) return false;

      return isExecuting && TryExecuteTask(task);
  }
}
Run Code Online (Sandbox Code Playgroud)

修改它很容易让你完全控制任务调度程序实际执行任务的位置 - 事实上,我已经使用了我之前使用过的只是RunOnCurrentThread方法公开的任务调度程序.

对于您的情况,您总是希望坚持只有一个线程,方法SingleThreadTaskScheduler可能更好.虽然这也有其优点:

// On a new thread
try
{
  InitializeProlog();

  try
  {
    myTs.RunOnCurrentThread();
  }
  finally
  {
    ReleaseProlog();
  }
}
catch (Exception ex)
{
  // The global handler
}
Run Code Online (Sandbox Code Playgroud)