使用GCD实现线程池

Eug*_*eev 2 concurrency multithreading grand-central-dispatch threadpool ios

我有一个包含可以并行计算任务的大循环。为此,我决定使用 GCD 编写一个简单的并发线程池,因为我正在 iOS 上工作。

我的线程池看起来相当简单。我将仅附加.m文件,这足以理解我的想法:

#import "iOSThreadPool.h"

@interface iOSThreadPool()
{
    int                                     _timeout;
    int                                     _currentThreadId;
    NSMutableArray<dispatch_queue_t>        *_pool;
    NSMutableArray<dispatch_semaphore_t>    *_semaphores;
    dispatch_group_t                        _group;
}

@end

@implementation iOSThreadPool

- (instancetype)initWithSize:(int)threadsCount tasksCount:(int)tasksCount
{
    self = [super init];
    if (self) {
        _timeout = 2.0;
        _currentThreadId = 0;
        _pool = [NSMutableArray new];
        _semaphores = [NSMutableArray new];
        for (int i = 0; i < threadsCount; i++) {
            dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_CONCURRENT, QOS_CLASS_BACKGROUND, 0);
            dispatch_queue_t queue = dispatch_queue_create([NSString stringWithFormat:@"com.workerQueue_%d", i].UTF8String, attr);
            [_pool addObject:queue];

            dispatch_semaphore_t sema = dispatch_semaphore_create(tasksCount);
            [_semaphores addObject:sema];
        }

        _group = dispatch_group_create();
    }

    return self;
}

- (void)async:(iOSThreadPoolBlock)block
{
    dispatch_group_enter(self->_group);

    __block dispatch_semaphore_t sema = _semaphores[_currentThreadId];
    dispatch_async(_pool[_currentThreadId], ^{

        dispatch_semaphore_wait(sema, dispatch_time(DISPATCH_TIME_NOW, (int64_t)(self->_timeout * NSEC_PER_SEC)));
        block();
        dispatch_semaphore_signal(sema);

        dispatch_group_leave(self->_group);
    });

    _currentThreadId = (_currentThreadId + 1) % _pool.count;
}

- (void)wait {
    dispatch_group_wait(_group, dispatch_time(DISPATCH_TIME_NOW, (int64_t)(self->_timeout * NSEC_PER_SEC)));
}

@end
Run Code Online (Sandbox Code Playgroud)

因此,基本上当我创建线程池时,我会设置线程计数和信号量值。由于队列是并发的,我想限制可以并发执行的任务数量,这样线程就不会被淹没。

问题是 - 无论我创建多少线程,它都不会影响性能。我猜发生这种情况是因为每个调度队列任务最终都在全局队列中,无论我有多少个队列,它们BACKGROUND大多数时候都会将任务发送到同一个队列。

我读了很多相关内容GCD,并在实践中成功地使用了它。但是,当我只想超越您可以在无数教程中找到的简单用法时,例如执行几个并行进程以节省尽可能多的执行时间 - 我失败了。我搜索了更详细的解释或更详细的有效技术GCD,但一无所获。看起来 90% 的时间都是以非常简单的方式使用的。同时我听说这GCD是非常非常强大的多线程框架,很明显,我只是不知道如何正确使用它。

所以我的问题是 - 这真的有可能在 iOS 上启动几个并行进程吗?我应该在线程池中更改什么以使其高效?

注意:我下载了ThreadPool基于std::thread. 如果我更改该池中的线程数,我会清楚地看到性能提升。如果有GCD专家能指导我如何GCD最大程度地使用它,我将不胜感激。

Rob*_*Rob 6

GCD 已经进行了线程池(调度队列正在利用 \xe2\x80\x9cworker 线程\xe2\x80\x9d 池),因此在其之上添加另一层池是多余/低效的。 。

\n\n

你说:

\n\n
\n

问题是 - 无论我创建多少线程,它都不会影响性能。

\n
\n\n

这可能是多种情况中的任何一种。一个常见的问题是工作单元太小。正如同时执行循环所说:

\n\n
\n

您应该确保您的任务代码在每次迭代中完成合理的工作量。与分派到队列的任何块或函数一样,调度该代码执行会产生开销。如果循环的每次迭代仅执行少量工作,则调度代码的开销可能会超过将其分派到队列所获得的性能优势。

\n
\n\n

但还存在各种其他问题,包括低效的同步代码、缓存晃动等。如果没有该问题的可重现示例,就不可能说清楚。虽然 QoS 也会产生影响,但与这些算法问题相比,它通常可以忽略不计。

\n\n

你说:

\n\n
\n

由于队列是并发的,我想限制可以并发执行的任务数量,这样线程就不会被淹没。

\n
\n\n

虽然您可以使用非零调度信号量或NSOperationQueue某些 来实现此目的maxConcurrentOperationCount,但(对于 Swift 用户dispatch_apply而言)是一个 \xe2\x80\x9cgo to\xe2\x80\x9d 解决方案,用于平衡工作负载的计算密集型并行例程concurrentPerform跨CPU核心。它会自动查看您拥有多少个核心\xe2\x80\x99,并将循环分布在它们之间,而不会有线程爆炸的风险。而且,如改进循环代码中所述,您可以尝试使用步幅来很好地平衡每个线程上完成的工作量与线程协调的固有开销。(跨步还可以最大限度地减少缓存争用。)

\n\n

我可能建议研究dispatch_apply并尝试一下。如果您当时仍然不清楚,只需发布​​一个新问题来显示非并行例程和并行再现,我们可以提供进一步的帮助。

\n\n
\n\n

正如我上面所说的,我不认为你想要这个例程。对于计算密集型例程,我更喜欢dispatch_apply. 对于我想要控制并发程度的简单队列(特别是如果其中一些任务本身是异步的),我\xe2\x80\x99dNSOperationQueue使用maxConcurrentOperationCount. 但我想我\xe2\x80\x99d 分享了对你的代码片段的一些观察:

\n\n
    \n
  • 你\xe2\x80\x99实现的是队列池,而不是线程池;

  • \n
  • 你\xe2\x80\x99调用的threadsCount不是线程计数,而是队列计数。因此,如果您创建一个计数为 10 和tasksCount20 的池,则意味着您\xe2\x80\x99 可能使用 200 个线程。

  • \n
  • 同样,您\xe2\x80\x99 调用的_currentThreadId也不是当前线程。它\xe2\x80\x99是当前队列。

  • \n
  • 与的交互_currentThreadId不是线程安全的。

  • \n
\n\n

最重要的是,GCD 有自己的线程池,因此您不应该\xe2\x80\x99 重现该逻辑。您需要做的就是实现 \xe2\x80\x9c 不超过threadCount\xe2\x80\x9d 逻辑(可以使用非零调度信号量来实现)。因此,我\xe2\x80\x99d 建议将其简化为:

\n\n
@interface ThreadPool()\n@property (nonatomic, strong) dispatch_queue_t pool;\n@property (nonatomic, strong) dispatch_queue_t scheduler;\n@property (nonatomic, strong) dispatch_semaphore_t semaphore;\n@end\n\n@implementation ThreadPool\n\n- (instancetype)initWithThreadCount:(int)threadCount {\n    self = [super init];\n    if (self) {\n        NSString *identifier = [[NSUUID UUID] UUIDString];\n        NSString *bundleIdentifier = [[NSBundle mainBundle] bundleIdentifier];\n\n        NSString *schedulingLabel = [NSString stringWithFormat:@"%@.scheduler.%@", bundleIdentifier, identifier];\n        _scheduler = dispatch_queue_create(schedulingLabel.UTF8String, DISPATCH_QUEUE_SERIAL);\n\n        NSString *poolLabel = [NSString stringWithFormat:@"%@.pool.%@", bundleIdentifier, identifier];\n\n        dispatch_queue_attr_t attr = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_CONCURRENT, QOS_CLASS_BACKGROUND, 0);\n        _pool = dispatch_queue_create(poolLabel.UTF8String, attr);\n\n        _semaphore = dispatch_semaphore_create(threadCount);\n    }\n\n    return self;\n}\n\n- (void)async:(ThreadPoolBlock)block {\n    dispatch_async(self.scheduler, ^{\n        dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER);\n        dispatch_async(self.pool, ^{\n            block();\n            dispatch_semaphore_signal(self.semaphore);\n        });\n    });\n}\n\n@end\n
Run Code Online (Sandbox Code Playgroud)\n\n

不用说,这个实现,就像你的一样,假设传递给方法的块async本身是同步的(例如,它\xe2\x80\x99s 没有启动另一个异步进程,如网络请求或其他)。我怀疑你知道这一点,但我只是为了完整起见才提到它。

\n