Kinesis Firehose 以及 Kinesis Streams 用于按照 AWS 博客中提到的详细信息加载流数据。Firehose 不存在分片或维护的概念。在这种情况下,Kinesis Firehose 可以替代 Kinesis Streams 吗?
streaming amazon-web-services amazon-kinesis amazon-kinesis-firehose
我正在尝试读取流数据输入,如下所示
object SocketReadExample {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder
.master("local")
.appName("example")
.config("spark.driver.bindAddress", "127.0.0.1")
.getOrCreate()
//create stream from socket
val socketStreamDf = sparkSession.readStream
.format("socket")
.option("host", "localhost")
.option("port", 50050)
.load()
val consoleDataFrameWriter = socketStreamDf.writeStream
.format("console")
.outputMode(OutputMode.Append())
val query = consoleDataFrameWriter.start()
query.awaitTermination()
}
}
Run Code Online (Sandbox Code Playgroud)
为此我面临以下错误:
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Connection
refused
=== Streaming Query ===
Identifier: [id = 2bdde43c-319d-48fc-941a-e8d794294a1d, runId = 8b1fd51e-b610-497b-b903-d66367856302]
Current Committed Offsets: {}
Current Available Offsets: {}
Current State: INITIALIZING
Thread State: RUNNABLE
at …Run Code Online (Sandbox Code Playgroud) 在我的服务中,用户可以下载非常大(5+ GB)的数据包作为 zip 文件。当前的实现定位数据包中的所有文件,创建一个新的 zip 文件,用文件的副本填充 zip 文件,然后将其流式传输给用户。
这对于较大的数据包来说不能很好地扩展,我正在尝试找到一种方法来提高此过程的效率。我在下面提出了一个建议的解决方案,但我没有提供内容的经验,并且希望获得有关解决此问题的最佳方法的专业见解。
我认为最好的方法是不要将实际字节复制到 zip 文件中。相反,创建符号链接的 zip 文件,然后在流式传输内容时复制字节。我在传输过程中实际将字节复制到 zip 中时遇到了问题,我不知道这是否可行。
我将下面 Alexey Ragozin 接受的答案实现到SpeedBagIt中,这是一个用于高效传输符合BagIt规范的 zip 文件的库。
我正在写一个案例来测试 flink 两步提交,下面是概述。
sink kafka曾经是kafka生产者。sink step是 mysql 接收器扩展two step commit。sink compare是mysql的sinkextend two step commit,这个sink偶尔会抛出异常来模拟检查点失败。
当检查点失败并恢复时,我发现mysql两步提交可以正常工作,但是kafka消费者将读取上次成功的偏移量,并且kafka生产者会产生消息,即使他在这个检查点失败之前已经完成了。
在这种情况下如何避免重复消息?
感谢帮助。
环境:
弗林克1.9.1
爪哇1.8
卡夫卡2.11
卡夫卡生产者代码:
dataStreamReduce.addSink(new FlinkKafkaProducer<>(
"flink_output",
new KafkaSerializationSchema<Tuple4<String, String, String, Long>>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(Tuple4<String, String, String, Long> element, @Nullable Long timestamp) {
UUID uuid = UUID.randomUUID();
JSONObject jsonObject = new JSONObject();
jsonObject.put("uuid", uuid.toString());
jsonObject.put("key1", element.f0);
jsonObject.put("key2", element.f1);
jsonObject.put("key3", element.f2);
jsonObject.put("indicate", element.f3);
return new ProducerRecord<>("flink_output", jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));
}
}, …Run Code Online (Sandbox Code Playgroud) 我正在使用 spring webflux 功能风格。我想创建一个接受大图像文件的端点,并使用 webClient 以流方式将此文件发送到另一个服务。
所有文件处理都应该以流方式进行,因为我不想因为内存不足而导致我的应用程序崩溃。
有办法做到这一点吗?
我在使用 OpenAI API 时遇到了问题,基本上我想做的是流式传输从 openai API 响应流回的每个数据节点,并在从 API 调用流入时一次输出一个数据节点,但我不知道这是如何完成的,我研究了几个小时,但找不到任何关于如何使用 PHP 实现这一点的信息。
如何让我的代码在 API 流式传输数据时实时输出每个数据节点?
以下是我能想到的最好的,它在调用完成后立即输出所有数据,但它不会流入数据。
function openAI(){
$OPENAI_API_KEY="API_KEY_GOES_HERE";
$user_id="1"; // users id optional
$prompt="tell me what you can do for me.";
$temperature=0.5; // 1 adds complete randomness 0 no randomness 0.0
$max_tokens=30;
$data = array('model'=>'text-davinci-002',
'prompt'=>$prompt,
'temperature'=>$temperature,
'max_tokens'=>$max_tokens,
'top_p'=>1.0,
'stream'=>TRUE,// stream back response
'frequency_penalty'=>0.0,
'presence_penalty'=>0.0,
'user' => $user_id);
$post_json= json_encode($data);
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, 'https://api.openai.com/v1/completions');
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_POST, 1);
curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
curl_setopt($ch, CURLOPT_POSTFIELDS, …Run Code Online (Sandbox Code Playgroud) 我正在使用 Python 3.10 和 FastAPI0.92.0编写服务器发送事件 (SSE) 流 api。Python 代码如下所示:
from fastapi import APIRouter, FastAPI, Header
from src.chat.completions import chat_stream
from fastapi.responses import StreamingResponse
router = APIRouter()
@router.get("/v1/completions",response_class=StreamingResponse)
def stream_chat(q: str, authorization: str = Header(None)):
auth_mode, auth_token = authorization.split(' ')
if auth_token is None:
return "Authorization token is missing"
answer = chat_stream(q, auth_token)
return StreamingResponse(answer, media_type="text/event-stream")
Run Code Online (Sandbox Code Playgroud)
这是chat_stream函数:
import openai
def chat_stream(question: str, key: str):
openai.api_key = key
# create a completion
completion = openai.Completion.create(model="text-davinci-003",
prompt=question,
stream=True)
return …Run Code Online (Sandbox Code Playgroud) 我需要从我的c#.NET应用程序中的URL启动媒体文件.有没有办法在.NET中本地执行此操作?我不需要嵌入式播放器,我只需要默认播放器即可启动.我试过了
System.Diagnostics.Process.Start("File URL");
Run Code Online (Sandbox Code Playgroud)
但它会启动默认浏览器并下载文件,而不是尝试在WMP/VLC中播放它/无论默认媒体播放器是什么.有任何想法吗?
我的任务是编写一个解决方案来修复性能不佳的遗留excel文件生成器.
我需要生成的文件可能会变得非常大.也许多达一百万行,40-50列.我想如果可能的话我会直接流向用户,但我可能只需先将文件保存到磁盘然后为用户创建一个链接.
我的目标是进行性能测试,测试我是否可以生成一个1.500.000行和50列的xslx文件,每个单元格包含一个随机的10个字母的字符串... excel甚至可以处理这么大的文件?
注意:实际上生成的大多数文件永远不会超过300.000行,绝对最大值大约是950.000行,但我喜欢在压力测试时保证安全,因此1.5M行.
您对如何解决此任务有任何建议吗?我应该注意哪些组件?excel的局限性?
PS:如果我不必在服务器上安装Excel,我将不胜感激.
如何使用libvlc获取MP3文件(或任何音频文件)并将其传输到网络,以便我可以连接iTunes或其他东西,像网络收音机一样收听?
C API示例是首选,但任何语言都可以.
streaming ×10
openai-api ×2
.net ×1
apache-flink ×1
apache-spark ×1
asp.net ×1
audio ×1
blink ×1
c# ×1
download ×1
excel ×1
fastapi ×1
file-upload ×1
java ×1
libvlc ×1
mp3 ×1
performance ×1
php ×1
python-3.x ×1
scala ×1
spring-boot ×1
vlc ×1