标签: streaming

使用管道函数相对于 res.write 有什么优势

该框架是Express。

当我从端点内发送请求并开始接收数据时,我可以分块读取数据并立即写入它们:

responseHandler.on('data', (chunk) => {
  res.write(chunk);
});
Run Code Online (Sandbox Code Playgroud)

或者我可以创建一个可写流并将响应传输到该流。

responseHandler.pipe(res)
Run Code Online (Sandbox Code Playgroud)

显然,管道函数以更多维度处理了前一个过程。这些是什么?

streaming http node.js express

5
推荐指数
1
解决办法
6116
查看次数

慢慢改变 BigQuery 的查找缓存 - Dataflow Python Streaming SDK

我正在尝试遵循缓慢变化的查找缓存的设计模式(https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1)用于在 DataFlow 上使用适用于 Apache Beam 的 Python SDK 的流式传输管道。

\n\n

我们的查找缓存参考表位于 BigQuery 中,我们能够读取它并将其作为 ParDo 操作的侧输入传递,但无论我们如何设置触发器/窗口,它都不会刷新。

\n\n
class FilterAlertDoFn(beam.DoFn):\n  def process(self, element, alertlist):\n\n    print len(alertlist)\n    print alertlist\n\n    \xe2\x80\xa6  # function logic\n
Run Code Online (Sandbox Code Playgroud)\n\n
\n\n
alert_input = (p | beam.io.Read(beam.io.BigQuerySource(query=ALERT_QUERY))\n                        | \xe2\x80\x98alert_side_input\xe2\x80\x99 >> beam.WindowInto(\n                            beam.window.GlobalWindows(),\n                            trigger=trigger.RepeatedlyTrigger(trigger.AfterWatermark(\n                                late=trigger.AfterCount(1)\n                            )),\n                            accumulation_mode=trigger.AccumulationMode.ACCUMULATING\n                          )\n                       | beam.Map(lambda elem: elem[\xe2\x80\x98SOMEKEY\xe2\x80\x99])\n)\n\n...\n\n\nmain_input | \xe2\x80\x98alerts\xe2\x80\x99 >> beam.ParDo(FilterAlertDoFn(), beam.pvalue.AsList(alert_input))\n
Run Code Online (Sandbox Code Playgroud)\n\n
\n\n

基于此处的 I/O 页面 ( https://beam.apache.org/documentation/io/built-in/ ),它表示 Python SDK 仅支持 BigQuery Sink 的流式传输,这是否意味着 BQ 读取是有界源那么在这个方法中能不能刷新\xe2\x80\x99呢?

\n\n

尝试在源上设置非全局窗口会导致侧输入中出现空 PCollection。

\n\n
\n\n

更新 …

python streaming dataflow apache-beam

5
推荐指数
1
解决办法
1222
查看次数

如何确保在 gRPC 双向流式传输中收到消息?

我如何知道另一端已收到我通过 gRPC 流发送的消息?

是否有一种构建方法可以在 gRPC 双向流式传输中执行此操作,或者我是否需要仅使用流式传输,然后发回响应?

原型文件:


service SimpleService {
rpc SimpleRPC (stream SimpleData) returns (stream SimpleData) {}
}

message SimpleData {
string msg = 1;
}
Run Code Online (Sandbox Code Playgroud)

去代码:


client := pb.NewSimpleServiceClient(conn)
stream, err := client.SimpleRPC(context.Background())
waitc := make(chan struct{})

msg := &pb.SimpleData{"sup"}
go func() {
for {
   stream.Send(msg)
    }
}()
<-waitc
stream.CloseSend()

Run Code Online (Sandbox Code Playgroud)

streaming bidirectional go grpc

5
推荐指数
1
解决办法
3215
查看次数

使用 AVAudioengine 播放流媒体音频 (iOS)

我想使用 iOS 在 iOS 上流式传输音频AVAudioEngine。目前我不知道该怎么做。

我从网络获取 RTP 数据并想要播放此音频数据AVAudioEngine

我正在使用iOSNetwork.Framework接收网络数据。然后,我对语音数据进行解码,现在想使用 来播放它AVAudioEngine

这是我的接收代码:

connection.receiveMessage { (data, context, isComplete, error) in
    if isComplete {

    // decode the raw network data with Audio codec G711/G722
    let decodedData = AudioDecoder.decode(enc: data, frames: 160)

    // create PCMBuffer for audio data for playback
    let format = AVAudioFormat(standardFormatWithSampleRate: 8000, channels: 1)
    let buffer = AVAudioPCMBuffer(pcmFormat: format!, frameCapacity: 160)!
    buffer.frameLength = buffer.frameCapacity

    // TODO: now I have to copy the decodedData …
Run Code Online (Sandbox Code Playgroud)

audio streaming playback ios avaudioengine

5
推荐指数
2
解决办法
2611
查看次数

是否可以从 Python AWS Lambda 流式传输响应

AWS 中是否可以使用 Python Lambda 将 HTTP 响应流式传输回客户端?

用例是将一些数据从 Elasticsearch 中流式传输出来,将其转换为 CSV,然后将其直接流式传输回客户端(通过 API 网关)。

据我所知,这在Java(/sf/answers/2756749201/)中是可能的,在Node中也可能是可能的,但我不知道如何在Python中做到这一点。

python streaming amazon-web-services aws-lambda

5
推荐指数
1
解决办法
3725
查看次数

如何在 Golang GRPC 中为持久流(又名 pubsub)创建服务器

我正在构建需要以发布/订阅方式向所有订阅的消费者发送事件的服务,例如。向所有当前连接的客户端发送一个事件。

我使用 Protobuf 来实现以下原型定义:

service EventsService {
  rpc ListenForEvents (AgentProcess) returns (stream Event) {}
}
Run Code Online (Sandbox Code Playgroud)

服务器和客户端都是用 Go 编写的。

我的问题是,当客户端启动连接时,流的寿命不长,例如。当服务器从ListenForEvents方法返回时:

service EventsService {
  rpc ListenForEvents (AgentProcess) returns (stream Event) {}
}
Run Code Online (Sandbox Code Playgroud)

然后客户端几乎立即收到EOF错误,这意味着服务器可能关闭了连接。

怎么做才能让客户端长期订阅服务器呢?主要问题是,当客户端调用服务器上的方法时,我可能没有任何内容可以发送给客户端ListenForEvents,这就是为什么我希望该流长期存在,以便以后能够发送消息。

streaming go grpc

5
推荐指数
1
解决办法
4477
查看次数

我们自己设置DRM许可证服务器是否可行

我们想要在内容中添加 DRM 加密,并想知道拥有自己的 DRM 服务器是否确实可行,还是应该使用第三方?另外,如果可行的话,我们可以实现所有 3 个 fairplay、playready、widevine 吗?

streaming drm fairplay playready widevine

5
推荐指数
1
解决办法
2465
查看次数

用于聊天应用程序的 GRPC 流或 WebSockets

我对 gRPC 的双向工作有点困惑。我找不到任何好的解释。有人可以说/描述 gRPC 双向流和 websockets 之间有什么区别吗?我可以仅使用高负载的 gRPC 创建功能齐全的聊天应用程序(服务器/客户端)吗?

streaming chat stream websocket grpc

5
推荐指数
0
解决办法
492
查看次数

Apache flink,以 S3 作为源,S3 作为接收器

是否可以通过 apache Flink 读取事件进入 S3 源存储桶并处理并将其下沉回其他 S3 存储桶?是否有一个特殊的连接器,或者我必须使用 Apache Flink 中提到的可用读取/保存示例?在这种情况下检查点是如何发生的,flink 是否自动跟踪从 S3 源存储桶读取的内容,或者是否需要构建自定义代码。flink 是否也保证 S3 源案例中的恰好一次处理。

streaming event-handling amazon-s3 apache-flink

5
推荐指数
1
解决办法
8759
查看次数

Rust 超级流媒体功能

我目前正在尝试通过 rust with hyper (0.13) 中的简单 HTTP get 请求来实现数据流。这个想法很简单。如果客户端发送请求,服务器每 5 秒以“块”形式响应,使连接始终保持打开状态。

我正在尝试从 futures 箱中实现特征 Stream

impl Stream for MonitorString {
    type Item = Result<String, serde_json::Error>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Result<String, serde_json::Error>>> {
        thread::sleep(Duration::from_secs(5)); 

        Poll::Ready(Some(self.data.to_string()))
    }
}
Run Code Online (Sandbox Code Playgroud)

然后建立一个响应,例如

type StreamingServiceResult = Result<Response<Body>, Box<dyn std::error::Error + Sync + Send>>;

pub async fn handle_request(_: Request<Body>, params: Params, _: Query) -> StreamingServiceResult {
     Ok(Response::builder()
        .header(hyper::header::CONTENT_TYPE, "application/json")
        .header(hyper::header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
        .header(hyper::header::TRANSFER_ENCODING, "chunked")
        .body(Body::wrap_stream(MonitorString::new(...)))
}
Run Code Online (Sandbox Code Playgroud)

当我向服务器发送请求时,它会打开连接并挂起。通过一些调试,我看到调用了 poll_next 函数,但仅当流没有更多内容可产生时才发送响应(poll::Ready(None) 由 poll_next 返回)并整体发送。

我可以看到我对 Stream …

streaming rust hyper

5
推荐指数
0
解决办法
1540
查看次数