Go 在通道上进行非阻塞多重接收

Bea*_*epp 1 select nonblocking go

似乎到处都在讨论从通道读取应该始终是阻塞操作。态度似乎是这就是正确的方式。这是有道理的,但我正在尝试弄清楚如何从渠道中聚合内容。

例如发送http请求。假设我有一个生成数据流的管道设置,因此我有一个生成点队列/流的通道。然后我可以让一个 goroutine 监听这个通道并发送一个 HTTP 请求以将其存储在服务中。这可行,但我正在为每个点创建一个 http 请求。

我发送的端点也允许我批量发送多个数据点。我想做的是

  1. 读取尽可能多的值,直到我阻塞通道。
  2. 组合它们/发送单个 http 请求。
  3. 然后封锁频道,直到我可以再次阅读。

这就是我在 C 中使用线程安全队列和 select 语句完成任务的方式。基本上在可能的情况下刷新整个/队列缓冲区。这是 Go 中有效的技术吗?

似乎 go select 语句确实给了我类似于 C 的 select 的东西,但我仍然不确定通道上是否有“非阻塞读取”。

编辑:我也愿意接受我想要的可能不是正确的方式,但不断地粉碎不间断的http请求对我来说似乎也是错误的,特别是如果它们可以聚合的话。如果有人有一个替代架构,那会很酷,但我想避免诸如神奇地缓冲 N 个项目,或者等待 X 秒直到发送之类的事情。

Sim*_*Fox 5

以下是如何进行批处理直到通道为空。该变量batch是数据点类型的一部分。该变量ch是数据点类型的通道。

var batch []someType
for {
    select {
    case v := <-ch:
       batch = append(batch, v)
    default:
       if len(batch) > 0 {
           sendBatch(batch)
           batch := batch[:0]
       }
       batch = append(batch, <-ch)  // receiving a value here prevents busy waiting.
    }
}
Run Code Online (Sandbox Code Playgroud)

您应该防止批次无限制地增长。这是一个简单的方法:

var batch []someType
for {
    select {
    case v := <-ch:
       batch = append(batch, v)
       if len(batch) >= batchLimit {
           sendBatch(batch)
           batch := batch[:0]
       }
    default:
       if len(batch) > 0 {
           sendBatch(batch)
           batch := batch[:0]
       }
       batch = append(batch, <-ch)
    }
}
Run Code Online (Sandbox Code Playgroud)