标签: google-cloud-dataflow

如何使环境变量作为python SDK中的环境变量到达Dataflow工作者

我用python sdk编写自定义接收器。我尝试将数据存储到AWS S3。要连接S3,需要一些凭据,秘密密钥,但是出于安全原因,在代码中进行设置不是很好。我想使环境变量作为环境变量到达Dataflow工作人员。我该怎么做?

google-cloud-platform google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
1657
查看次数

使用Dataflow读取CSV标头

我有一个CSV文件,我不知道列名提前.我需要在Google Dataflow中进行一些转换后以JSON格式输出数据.

采用标题行并将标签渗透到所有行的最佳方法是什么?

例如:

a,b,c
1,2,3
4,5,6
Run Code Online (Sandbox Code Playgroud)

......变得(大约):

{a:1, b:2, c:3}
{a:4, b:5, c:6}
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
4336
查看次数

Google Dataflow上Apache Beam示例的权限错误

我无法将Apache Beam示例从本地计算机提交到我们的云平台.

使用gcloud auth list我可以看到正确的帐户当前是活动的.我可以使用gsutil和Web客户端进行文件系统交互.我可以使用cloud shell通过python REPL运行管道.

但是当我尝试运行python wordcount示例时,我收到以下错误:

IOError: Could not upload to GCS path gs://my_bucket/tmp: access denied.
Please verify that credentials are valid and that you have write access 
to the specified path.
Run Code Online (Sandbox Code Playgroud)

关于凭证,我有什么遗漏吗?

python google-cloud-platform google-cloud-dataflow apache-beam

6
推荐指数
2
解决办法
2516
查看次数

worker_machine_type标记在使用python的Google Cloud Dataflow中无效

我在Python中使用Apache Beam和Google Cloud Dataflow(2.3.0).将worker_machine_type参数指定为例如n1-highmem-2或时custom-1-6656,Dataflow运行作业但始终n1-standard-1为每个工作程序使用标准机器类型.

如果我做错了,有没有人知道?

其他主题(此处此处)表明这应该是可能的,因此这可能是版本问题.

我的代码用于指定PipelineOptions(请注意,所有其他选项都可以正常工作,因此它应该识别worker_machine_type参数):

def get_cloud_pipeline_options(project):

  options = {
    'runner': 'DataflowRunner',
    'job_name': ('converter-ml6-{}'.format(
        datetime.now().strftime('%Y%m%d%H%M%S'))),
    'staging_location': os.path.join(BUCKET, 'staging'),
    'temp_location': os.path.join(BUCKET, 'tmp'),
    'project': project,
    'region': 'europe-west1',
    'zone': 'europe-west1-d',
    'autoscaling_algorithm': 'THROUGHPUT_BASED',
    'save_main_session': True,
    'setup_file': './setup.py',
    'worker_machine_type': 'custom-1-6656',
    'max_num_workers': 3,
  }

  return beam.pipeline.PipelineOptions(flags=[], **options)

def main(argv=None):
  args = parse_arguments(sys.argv if argv is None else argv)

  pipeline_options = get_cloud_pipeline_options(args.project_id

  pipeline = beam.Pipeline(options=pipeline_options)
Run Code Online (Sandbox Code Playgroud)

python google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
621
查看次数

SlidingWindows用于Apache Beam上的慢速数据(大间隔)

我正在使用Chicago Traffic Tracker数据集,每15分钟发布一次新数据.当新数据可用时,它表示距离"实时"10-15分钟的记录(例如,查找_last_updt).

例如,在00:20,我得到数据时间戳00:10; 在00:35,我从00:20开始; 在00:50,我从00:40开始.因此,我可以获得新数据"固定"(每15分钟)的时间间隔,尽管时间戳上的间隔略有变化.

我试图在Dataflow(Apache Beam)上使用这些数据,为此我正在使用Sliding Windows.我的想法是收集和处理4个连续的数据点(4 x 15min = 60min),并且一旦新的数据点可用,理想情况下更新我的和/平均值的计算.为此,我开始使用代码:

PCollection<TrafficData> trafficData = input        
    .apply("MapIntoSlidingWindows", Window.<TrafficData>into(
        SlidingWindows.of(Duration.standardMinutes(60)) // (4x15)
            .every(Duration.standardMinutes(15))) .     // interval to get new data
        .triggering(AfterWatermark
                        .pastEndOfWindow()
                        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
        .withAllowedLateness(Duration.ZERO)
        .accumulatingFiredPanes());
Run Code Online (Sandbox Code Playgroud)

不幸的是,看起来当我从输入中收到一个新的数据点时,我没有得到一个新的(更新的)结果GroupByKey.

我的SlidingWindows有问题吗?还是我错过了别的什么?

java sliding-window google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
503
查看次数

数据流设置控制器服务帐户

我尝试为Dataflow设置控制器服务帐户。在我的数据流选项中,我有:

options.setGcpCredential(GoogleCredentials.fromStream(new FileInputStream("key.json")).createScoped(someArrays)); 
options.setServiceAccount("xxx@yyy.iam.gserviceaccount.com");
Run Code Online (Sandbox Code Playgroud)

但我得到:

WARNING: Request failed with code 403, performed 0 retries due to IOExceptions, performed 0 retries due to unsuccessful status codes, HTTP framework says request can be retried, (caller responsible for retrying): https://dataflow.googleapis.com/v1b3/projects/MYPROJECT/locations/MYLOCATION/jobs
Exception in thread "main" java.lang.RuntimeException: Failed to create a workflow job: (CODE): Current user cannot act as service account "xxx@yyy.iam.gserviceaccount.com. Causes: (CODE): Current user cannot act as service account "xxx@yyy.iam.gserviceaccount.com.
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:791)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:173)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
Run Code Online (Sandbox Code Playgroud)

...

Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 …
Run Code Online (Sandbox Code Playgroud)

dataflow google-cloud-platform google-cloud-dataflow google-cloud-iam

6
推荐指数
2
解决办法
1525
查看次数

适用于GenericRecord的Apache Beam编码器

我正在建立一个读取Avro通用记录的管道。要在阶段之间传递GenericRecord,我需要注册AvroCoder。该文档说,如果我使用通用记录,则架构参数可以是任意的:https : //beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/coders/AvroCoder.html#of -java.lang.Class-org.apache.avro.Schema-

但是,当我将空模式传递给该方法时,AvroCoder.of(Class, Schema)它将在运行时引发异常。有没有一种方法可以为GenericRecord创建不需要模式的AvroCoder?就我而言,每个GenericRecord都有一个嵌入式模式。

异常和堆栈跟踪:

Exception in thread "main" java.lang.NullPointerException
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.checkIndexedRecord(AvroCoder.java:562)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:430)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.check(AvroCoder.java:409)
at org.apache.beam.sdk.coders.AvroCoder.<init>(AvroCoder.java:260)
at org.apache.beam.sdk.coders.AvroCoder.of(AvroCoder.java:141)
Run Code Online (Sandbox Code Playgroud)

avro google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
356
查看次数

Google云端数据流作业失败并显示错误"无法检索暂存文件:3次尝试无法检索工作人员:错误的MD5 ..."

SDK:Apache Beam SDK for Go 0.5.0

我们在Google Cloud Data Flow中运行Apache Beam Go SDK作业.他们一直工作正常,直到最近他们间歇性地停止工作(没有对代码或配置进行任何更改).发生的错误是:

Failed to retrieve staged files: failed to retrieve worker in 3 attempts: bad MD5 for /var/opt/google/staged/worker: ..., want ; bad MD5 for /var/opt/google/staged/worker: ..., want ;

(注意:好像它在错误消息消息中缺少第二个哈希值.)

我最好能猜出工人有什么问题 - 似乎是在尝试比较工人的md5哈希值并错过其中一个值?我不知道究竟是什么比较.

有谁知道可能导致这个问题的原因是什么?

go google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
206
查看次数

Kafka群集丢失或重复消息

在努力使Java的KafkaIOIT适应大型数据集时,我遇到了一个问题。我想通过一个Kafka主题推送1亿条记录,验证数据的正确性,同时检查KafkaIO.Write和KafkaIO.Read的性能。

为了执行测试,我使用了来自Beam回购(此处)的Kubernetes上的Kafka集群。

预期的结果是,首先以确定性的方式生成记录,然后将它们写入Kafka-这结束了写入管道。至于读取和正确性检查-首先,从主题读取数据,然后将其解码为String表示形式,然后计算整个PCollection的哈希码(有关详细信息,请检查KafkaIOIT.java)。

在测试期间,我遇到了几个问题:

  1. 从Kafka主题读取预定的记录数时,每次的哈希值都不同。

  2. 有时并非所有记录都被读取,并且Dataflow任务会无限期地等待输入,偶尔会引发异常。

我相信有两种可能的原因导致此现象:

Kafka集群配置有问题

或KafkaIO在大数据量上表现不正常,从而重复和/或删除记录。

我找到了一个我认为可以解释第一个行为的Stack答案: 链接 -如果消息多次传递,则整个集合的哈希值显然会发生变化。

在这种情况下,我真的不知道如何在Beam中配置KafkaIO.Write来产生一次。

这就留下了无法解决的消息问题。你能帮我吗?

bigdata apache-kafka kubernetes google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
117
查看次数

Big Query 中的 WRITE_TRUNCATE 行为

我对Big Query 中的WRITE_TRUNCATE行为有疑问。

我有一个很大的查询表(T1),我定期将日志数据附加到它(每条日志行一行)。我想要一个数据流作业 ( D1 ) 从这个表中读取,删除任何重复的行并执行其他数据清理操作,然后将其输出到另一个大查询表 ( T2 ),替换可能已经存在于此的任何数据桌子。我相信我可以通过在数据流作业中的 BigQuery.IO 接收器中使用WRITE_TRUNCATE 写入配置来做到这一点。

问题是,如果我有另一个数据流作业 ( D2 ) 从表T2读取,而作业D1正在写入截断该表的中间,则D2看到什么数据,即它是否看到表处于它所处的状态在截断之前或截断完成之后。或者它可以在截断期间的任何步骤中看到表格(例如通过附加新数据的一部分)?

上面链接的 javadoc 表明截断可能不是原子的,而Big QueryREST 文档表明它是原子的。

google-bigquery google-cloud-dataflow

5
推荐指数
1
解决办法
3472
查看次数