I'm using Apache Beam with the Go SDK to build a simple Dataflow pipeline that fetches data from a query and publishes it to Pub/Sub with the following code:
package main

import (
	"context"
	"flag"

	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
	"github.com/apache/beam/sdks/go/pkg/beam/log"
	"github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
	"github.com/apache/beam/sdks/go/pkg/beam/x/beamx"

	"gitlab.com/bq-to-pubsub/infra/env"
	"gitlab.com/bq-to-pubsub/sources"
	"gitlab.com/bq-to-pubsub/sources/pp"
)

func main() {
	flag.Parse()
	ctx := context.Background()
	beam.Init()

	log.Info(ctx, "Creating new pipeline")
	pipeline, scope := beam.NewPipelineWithRoot()

	project := gcpopts.GetProject(ctx)

	ppData := pp.Query(scope, project)
	ppMessages := beam.ParDo(scope, pp.ToByteArray, ppData)
	pubsubio.Write(scope, "project", "topic", ppMessages)

	if err := beamx.Run(ctx, pipeline); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
When the pipeline runs on Google Cloud Dataflow, I get the following error:
Workflow failed. Causes: S01:Source pp/bigquery.Query/Impulse+Source pp/bigquery.Query/bigqueryio.queryFn+pp.ToByteArray+pubsubio.Write/External failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers: pp10112132-vhzf-harness-p8v0 Root cause: could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1., pp10112132-vhzf-harness-p8v0 Root cause: could not find the sink for pubsub, Check that the sink library specifies alwayslink = 1.
I have read this thread, but I'm not sure how it was resolved.
Any ideas?
Is the job running in streaming or batch mode? My guess is batch. It may be that the internal Dataflow runner used for batch mode isn't linked with the Pub/Sub sink.
Unfortunately, at this time the Go SDK doesn't provide a native "fallback" for writing to Pub/Sub that batch runners can use.
That said, it should be easy to unblock yourself by writing your own DoFn that publishes to Pub/Sub with the standard Go package: https://pkg.go.dev/cloud.google.com/go/pubsub#hdr-Publishing
It would look roughly like the following.
var (
	// Assuming everything is in one project.
	clientOnce   sync.Once
	pubSubClient *pubsub.Client
)

type PubSubSinkFn struct {
	Project, Topic string         // Whatever configuration you need.
	client         *pubsub.Client // The client is safe to use on multiple goroutines.
	batch          []*myMessage   // Per-bundle batch.
}

func (fn *PubSubSinkFn) Setup(ctx context.Context) {
	// Create the client inside the sync.Once so it can be shared by all bundles.
	clientOnce.Do(func() {
		var err error
		if pubSubClient, err = pubsub.NewClient(ctx, fn.Project); err != nil {
			panic(err) // Sketch only; report the error properly in real code.
		}
	})
	fn.client = pubSubClient
}

func (fn *PubSubSinkFn) ProcessElement(ctx context.Context, v *myMessage) {
	fn.batch = append(fn.batch, v)
	if len(fn.batch) > batchSize { // Or whatever criteria you want.
		fn.publishBatch(ctx)
	}
}

func (fn *PubSubSinkFn) FinishBundle(ctx context.Context) {
	fn.publishBatch(ctx)
}

func (fn *PubSubSinkFn) publishBatch(ctx context.Context) {
	// Use fn.client to publish the batch.
	fn.batch = nil
}
// When constructing your pipeline
beam.ParDo0(s, &PubSubSinkFn{Project: "foo", Topic: "bar"}, messages)
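To fill in the part the sketch leaves open, here is one way publishBatch could be written with the cloud.google.com/go/pubsub package from the link above. It is only a sketch: it assumes myMessage carries a hypothetical Data []byte field (adapt it to your own type) and reuses PubSubSinkFn from the code above; the beam.RegisterType call makes the structural DoFn encodable for remote Dataflow workers.

import (
	"context"
	"reflect"

	"cloud.google.com/go/pubsub"
	"github.com/apache/beam/sdks/go/pkg/beam"
	"github.com/apache/beam/sdks/go/pkg/beam/log"
)

func init() {
	// Register the DoFn type so remote Dataflow workers can decode it.
	beam.RegisterType(reflect.TypeOf((*PubSubSinkFn)(nil)).Elem())
}

// publishBatch publishes the buffered messages and blocks until Pub/Sub has
// acknowledged each one, so the bundle only finishes once the data is out.
func (fn *PubSubSinkFn) publishBatch(ctx context.Context) {
	topic := fn.client.Topic(fn.Topic)
	defer topic.Stop() // Flush any outstanding publishes before returning.

	results := make([]*pubsub.PublishResult, 0, len(fn.batch))
	for _, m := range fn.batch {
		// Data is a hypothetical []byte field on myMessage.
		results = append(results, topic.Publish(ctx, &pubsub.Message{Data: m.Data}))
	}
	for _, r := range results {
		if _, err := r.Get(ctx); err != nil {
			log.Errorf(ctx, "failed to publish message: %v", err)
		}
	}
	fn.batch = nil
}

Blocking on each PublishResult inside publishBatch means FinishBundle does not return, and the bundle is not considered done, until Pub/Sub has accepted the bundle's messages.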