do 块中作业数量的上限?

qed*_*qed 4 multithreading clojure core.async

这是代码:

(ns typedclj.async
  (:require [clojure.core.async
             :as a
             :refer [>! <! >!! <!!
                     go chan buffer
                     close! thread
                     alts! alts!! timeout]]
            [clj-http.client :as -cc]))


(time (dorun
        (let [c (chan)]
          (doseq [i (range 10 1e4)]
            (go (>! c i))))))
Run Code Online (Sandbox Code Playgroud)

我得到了一个错误:

Exception in thread "async-dispatch-12" java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)
    at clojure.core.async.impl.channels.ManyToManyChannel.put_BANG_(channels.clj:150)
    at clojure.core.async.impl.ioc_macros$put_BANG_.invoke(ioc_macros.clj:959)
    at typedclj.async$eval11807$fn__11816$state_machine__6185__auto____11817$fn__11819.invoke(async.clj:19)
    at typedclj.async$eval11807$fn__11816$state_machine__6185__auto____11817.invoke(async.clj:19)
    at clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:940)
    at clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:944)
    at typedclj.async$eval11807$fn__11816.invoke(async.clj:19)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)...
Run Code Online (Sandbox Code Playgroud)

根据http://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io/

...这将打破 1 个作业 = 1 个线程结,因此这个线程停放将允许我们扩展作业数量,使其超出平台上的任何线程限制(通常在 JVM 上为 1000 左右)。

core.async 在使用“线程”时提供(阻塞)通道和一个新的(无界)线程池。这(实际上)只是使用 java 线程(或 clojure 期货)和 java.util.concurrent 中的 BlockingQueues 的一些糖。主要功能是 go 块,其中线程可以在处理 core.async 通道的(可能)阻塞调用上停放和恢复......

1e4职位已经太多了吗?那么上限是多少呢?

Art*_*ldt 5

我通常不会像这样咆哮,所以我希望你能原谅我这个过错:

在一个更完美的世界中,每个程序员都会在睡觉前和醒来时的第一件事中对自己重复五次“没有无限队列这样的东西”。这种思维模式需要弄清楚系统中将如何处理背压,以便当过程中某处出现减速时,之前的部件有办法找出它并降低自己的响应速度。在 core.async 中,默认背压是即时的,因为默认缓冲区大小为零。在有人准备消费它之前,没有 go block 成功地将某些东西放入 chan 中。

chans 看起来基本上是这样的:

"queue of pending puts" --> buffer --> "queue of pending takes"
Run Code Online (Sandbox Code Playgroud)

putter 和 taker 队列旨在为通过此管道通信的两个进程留出时间来安排自己,以便取得进展。没有这些,线程就没有空间进行调度,并且会发生死锁。他们打算用来作为缓冲。这就是中间缓冲区的用途,这就是使其成为唯一具有明确大小的缓冲区的设计。通过在 chan 中设置缓冲区的大小,为您的系统显式设置缓冲区大小

user> (time (dorun
        (let [c (chan 1e6)]
          (doseq [i (range 10 1e4)]
            (go (>! c i))))))
"Elapsed time: 83.526679 msecs"
nil
Run Code Online (Sandbox Code Playgroud)

在这种情况下,我已经“计算出”如果有多达一百万个等待工作,我的系统作为一个整体将处于良好状态。当然,您在现实世界中的体验会有所不同,而且对于您的情况非常独特。

谢谢你的耐心,