标签: streaming

Kinesis Firehose 是 Kinesis Streams 的替代品吗?

Kinesis Firehose 以及 Kinesis Streams 用于按照 AWS 博客中提到的详细信息加载流数据。Firehose 不存在分片或维护的概念。在这种情况下,Kinesis Firehose 可以替代 Kinesis Streams 吗?

streaming amazon-web-services amazon-kinesis amazon-kinesis-firehose

3
推荐指数
1
解决办法
2340
查看次数

使用流查询时出现“连接被拒绝”异常

我正在尝试读取流数据输入,如下所示

object SocketReadExample {

    def main(args: Array[String]): Unit = {

      val sparkSession = SparkSession.builder
        .master("local")
        .appName("example")
        .config("spark.driver.bindAddress", "127.0.0.1")
        .getOrCreate()
      //create stream from socket
      val socketStreamDf = sparkSession.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 50050)
        .load()

      val consoleDataFrameWriter = socketStreamDf.writeStream
        .format("console")
        .outputMode(OutputMode.Append())

      val query = consoleDataFrameWriter.start()

      query.awaitTermination()
       }
   }
Run Code Online (Sandbox Code Playgroud)

为此我面临以下错误:

 Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Connection
 refused
 === Streaming Query ===
 Identifier: [id = 2bdde43c-319d-48fc-941a-e8d794294a1d, runId = 8b1fd51e-b610-497b-b903-d66367856302]
 Current Committed Offsets: {}
 Current Available Offsets: {}

 Current State: INITIALIZING
 Thread State: RUNNABLE
    at …
Run Code Online (Sandbox Code Playgroud)

streaming scala apache-spark

3
推荐指数
1
解决办法
2734
查看次数

Java 中的高效 Zip 文件流

问题

在我的服务中,用户可以下载非常大(5+ GB)的数据包作为 zip 文件。当前的实现定位数据包中的所有文件,创建一个新的 zip 文件,用文件的副本填充 zip 文件,然后将其流式传输给用户。

这对于较大的数据包来说不能很好地扩展,我正在尝试找到一种方法来提高此过程的效率。我在下面提出了一个建议的解决方案,但我没有提供内容的经验,并且希望获得有关解决此问题的最佳方法的专业见解。

尝试的解决方案

我认为最好的方法是不要将实际字节复制到 zip 文件中。相反,创建符号链接的 zip 文件,然后在流式传输内容时复制字节。我在传输过程中实际将字节复制到 zip 中时遇到了问题,我不知道这是否可行。

最终解决方案

我将下面 Alexey Ragozin 接受的答案实现到SpeedBagIt中,这是一个用于高效传输符合BagIt规范的 zip 文件的库。

java streaming performance download

3
推荐指数
1
解决办法
5353
查看次数

flink kafka生产者在检查点恢复时以一次模式发送重复消息

我正在写一个案例来测试 flink 两步提交,下面是概述。

在此输入图像描述

sink kafka曾经是kafka生产者。sink step是 mysql 接收器扩展two step commitsink compare是mysql的sinkextend two step commit,这个sink偶尔会抛出异常来模拟检查点失败。

当检查点失败并恢复时,我发现mysql两步提交可以正常工作,但是kafka消费者将读取上次成功的偏移量,并且kafka生产者会产生消息,即使他在这个检查点失败之前已经完成了

在这种情况下如何避免重复消息?

感谢帮助。

环境

  • 弗林克1.9.1

  • 爪哇1.8

  • 卡夫卡2.11

卡夫卡生产者代码:

        dataStreamReduce.addSink(new FlinkKafkaProducer<>(
                "flink_output",
                new KafkaSerializationSchema<Tuple4<String, String, String, Long>>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(Tuple4<String, String, String, Long> element, @Nullable Long timestamp) {
                        UUID uuid = UUID.randomUUID();
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("uuid", uuid.toString());
                        jsonObject.put("key1", element.f0);
                        jsonObject.put("key2", element.f1);
                        jsonObject.put("key3", element.f2);
                        jsonObject.put("indicate", element.f3);
                        return new ProducerRecord<>("flink_output", jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));
                    }
                }, …
Run Code Online (Sandbox Code Playgroud)

streaming blink apache-flink flink-streaming

3
推荐指数
1
解决办法
1346
查看次数

Spring Webflux上传大图像文件并使用WebClient以流方式发送文件

我正在使用 spring webflux 功能风格。我想创建一个接受大图像文件的端点,并使用 webClient 以流方式将此文件发送到另一个服务。

所有文件处理都应该以流方式进行,因为我不想因为内存不足而导致我的应用程序崩溃。

有办法做到这一点吗?

streaming file-upload spring-boot spring-webflux

3
推荐指数
1
解决办法
4149
查看次数

使用 php 从 openai GPT-3 API 传输数据

我在使用 OpenAI API 时遇到了问题,基本上我想做的是流式传输从 openai API 响应流回的每个数据节点,并在从 API 调用流入时一次输出一个数据节点,但我不知道这是如何完成的,我研究了几个小时,但找不到任何关于如何使用 PHP 实现这一点的信息。

如何让我的代码在 API 流式传输数据时实时输出每个数据节点?

以下是我能想到的最好的,它在调用完成后立即输出所有数据,但它不会流入数据。

function openAI(){
  $OPENAI_API_KEY="API_KEY_GOES_HERE";
  $user_id="1";  //  users id optional
   
    $prompt="tell me what you can do for me.";
    $temperature=0.5;  // 1 adds complete randomness  0 no randomness 0.0
    $max_tokens=30;
 
         $data = array('model'=>'text-davinci-002',
              'prompt'=>$prompt,
              'temperature'=>$temperature,
              'max_tokens'=>$max_tokens,
              'top_p'=>1.0,
              'stream'=>TRUE,// stream back response
              'frequency_penalty'=>0.0,
              'presence_penalty'=>0.0,
               'user' => $user_id);

   $post_json= json_encode($data);
   $ch = curl_init();
   curl_setopt($ch, CURLOPT_URL, 'https://api.openai.com/v1/completions');
   curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
   curl_setopt($ch, CURLOPT_POST, 1);
   curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
   curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
   curl_setopt($ch, CURLOPT_POSTFIELDS, …
Run Code Online (Sandbox Code Playgroud)

php streaming openai-api

3
推荐指数
1
解决办法
5463
查看次数

在 FastAPI 中返回 StreamingResponse 时出现“AttributeError:encode”

我正在使用 Python 3.10 和 FastAPI0.92.0编写服务器发送事件 (SSE) 流 api。Python 代码如下所示:

from fastapi import APIRouter, FastAPI, Header

from src.chat.completions import chat_stream
from fastapi.responses import StreamingResponse

router = APIRouter()

@router.get("/v1/completions",response_class=StreamingResponse)
def stream_chat(q: str, authorization: str = Header(None)):
    auth_mode, auth_token = authorization.split(' ')
    if auth_token is None:
        return "Authorization token is missing"
    answer = chat_stream(q, auth_token)
    return StreamingResponse(answer, media_type="text/event-stream")
Run Code Online (Sandbox Code Playgroud)

这是chat_stream函数:

import openai

def chat_stream(question: str, key: str):
    openai.api_key = key
    # create a completion
    completion = openai.Completion.create(model="text-davinci-003",
                                          prompt=question,
                                          stream=True)
    return …
Run Code Online (Sandbox Code Playgroud)

streaming python-3.x server-sent-events fastapi openai-api

3
推荐指数
1
解决办法
4184
查看次数

如何从.NET应用程序启动默认媒体播放器?

我需要从我的c#.NET应用程序中的URL启动媒体文件.有没有办法在.NET中本地执行此操作?我不需要嵌入式播放器,我只需要默认播放器即可启动.我试过了

System.Diagnostics.Process.Start("File URL");
Run Code Online (Sandbox Code Playgroud)

但它会启动默认浏览器并下载文件,而不是尝试在WMP/VLC中播放它/无论默认媒体播放器是什么.有任何想法吗?

.net c# audio streaming mp3

2
推荐指数
1
解决办法
6379
查看次数

在网站上生成xslx文件的最佳方法是什么?可能有数百万行?

我的任务是编写一个解决方案来修复性能不佳的遗留excel文件生成器.

我需要生成的文件可能会变得非常大.也许多达一百万行,40-50列.我想如果可能的话我会直接流向用户,但我可能只需先将文件保存到磁盘然后为用户创建一个链接.

我的目标是进行性能测试,测试我是否可以生成一个1.500.000行和50列的xslx文件,每个单元格包含一个随机的10个字母的字符串... excel甚至可以处理这么大的文件?

注意:实际上生成的大多数文件永远不会超过300.000行,绝对最大值大约是950.000行,但我喜欢在压力测试时保证安全,因此1.5M行.

您对如何解决此任务有任何建议吗?我应该注意哪些组件?excel的局限性?

PS:如果我不必在服务器上安装Excel,我将不胜感激.

asp.net streaming excel

2
推荐指数
1
解决办法
1617
查看次数

使用libvlc将mp3流式传输到网络

如何使用libvlc获取MP3文件(或任何音频文件)并将其传输到网络,以便我可以连接iTunes或其他东西,像网络收音机一样收听?

C API示例是首选,但任何语言都可以.

streaming vlc libvlc

2
推荐指数
1
解决办法
7157
查看次数