使用GCD如何处理NSStream run loop调度带来的并发问题?

Mat*_*toe 5 concurrency cocoa-touch nsstream grand-central-dispatch nsrunloop

我有以下情况,我创建了一个 GCD 调度队列,并在其中将 an 调度NSStream到当前NSRunLoop,正如其规范中要求它发出委托事件,然后我使用[[NSRunLoop currentRunLoop run].

这会产生三种可能的情况:

  1. 创建一个串行队列,其中通过流发送初始写入消息,并且仅当存在来自NSStream对象的委托回调时才发送其他写入消息,因为尝试在不遵守此模式的情况下写入新消息(这将是可取的)将失败,因为队列被运行循环锁定。

  2. 创建一个并发队列,在该队列中消息可以自由写入流,因为发送到队列的块将与运行运行循环的块同时执行。然而,虽然希望使写入消息和运行循环并发运行,但肯定不希望必须在并发运行的队列中阻塞同时尝试写入流。

  3. 创建两个队列——一个负责保持 run loop 处于活动状态并接收 read-from-stream 回调,另一个负责向流发送异步写入消息。这看起来很理想,但是NSStream文档似乎明确指出不应尝试在调度的线程之外读取/写入流。

鉴于这些场景都不理想,如何解决这些问题?

Mux*_*Mux 7

迟到了,但您可以直接使用为您的流设置所需的调度队列,而不是使用 runloops

void CFReadStreamSetDispatchQueue(CFReadStreamRef stream, dispatch_queue_t q);
void CFWriteStreamSetDispatchQueue(CFWriteStreamRef stream, dispatch_queue_t q);
Run Code Online (Sandbox Code Playgroud)

其中 CFReadStreamRef 可以采用桥接 NSInputStream 和 CFWriteStreamRef 桥接 NSOutputStream。通过这种方式,您根本不必安排或取消安排运行循环,您的流将在后台运行。

Apple 示例代码的片段:

CFReadStreamSetDispatchQueue((__bridge CFReadStreamRef) self.inputStream,  self.queue);
CFWriteStreamSetDispatchQueue((__bridge CFWriteStreamRef) self.outputStream, self.queue);
Run Code Online (Sandbox Code Playgroud)

在 Swift 中,你可以直接调用函数:

CFReadStreamSetDispatchQueue(inputStream, streamQueue)
CFWriteStreamSetDispatchQueue(outputStream, streamQueue)
Run Code Online (Sandbox Code Playgroud)


ipm*_*mcc 3

正如您在文档中指出的那样,当您有一个基于运行循环的 API(例如 )时NSStream,一般期望是与该对象的所有交互都将发生在拥有调度该对象的运行循环的线程上。我不确定使用NSStream.

除了主队列之外,GCD 没有线程关联的概念,因此除非您调度的运行循环NSStream恰好是主线程运行循环,否则没有好的方法可以用来dispatch_async调度块在该线程上执行。

冒着陈述显而易见的风险,您可能应该只使用标准方法来调度其他线程上的方法。-performSelector:onThread:withObject:waitUntilDone:modes:是最明显的。如果您的困惑是想要使用块,那么了解堆分配的块可以像 Objective-C 对象一样对待并-invokeNSInvocations 一样实现选择器会有所帮助。与您的问题相关的一个简单示例可能如下所示:

@interface AppDelegate ()
{
    NSThread* bgthread;
}
@end

@implementation AppDelegate

- (void)applicationDidFinishLaunching:(NSNotification *)aNotification
{
    // Basic loop to get the background thread to run until you call -cancel on it
    dispatch_block_t threadMain = [^{
        NSThread* thread = [NSThread currentThread];
        NSParameterAssert(![thread isMainThread]);

        NSRunLoop* currentRunLoop = [NSRunLoop currentRunLoop];
        NSPort* port = [NSPort port];

        // If we dont register a mach port with the run loop, it will just exit immediately
        [currentRunLoop addPort: port forMode: NSRunLoopCommonModes];

        // Loop until the thread is cancelled.
        while (!thread.cancelled)
        {
            [currentRunLoop runMode: NSDefaultRunLoopMode beforeDate: [NSDate distantFuture]];
        }

        [currentRunLoop removePort: port forMode: NSRunLoopCommonModes];

        [port invalidate];
        port = nil;
    } copy];

    // Start the thread
    bgthread = [[NSThread alloc] initWithTarget: threadMain selector: @selector(invoke) object: nil];
    [bgthread start];

    // Fetch the runloop, so you can schedule an NSStream on it...
    __block NSRunLoop* runloopForStream = nil;
    dispatch_block_t getrunloop = [^{
        runloopForStream = [NSRunLoop currentRunLoop];
    } copy];

    // Dispatch synchronously, so that runloopForStream is populated before we continue...
    [getrunloop performSelector: @selector(invoke) onThread: bgthread withObject: nil waitUntilDone: YES];

    // Schedule your stream, etc.
    NSOutputStream* mystream = ...; // Your code here...
    [mystream scheduleInRunLoop: runloopForStream forMode: NSDefaultRunLoopMode];

    // Then later, when you want to write some data...
    NSData* dataToWrite = [NSMutableData dataWithLength: 100];
    dispatch_block_t doWrite = [^{
        [mystream write: dataToWrite.bytes maxLength: dataToWrite.length];
    } copy];

    // Dispatch asynchronously to thread
    [doWrite performSelector: @selector(invoke) onThread: bgthread withObject: nil waitUntilDone: NO];
}
@end
Run Code Online (Sandbox Code Playgroud)

请注意,-copy必须将块复制到堆,否则当声明方法超出范围时,它们将被释放。