小编Pav*_*its的帖子

Go + Apache Beam GCP 数据流:找不到 pubsub 的接收器,检查接收器库是否指定alwayslink = 1

我使用Go SDK和 Apache Beam 来构建一个简单的数据流管道,该管道将从查询中获取数据并使用以下代码将数据发布到 pub/sub:

package main

import (
    "context"
    "flag"
    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
    "github.com/apache/beam/sdks/go/pkg/beam/log"
    "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
    "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
    "gitlab.com/bq-to-pubsub/infra/env"
    "gitlab.com/bq-to-pubsub/sources"
    "gitlab.com/bq-to-pubsub/sources/pp"
)

func main() {
    flag.Parse()
    ctx := context.Background()
    beam.Init()
    log.Info(ctx, "Creating new pipeline")
    pipeline, scope := beam.NewPipelineWithRoot()
    project := gcpopts.GetProject(ctx)

    ppData := pp.Query(scope, project)
    ppMessages := beam.ParDo(scope, pp.ToByteArray, ppData)
    pubsubio.Write(scope, "project", "topic", ppMessages)

    if err := beamx.Run(ctx, pipeline); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }
}

Run Code Online (Sandbox Code Playgroud)

当我的管道在 Google Cloud Dataflow 上运行时,出现以下错误: …

go google-cloud-pubsub google-cloud-dataflow apache-beam

8
推荐指数
1
解决办法
534
查看次数

从 8 个并行流升级的 Java 11 抛出 ClassNotFoundException

我将 Spring Boot 应用程序从 Java 8 和 tomcat 8 升级到了 java 11 和 tomcat 9。除了我在列表上使用并行流的部分外,一切似乎都运行良好。

list.addAll(items
                .parallelStream()
                .filter(item -> !SomeFilter.isOk(item.getId()))
                .map(logic::getSubItem)
                .collect(Collectors.toList()));
Run Code Online (Sandbox Code Playgroud)

前一部分代码过去在 Java 8 和 Tomcat 8 上工作得很好,但在Java 9 改变了如何使用 Fork/Join 公共池线程加载类之后,将系统类加载器作为它们的线程上下文类加载器返回。

我知道在后台并行流使用 ForkJoinPool 并且我创建了一个自定义 bean 类,但仍然没有被应用程序使用。很可能是因为它们可能是在这个 bean 之前创建的。

@Bean
public ForkJoinPool myForkJoinPool() {
    return new ForkJoinPool(threadPoolSize, makeFactory("APP"), null, false);
}

private ForkJoinPool.ForkJoinWorkerThreadFactory makeFactory(String prefix) {
    return pool -> {
        final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
        worker.setName(prefix + worker.getPoolIndex());
        worker.setContextClassLoader(Application.class.getClassLoader());
        return worker;
    };
}
Run Code Online (Sandbox Code Playgroud)

最后,我还尝试将它包装在我的 ForkJoinPool 实例周围,但它是异步完成的,我不想这样做。我也不想使用提交和获取,因为这意味着我必须用 try/catch …

java parallel-processing spring java-stream java-11

7
推荐指数
1
解决办法
352
查看次数