我使用Go SDK和 Apache Beam 来构建一个简单的数据流管道,该管道将从查询中获取数据并使用以下代码将数据发布到 pub/sub:
package main
import (
"context"
"flag"
"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
"github.com/apache/beam/sdks/go/pkg/beam/log"
"github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
"gitlab.com/bq-to-pubsub/infra/env"
"gitlab.com/bq-to-pubsub/sources"
"gitlab.com/bq-to-pubsub/sources/pp"
)
func main() {
flag.Parse()
ctx := context.Background()
beam.Init()
log.Info(ctx, "Creating new pipeline")
pipeline, scope := beam.NewPipelineWithRoot()
project := gcpopts.GetProject(ctx)
ppData := pp.Query(scope, project)
ppMessages := beam.ParDo(scope, pp.ToByteArray, ppData)
pubsubio.Write(scope, "project", "topic", ppMessages)
if err := beamx.Run(ctx, pipeline); err != nil {
log.Exitf(ctx, "Failed to execute job: %v", err)
}
}
Run Code Online (Sandbox Code Playgroud)
当我的管道在 Google Cloud Dataflow 上运行时,出现以下错误: …
我将 Spring Boot 应用程序从 Java 8 和 tomcat 8 升级到了 java 11 和 tomcat 9。除了我在列表上使用并行流的部分外,一切似乎都运行良好。
list.addAll(items
.parallelStream()
.filter(item -> !SomeFilter.isOk(item.getId()))
.map(logic::getSubItem)
.collect(Collectors.toList()));
Run Code Online (Sandbox Code Playgroud)
前一部分代码过去在 Java 8 和 Tomcat 8 上工作得很好,但在Java 9 改变了如何使用 Fork/Join 公共池线程加载类之后,将系统类加载器作为它们的线程上下文类加载器返回。
我知道在后台并行流使用 ForkJoinPool 并且我创建了一个自定义 bean 类,但仍然没有被应用程序使用。很可能是因为它们可能是在这个 bean 之前创建的。
@Bean
public ForkJoinPool myForkJoinPool() {
return new ForkJoinPool(threadPoolSize, makeFactory("APP"), null, false);
}
private ForkJoinPool.ForkJoinWorkerThreadFactory makeFactory(String prefix) {
return pool -> {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName(prefix + worker.getPoolIndex());
worker.setContextClassLoader(Application.class.getClassLoader());
return worker;
};
}
Run Code Online (Sandbox Code Playgroud)
最后,我还尝试将它包装在我的 ForkJoinPool 实例周围,但它是异步完成的,我不想这样做。我也不想使用提交和获取,因为这意味着我必须用 try/catch …