Go + Apache Beam GCP Dataflow: unable to find sink for pubsub, check the sink library specifies alwayslink = 1

Pav*_*its 8 go google-cloud-pubsub google-cloud-dataflow apache-beam

I'm using the Apache Beam Go SDK to build a simple Dataflow pipeline that pulls data from a query and publishes it to Pub/Sub, using the following code:

package main

import (
    "context"
    "flag"
    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
    "github.com/apache/beam/sdks/go/pkg/beam/log"
    "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
    "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
    "gitlab.com/bq-to-pubsub/infra/env"
    "gitlab.com/bq-to-pubsub/sources"
    "gitlab.com/bq-to-pubsub/sources/pp"
)

func main() {
    flag.Parse()
    ctx := context.Background()
    beam.Init()
    log.Info(ctx, "Creating new pipeline")
    pipeline, scope := beam.NewPipelineWithRoot()
    project := gcpopts.GetProject(ctx)

    ppData := pp.Query(scope, project)
    ppMessages := beam.ParDo(scope, pp.ToByteArray, ppData)
    pubsubio.Write(scope, "project", "topic", ppMessages)

    if err := beamx.Run(ctx, pipeline); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }
}


When the pipeline runs on Google Cloud Dataflow, it fails with the following error:

Workflow failed. Causes: S01:Source pp/bigquery.Query/Impulse+Source pp/bigquery.Query/bigqueryio.queryFn+pp.ToByteArray+pubsubio.Write/External failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers: pp10112132-vhzf-harness-p8v0 Root cause: Unable to find sink for pubsub, check the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Unable to find sink for pubsub, check the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Unable to find sink for pubsub, check the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: Unable to find sink for pubsub, check the sink library specifies alwayslink = 1.

I've read through this thread, but I'm not sure how it was actually resolved.

Any ideas?

小智 3

Is the job running in streaming mode or batch mode? My guess is batch mode. The internal Dataflow runner used for batch jobs is probably not linked against the Pub/Sub sink.

Unfortunately, the Go SDK doesn't currently provide a local "fallback" for writing to Pub/Sub that batch runners could use instead.

That said, you should be able to unblock yourself fairly easily by writing your own DoFn that publishes to Pub/Sub with the standard Go package: https://pkg.go.dev/cloud.google.com/go/pubsub#hdr-Publishing
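
For reference, the basic publish flow from that package, outside Beam, is just a client, a topic handle, and an asynchronous Publish whose result you wait on. A minimal, self-contained sketch ("my-project", "my-topic", and the payload are placeholders):

package main

import (
	"context"
	"log"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "my-project") // placeholder project ID
	if err != nil {
		log.Fatalf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	topic := client.Topic("my-topic") // placeholder topic ID
	defer topic.Stop()                // Flush pending messages before exiting.

	// Publish is asynchronous; Get blocks until the server acknowledges.
	res := topic.Publish(ctx, &pubsub.Message{Data: []byte("hello")})
	if id, err := res.Get(ctx); err != nil {
		log.Fatalf("publish failed: %v", err)
	} else {
		log.Printf("published message %s", id)
	}
}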

Wrapped in a Beam DoFn, the sink would look roughly like the following.

package main

import (
	"context"
	"sync"

	"cloud.google.com/go/pubsub"
)

const batchSize = 100 // Or whatever batching criterion you want.

var (
	// Assuming everything publishes to one project, share a single client.
	clientOnce   sync.Once
	pubSubClient *pubsub.Client
)

// myMessage is a placeholder for whatever element type your pipeline emits.
type myMessage struct {
	Data []byte
}

type PubSubSinkFn struct {
	Project, Topic string // Whatever configuration you need.

	client *pubsub.Client // The client is safe to use from multiple goroutines.
	batch  []*myMessage   // Per-bundle batch.
}

func (fn *PubSubSinkFn) Setup(ctx context.Context) error {
	// Create the client inside the sync.Once so it can be shared by all bundles.
	var err error
	clientOnce.Do(func() {
		pubSubClient, err = pubsub.NewClient(ctx, fn.Project)
	})
	if err != nil {
		return err
	}
	fn.client = pubSubClient
	return nil
}

func (fn *PubSubSinkFn) ProcessElement(ctx context.Context, v *myMessage) {
	fn.batch = append(fn.batch, v)
	if len(fn.batch) >= batchSize { // Or whatever flush criterion you want.
		fn.publishBatch(ctx)
	}
}

func (fn *PubSubSinkFn) FinishBundle(ctx context.Context) {
	fn.publishBatch(ctx) // Flush whatever is left at the end of the bundle.
}

func (fn *PubSubSinkFn) publishBatch(ctx context.Context) {
	topic := fn.client.Topic(fn.Topic)
	defer topic.Stop() // Flush and release the topic's publish goroutines.
	for _, m := range fn.batch {
		res := topic.Publish(ctx, &pubsub.Message{Data: m.Data})
		if _, err := res.Get(ctx); err != nil { // Block until acknowledged.
			// Handle or log the publish failure as appropriate.
		}
	}
	fn.batch = nil
}

// When constructing your pipeline:
//   beam.ParDo0(s, &PubSubSinkFn{Project: "foo", Topic: "bar"}, messages)
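
In the pipeline from the question, that would mean swapping the pubsubio.Write call for the custom sink. A sketch only; it assumes PubSubSinkFn's ProcessElement is adjusted to accept whatever element type pp.ToByteArray actually emits (e.g. []byte):

ppData := pp.Query(scope, project)
ppMessages := beam.ParDo(scope, pp.ToByteArray, ppData)
// Instead of pubsubio.Write(scope, "project", "topic", ppMessages):
beam.ParDo0(scope, &PubSubSinkFn{Project: project, Topic: "topic"}, ppMessages)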