标签: apache-beam

编码器问题Apache Beam和CombineFn

我们正在使用Apache Beam和DirectRunner作为跑步者构建管道.我们目前正在尝试一个简单的管道,我们:

  1. 从Google Cloud Pub/Sub中提取数据(目前使用模拟器在本地运行)
  2. 反序列化为Java对象
  3. 使用固定窗口1分钟的窗口事件
  4. 使用自定义CombineFn将这些窗口从事件转换为事件列表来组合这些窗口.

管道代码:

pipeline
.apply(PubsubIO.<String>read().topic(options.getTopic()).withCoder(StringUtf8Coder.of()))

.apply("ParseEvent", ParDo.of(new ParseEventFn()))

.apply("WindowOneMinute",Window.<Event>into(FixedWindows.of(Duration.standardMinutes(1))))              

.apply("CombineEvents", Combine.globally(new CombineEventsFn()));
Run Code Online (Sandbox Code Playgroud)

ParseEvent函数:

    static class ParseEventFn extends DoFn<String, Event> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String json = c.element();
            c.output(gson.fromJson(json, Event.class));
        }
    }
Run Code Online (Sandbox Code Playgroud)

CombineEvents功能:

public static class CombineEventsFn extends CombineFn<Event, CombineEventsFn.Accum, EventListWrapper> {
        public static class Accum {
            EventListWrapper eventListWrapper = new EventListWrapper();
        }

        @Override
        public Accum createAccumulator() {
            return new Accum();
        }

        @Override
        public Accum addInput(Accum accumulator, Event …
Run Code Online (Sandbox Code Playgroud)

java google-cloud-platform google-cloud-dataflow apache-beam

8
推荐指数
2
解决办法
2864
查看次数

Apache Beam - 使用无界PCollection进行集成测试

我们正在为Apache Beam管道构建集成测试,并且遇到了一些问题.有关背景信息,请参见

有关我们管道的详情:

  • 我们使用PubsubIO我们的数据源(无界PCollection)
  • 中间变换包括自定义CombineFn和非常简单的窗口/触发策略
  • 我们最终的变换JdbcIO,用org.neo4j.jdbc.Driver写的Neo4j

目前的测试方法:

  • 在运行测试的计算机上运行Google Cloud的Pub/Sub模拟器
  • 构建内存中的Neo4j数据库并将其URI传递给我们的管道选项
  • 通过调用运行管道 OurPipeline.main(TestPipeline.convertToArgs(options)
  • 使用Google Cloud的Java Pub/Sub客户端库将消息发布到测试主题(使用Pub/Sub模拟器),该主题PubsubIO将从
  • 数据应该流经管道并最终命中我们的内存中的Neo4j实例
  • 在Neo4j中对这些数据的存在做出简单的断言

这是一个简单的集成测试,它将验证我们的整个管道是否按预期运行.

我们目前面临的问题是,当我们运行我们的管道时,它会阻塞.我们正在使用DirectRunnerpipeline.run()( pipeline.run().waitUntilFinish()),但测试似乎在运行管道后挂起.因为这是一个无限制的PCollection(在流模式下运行),管道不会终止,因此不会到达它之后的任何代码.

所以,我有几个问题:

1)有没有办法运行管道然后稍后手动停止?

2)有没有办法异步运行管道?理想情况下,它会启动管道(然后将继续轮询Pub/Sub以获取数据),然后转到负责发布到Pub/Sub的代码.

3)这种集成测试方法是否合理,或者是否有更好的方法可能更直接?这里的任何信息/指导将不胜感激.

如果我能提供任何额外的代码/背景,请告诉我 - 谢谢!

java integration-testing google-cloud-pubsub google-cloud-dataflow apache-beam

8
推荐指数
1
解决办法
961
查看次数

梁json解析

我试图在Apache Beam代码中读取和解析JSON文件.

PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(SparkRunner.class);

Pipeline p = Pipeline.create(options);

PCollection<String> lines = p.apply("ReadMyFile", TextIO.read().from("/Users/xyz/eclipse-workspace/beam-project/myfirst.json"));
System.out.println("lines: " + lines);
Run Code Online (Sandbox Code Playgroud)

下面是我需要解析的示例JSON testdata:myfirst.json

{  
   “testdata":{  
      “siteOwner”:”xxx”,
      “siteInfo”:{  
         “siteID”:”id_member",
         "siteplatform”:”web”, 
         "siteType”:”soap”,
         "siteURL”:”www”
      }
   }
}
Run Code Online (Sandbox Code Playgroud)

有人可以指导如何testdata从上面的JSON文件解析和获取内容,然后我需要使用Beam流式传输数据?

java json apache-beam

8
推荐指数
1
解决办法
1706
查看次数

数据流管道-“处理在步骤&lt;STEP_NAME&gt;中停留了至少&lt;TIME&gt;,而没有在状态完成中输出或完成...”

我的团队开发的数据流管道突然开始卡住,停止处理事件。他们的工作日志充满警告消息,指出一个特定步骤被卡住了。奇怪的是,失败的步骤是不同的,一个是BigQuery输出,另一个是Cloud Storage输出。

以下是我们收到的日志消息:

对于BigQuery输出:

Processing stuck in step <STEP_NAME>/StreamingInserts/StreamingWriteTables/StreamingWrite for at least <TIME> without outputting or completing in state finish
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
  at java.util.concurrent.FutureTask.get(FutureTask.java:191)
  at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:765)
  at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:829)
  at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:131)
  at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:103)
  at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
Run Code Online (Sandbox Code Playgroud)

对于云存储输出:

Processing stuck in step <STEP_NAME>/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least <TIME> without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
  at java.util.concurrent.FutureTask.get(FutureTask.java:191)
  at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:421)
  at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.close(AbstractGoogleAsyncWriteChannel.java:287)
  at org.apache.beam.sdk.io.FileBasedSink$Writer.close(FileBasedSink.java:1007)
  at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:726)
  at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
Run Code Online (Sandbox Code Playgroud)

所有应用程序都已被排干并重新部署,但是一段时间(3到4个小时)后发生了相同的事情。其中一些已经运行了40多个天,而他们突然进入了代码库,而没有对代码进行任何更改。

我想寻求帮助以了解此问题的原因。这些是存在这些问题的某些Dataflow作业的以下ID:

卡在BigQuery输出中: …

google-cloud-dataflow apache-beam

8
推荐指数
1
解决办法
1665
查看次数

数据流上的 grpc StatusRuntimeException

我有一个数据流管道,我在其中使用 pubsub 消息,处理它们,然后发布到 pubsub。

每当我有太多计算(即我增加每条消息的处理量)时,我都会收到异常。:java.util.concurrent.ExecutionException:org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException:CANCELLED:在接收半关闭之前取消

是什么导致了这个错误?我怎样才能避免它?完整的堆栈跟踪:

org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
        org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status.asRuntimeException(Status.java:517)
        org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:335)
        org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:98)
        org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver.onNext(SynchronizedStreamObserver.java:46)
        org.apache.beam.runners.fnexecution.control.FnApiControlClient.handle(FnApiControlClient.java:84)
        org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.start(RegisterAndProcessBundleOperation.java:254)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
        org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1283)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:147)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1020)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

dataflow google-cloud-platform google-cloud-pubsub apache-beam

8
推荐指数
0
解决办法
550
查看次数

Go + Apache Beam GCP 数据流:找不到 pubsub 的接收器,检查接收器库是否指定alwayslink = 1

我使用Go SDK和 Apache Beam 来构建一个简单的数据流管道,该管道将从查询中获取数据并使用以下代码将数据发布到 pub/sub:

package main

import (
    "context"
    "flag"
    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/io/pubsubio"
    "github.com/apache/beam/sdks/go/pkg/beam/log"
    "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
    "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
    "gitlab.com/bq-to-pubsub/infra/env"
    "gitlab.com/bq-to-pubsub/sources"
    "gitlab.com/bq-to-pubsub/sources/pp"
)

func main() {
    flag.Parse()
    ctx := context.Background()
    beam.Init()
    log.Info(ctx, "Creating new pipeline")
    pipeline, scope := beam.NewPipelineWithRoot()
    project := gcpopts.GetProject(ctx)

    ppData := pp.Query(scope, project)
    ppMessages := beam.ParDo(scope, pp.ToByteArray, ppData)
    pubsubio.Write(scope, "project", "topic", ppMessages)

    if err := beamx.Run(ctx, pipeline); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }
}

Run Code Online (Sandbox Code Playgroud)

当我的管道在 Google Cloud Dataflow 上运行时,出现以下错误: …

go google-cloud-pubsub google-cloud-dataflow apache-beam

8
推荐指数
1
解决办法
534
查看次数

Apache Beam 云数据流流卡住侧输入

我目前正在 GCP Dataflow 中构建 PoC Apache Beam 管道。在本例中,我想使用来自 PubSub 的主输入和来自 BigQuery 的侧输入创建流式传输管道,并将处理后的数据存储回 BigQuery。

侧管线代码

side_pipeline = (
    p
    | "periodic" >> PeriodicImpulse(fire_interval=3600, apply_windowing=True)
    | "map to read request" >>
        beam.Map(lambda x:beam.io.gcp.bigquery.ReadFromBigQueryRequest(table=side_table))
    | beam.io.ReadAllFromBigQuery()
)
Run Code Online (Sandbox Code Playgroud)

侧面输入代码功能

def enrich_payload(payload, equipments):
    id = payload["id"]
    for equipment in equipments:
        if id == equipment["id"]:
            payload["type"] = equipment["type"]
            payload["brand"] = equipment["brand"]
            payload["year"] = equipment["year"]

            break

    return payload
Run Code Online (Sandbox Code Playgroud)

主管道代码

main_pipeline = (
    p
    | "read" >> beam.io.ReadFromPubSub(topic="projects/my-project/topics/topiq")
    | "bytes to dict" >> beam.Map(lambda x: json.loads(x.decode("utf-8")))
    | "transform" …
Run Code Online (Sandbox Code Playgroud)

python google-cloud-dataflow apache-beam

8
推荐指数
1
解决办法
1534
查看次数

使用Python SDK创建Cloud Dataflow模板的步骤

我使用Apache Beam SDK在Python中创建了Pipeline,并且Dataflow作业在命令行中运行良好.

现在,我想从UI运行这些工作.为此,我必须为我的工作创建模板文件.我找到了使用maven在Java中创建模板的步骤.

但是我如何使用Python SDK呢?

google-cloud-dataflow apache-beam

7
推荐指数
1
解决办法
1973
查看次数

Java应用程序中的Maven冲突与google-cloud-core-grpc依赖关系

(我也为此提出了一个GitHub问题 - https://github.com/googleapis/google-cloud-java/issues/4095)

我有Apache Beam的以下2个依赖项的最新版本:

依赖性1 - google-cloud-dataflow-java-sdk-all

(Apache Beam的分布旨在简化Google Cloud Dataflow服务上Apache Beam的使用 - https://mvnrepository.com/artifact/com.google.cloud.dataflow/google-cloud-dataflow-java-sdk-all)

<dependency>
  <groupId>com.google.cloud.dataflow</groupId>
  <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
  <version>2.5.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

依赖关系2 - beam-runners-google-cloud-dataflow-java

(我猜这可以在Google Cloud Dataflow中实际运行Beam管道)

https://mvnrepository.com/artifact/org.apache.beam/beam-runners-google-cloud-dataflow-java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
  <version>2.8.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

Maven安装适用于这两个依赖项.我需要添加以下(第三)依赖项,以便独立订阅Apache Beam管道之外的Google Cloud pub/sub:

谷歌云,发布订阅

https://mvnrepository.com/artifact/com.google.cloud/google-cloud-pubsub

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>1.53.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

(这是最新版本).添加完成后,我做了以下冲突mvn clean install:

Could not resolve version conflict among [com.google.cloud:google-cloud-pubsub:jar:1.53.0 -> com.google.cloud:google-cloud-core-grpc:jar:1.53.0 -> io.grpc:grpc-protobuf:jar:1.16.1 -> io.grpc:grpc-core:jar:1.16.1, com.google.cloud:google-cloud-pubsub:jar:1.53.0 -> com.google.cloud:google-cloud-core-grpc:jar:1.53.0 -> io.grpc:grpc-protobuf:jar:1.16.1 -> io.grpc:grpc-protobuf-lite:jar:1.16.1 -> io.grpc:grpc-core:jar:1.16.1, com.google.cloud:google-cloud-pubsub:jar:1.53.0 -> io.grpc:grpc-netty-shaded:jar:1.16.1 -> io.grpc:grpc-core:jar:[1.16.1,1.16.1], com.google.cloud:google-cloud-pubsub:jar:1.53.0 -> io.grpc:grpc-stub:jar:1.16.1 …
Run Code Online (Sandbox Code Playgroud)

java maven google-cloud-platform google-cloud-dataflow apache-beam

7
推荐指数
2
解决办法
990
查看次数

如何在Python 3.x上获取用于数据流GCP的Apache Beam

我对GCP和数据流非常陌生。但是,我想开始测试和部署一些利用GCP上的数据流的流。根据文档,有关数据流的所有内容都必须使用Apache项目BEAM。因此,请按照此处的官方文档进行操作的情况下,受支持的python版本是2.7

坦白地说,由于Python 2.x版本将由于没有官方支持而消失,并且每个人都在使用3.x版本,因此这确实令人失望。尽管如此,我想知道是否有人知道如何准备在python版本中运行的beam和GCP数据流。

我看了这部影片并了这个牧师如何完成这个美好的里程碑,并且显然可以在Python 3.5上运行。

更新资料

自从我努力处理数据流以来,我想要的伙计们引起了我的思考。我对使用Java或Python版本的工具开始具有挑战性感到非常失望。从python开始,存在关于版本3的限制,该版本几乎是当前的标准。另一方面,java在版本11上运行时会遇到问题,我必须进行一些调整才能在代码的版本8上运行,然后我开始在代码上遇到许多不兼容问题。简而言之,如果GCP真正想前进并成为第一名,那么还有很多地方需要改进。:失望

解决方法

我将Java版本降级为jdk 8,安装了maven,现在eclipse版本适用于Apache Beam。

我终于解决了,但是,GCP确实请考虑增强并扩展对Java / Python最新版本的支持。

非常感谢

python dataflow google-cloud-platform google-cloud-dataflow apache-beam

7
推荐指数
2
解决办法
3349
查看次数