IOUtils.closeQuietly 似乎已被弃用,取而代之的是 TryWithResources,但是当我需要关闭异步事件侦听器中的资源时,它如何帮助我,就像我使用 AmazonS3 和 TransferManager 上传文件时那样?
final String key = rs.getString("id");
final InputStream data = rs.getBinaryStream("data");
final long length = rs.getLong("length");
ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setContentLength(length);
Upload upload = transferManager.upload(s3bucketName, key, data, objectMetadata);
upload.addProgressListener((com.amazonaws.event.ProgressListener) p -> {
switch (p.getEventType()) {
case TRANSFER_COMPLETED_EVENT:
case TRANSFER_FAILED_EVENT:
case TRANSFER_CANCELED_EVENT:
IOUtils.closeQuietly(data);
default:
break;
}
});
Run Code Online (Sandbox Code Playgroud) 我正在尝试让我的应用程序响应 SSE 服务器发送的事件流。该流会出现多种类型的事件,并且仅当我收到的事件类型与该 UI 元素关联时,应用程序的某些部分才会更新。
示例 json 流响应(例如,如果我收到心跳,我不想更改 UI):
event:heartbeat
data:{"type":"heartbeat"}
event:gwy
data:{"a":532289,"lastUpdatedDateInMilliseconds":1587490669503,"t":1587490669503,"g":12479,"api":0,"cellSignalStrength":15}
event:gwy
data:{"a":532289,"lastUpdatedDateInMilliseconds":1587490694685,"t":1587490694685,"g":12479,"api":0,"cellSignalStrength":15}
Run Code Online (Sandbox Code Playgroud)
此数据的格式是全局数据变量的子集(事件应替换全局变量的该部分)。
非常感谢任何指示或资源。
例如,假设我已经val fs2Stream: Stream[IO, Byte]并且需要调用一些需要java.io.InputStream.
我想我对 FS2 还太陌生,但我似乎找不到答案。我尝试使用fs2.io.toInputStreamandfs2.io.readInputStream但我不知道如何提供一些所需的参数。我已经在万能的 Google 上搜索了答案,但自从大多数人上次寻找答案以来,API 似乎已经发生了变化。
我该如何去做类似以下的事情?
def myFunc(data: fs2.Stream[IO, Byte]): InputStream[Byte] = someMagicalFunction(data)
Run Code Online (Sandbox Code Playgroud) 我有遵循 archunit 规则的应用程序,我得到:
NO_CLASSES_SHOULD_ACCESS_STANDARD_STREAMS
失败的规则 - 这意味着我无法使用标准 Java I/O 流。但我可以用什么来代替呢?
如何使用其他 Java 方法而不是标准流来避免这种架构规则?因为我的拱门规则失败了
大家都在这儿!所以基本上这就是我想要实现的目标:
我有一个大约 3 分钟长的静音视频。我有一个 mp3 格式的音轨列表(一个文件夹中有 40 首歌曲,每首歌曲持续时间 2 到 6 分钟)
我希望这个视频自动循环播放,从播放列表中提取歌曲并将它们一首一首地注入到视频中。每次一首歌曲结束时,列表中的下一首歌曲就会立即开始播放。视频继续播放,不关心曲目的持续时间。
我认为这是在 YouTube 上以 24/7 模式播放带有视频背景的广播的第一步,并且能够将其他曲目添加到播放列表中,而无需停止翻译。
我的问题是,我是 FFmpeg 的新手,如果有任何关于开始研究哪个 FFMpeg 主题以实现我的目标的建议,我将不胜感激
我想将数据流式传输到 BigQuery 中,并且正在考虑使用 PubSub + Cloud Functions,因为不需要转换(至少目前如此),并且使用 Cloud Data Flow 感觉只是将行插入到表中有点过头了。我说得对吗?
数据使用 Python 脚本从 GCP VM 流式传输到 PubSub,其格式如下:
{'SEGMENT':'datetime':'2020-12-05 11:25:05.64684','values':(2568.025,2567.03)}
Run Code Online (Sandbox Code Playgroud)
BigQuery 架构是datetime:timestamp, value_A: float, value_B: float.
我对这一切的疑问是:
a) 我是否需要将其作为 json/字典推送到 BigQuery 中,所有值都作为字符串,还是必须使用表的数据类型?
BQ.insert_rows_jsonb) 使用和之间有什么区别,BQ.load_table_from_json我应该使用哪一个来完成这项任务?
编辑:
我想要获取的实际上是一些资产的市场数据。假设大约 28 种金融工具并捕捉它们的所有价格变动。平均每天,每个工具有约 60.k 个报价,因此我们谈论的是每月约 3360 万次调用。(目前)需要的是将它们插入到表中以供进一步分析。我目前不确定是否应该执行真正的流处理或每批加载。由于该项目尚未进行分析,我认为不需要数据流,但应该使用 PubSub,因为它可以在时机成熟时更轻松地扩展到数据流。这是我第一次实现流管道,我正在使用我通过课程和阅读学到的所有知识。如果我的方法错误,请纠正我:)。
例如,我绝对想做的是,当一个价格变动与第 n 个价格变动之间的价格差异为 10 时,对另一个表执行另一次插入。为此,我应该使用数据流还是云函数方法仍然有效吗?因为这就像一个触发条件。基本上,触发器类似于:
if price difference >= 10:
process all these ticks
insert the results in this table
Run Code Online (Sandbox Code Playgroud)
但我不确定如何实现这个触发器。
stream python-3.x google-bigquery google-cloud-pubsub google-cloud-functions
我正在尝试创建一个自动完成功能,为每次输入从 API 接收数据,现在我每次都会显示数据,但首先我总是在每种类型上出现此错误。
您在需要流的地方提供了“未定义”。您可以提供 Observable、Promise、Array 或 Iterable。
即使我将数据订阅到变量中,也没有任何反应。这是错误的图像,您将看到显示的数据
这是我的 app.component.ts
ngOnInit() {
this.results = this.searchText.valueChanges.pipe(
startWith(''),
// delay emits
debounceTime(300),
switchMap(value => {
if (value !== '' && value !== undefined) {
this.lookup(value).subscribe((value: any) => {
console.log('data sub:', value);
});
} else {
return of(null);
}
})
);
}
lookup(value: string): Observable<any> {
return this.appService.autoComplete(value.toLowerCase()).pipe(
map(results => results.hits.hits)
);
}
Run Code Online (Sandbox Code Playgroud)
这是我的服务。ts
public autoComplete(name: string): Observable<Response> {
const params = new HttpParams()
.set('name', name);
return this.httpClient.get<Response>(this.host, { params});
}
Run Code Online (Sandbox Code Playgroud)
这也是我的 html: …
关闭streamSink后,还需要取消监听吗?
找到这个答案:Dart:我必须取消 Stream 订阅并关闭 StreamSinks 吗?
一般来说,当您听完该 Stream 后,无论出于何种原因,您都应该关闭订阅
那么,如果我不取消订阅,它们就会永远保持开放状态吗?垃圾收集器不会处理它们吗?
StreamController<int> controller = StreamController<int>();
Stream stream = controller.stream;
StreamSubscription subscription = stream.listen((value) {
print('Value from controller: $value');
})
..onDone(() => print('done'));
// prints : Value from controller: 1
controller.add(1);
// prints : done
controller.close();
// still listening to closed stream
// no error - cancels successfully
Future.delayed(Duration(seconds: 2), () => subscription.cancel());
Run Code Online (Sandbox Code Playgroud) 我正在努力寻找一种在 Flutter 中以动态间隔时间定期发出流的方法。我不确定,这真的可能吗?一种解决方法可能是取消旧的定期流并使用新的时间间隔重新初始化它,但我的 asyncMap 定期流没有取消选项。我可以使用具有取消方法的stream.listen,但我特意需要asyncMap将Future事件转换为流。在这种情况下,我能做什么,请给我建议。
我的代码片段 -
int i = 0;
int getTimeDiffForPeriodicEvent() {
i++;
return (_timeDiffBetweenSensorCommands * commandList.length + 1) * i;
}
StreamBuilder(
stream: Stream.periodic(
Duration(seconds: maskBloc.getTimeDiffForPeriodicEvent()))
.asyncMap((_) async => maskBloc.getDataFromMask()),
builder: (context, snapshot) {
return Container();
},
);
Run Code Online (Sandbox Code Playgroud) 从长远来看,下面的代码片段总是会导致内存不足错误,尤其是在读取非常庞大的文件/内容时。
有没有另一种方法可以重写它,特别是使用流?
我在这里看到了一种将字节数组转换为十六进制字符串的方法:Effective way to get hex string from a byte array using lambdas and Streams
public static byte[] hexStringToBytes(String hexString) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Hex string to convert to byte[] " + hexString);
}
byte[] buf = new byte[hexString.length() / 2];
String twoDigitHexToConvertToByte;
for (int i = 0; i < buf.length; i++) {
twoDigitHexToConvertToByte = extractPairFromStringBasedOnIndex(hexString, i);
parseStringToBytesAndStoreInArrayOnIndex(twoDigitHexToConvertToByte, buf, i);
}
return buf;
}
private static void parseStringToBytesAndStoreInArrayOnIndex(String twoDigitHexToConvertToByte, byte[] buf, int i) {
try {
buf[i] = …Run Code Online (Sandbox Code Playgroud)