我有一个登录Stackdriver,记录每个请求进入我的api并失败,我想写一个脚本来指望每个错误消息出现的次数.问题是,Stackdriver V2中的导出功能只允许我接收即将发生的错误消息,但我只关心已经存在于日志中的日志条目.有没有办法从Stackdriver下载完整的日志?
我遇到了 GCP pubsub 的问题,即在几秒钟内发布数千条消息时丢失了一小部分数据。
我正在message_id从 pubsub 和一个session_id唯一的发布端和接收端的每条消息进行记录,我看到的结果是接收端的某些消息具有相同session_id但不同的message_id. 此外,还丢失了一些消息。
例如,在一项测试中,我向 pubsub 发送了 5,000 条消息,并且恰好收到了 5,000 条消息,其中 8 条消息丢失。日志丢失消息如下所示:
MISSING sessionId:sessionId: 731 (missing in log from pull request, but present in log from Flask API)
messageId FOUND: messageId:108562396466545
API: 200 **** sessionId: 731, messageId:108562396466545 ******(Log from Flask API)
Pubsub: sessionId: 730, messageId:108562396466545(Log from pull request)
Run Code Online (Sandbox Code Playgroud)
重复项看起来像:
======= Duplicates FOUND on sessionId: 730=======
sessionId: 730, messageId:108562396466545
sessionId: 730, messageId:108561339282318
(both are logs from pull request) …Run Code Online (Sandbox Code Playgroud) google-cloud-messaging google-cloud-platform google-cloud-pubsub
我正在编写一段数据流转换,用于org.apache.beam.sdk.state.MapState实现缓存功能。然而,在引入后MapState,单元测试开始出现功能障碍。异常说:java.lang.UnsupportedOperationException: Parameter StateParameter{referent=StateDeclaration{id=cache, field=private final org.apache.beam.sdk.state.StateSpec xxxxFn.cache, stateType=org.apache.beam.sdk.state.MapState<java.lang.String, object>}} not supported by DoFnTester
那么,如果DoFnTester不再是一种选择,那么当前测试使用 MapState 的 DoFn 的最佳实践是什么?
PS 我目前使用的是 Beam 2.0.0,无法升级到 2.2.0,因为https://issues.apache.org/jira/browse/BEAM-3693
这是完整的跟踪:
java.lang.UnsupportedOperationException: Parameter StateParameter{referent=StateDeclaration{id=cache, field=private final org.apache.beam.sdk.state.StateSpec xxxxFn.cache, stateType=org.apache.beam.sdk.state.MapState<java.lang.String, object>}} not supported by DoFnTester
at org.apache.beam.sdk.transforms.DoFnTester$5.dispatchDefault(DoFnTester.java:725)
at org.apache.beam.sdk.transforms.DoFnTester$5.dispatchDefault(DoFnTester.java:710)
at org.apache.beam.sdk.transforms.reflect.DoFnSignature$Parameter$Cases$WithDefault.dispatch(DoFnSignature.java:255)
at org.apache.beam.sdk.transforms.reflect.DoFnSignature$Parameter.match(DoFnSignature.java:193)
at org.apache.beam.sdk.transforms.DoFnTester.<init>(DoFnTester.java:709)
at org.apache.beam.sdk.transforms.DoFnTester.of(DoFnTester.java:92)
at xxxxFnTest.testNormalRun(GetPredictionsFnTest.java:50)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) …Run Code Online (Sandbox Code Playgroud) java google-cloud-platform google-cloud-dataflow apache-beam
我正在尝试优化从PubSubIO中提取消息的管道,并将这些消息发送到第三方API.我有一个有趣的观察结果是,如果我在之后放置一个GroupBy"Degroup"变换PubSubIO.read,那么管道的吞吐量会显着增加.我添加了GroupByjust以防止融合优化,现在我想知道在给定的管道中如何合并转换.
融合后如何找出管道的最佳方法是什么?