我用python sdk编写自定义接收器。我尝试将数据存储到AWS S3。要连接S3,需要一些凭据,秘密密钥,但是出于安全原因,在代码中进行设置不是很好。我想使环境变量作为环境变量到达Dataflow工作人员。我该怎么做?
我有一个CSV文件,我不知道列名提前.我需要在Google Dataflow中进行一些转换后以JSON格式输出数据.
采用标题行并将标签渗透到所有行的最佳方法是什么?
例如:
a,b,c
1,2,3
4,5,6
Run Code Online (Sandbox Code Playgroud)
......变得(大约):
{a:1, b:2, c:3}
{a:4, b:5, c:6}
Run Code Online (Sandbox Code Playgroud) 我无法将Apache Beam示例从本地计算机提交到我们的云平台.
使用gcloud auth list我可以看到正确的帐户当前是活动的.我可以使用gsutil和Web客户端进行文件系统交互.我可以使用cloud shell通过python REPL运行管道.
但是当我尝试运行python wordcount示例时,我收到以下错误:
IOError: Could not upload to GCS path gs://my_bucket/tmp: access denied.
Please verify that credentials are valid and that you have write access
to the specified path.
Run Code Online (Sandbox Code Playgroud)
关于凭证,我有什么遗漏吗?
python google-cloud-platform google-cloud-dataflow apache-beam
我在Python中使用Apache Beam和Google Cloud Dataflow(2.3.0).将worker_machine_type参数指定为例如n1-highmem-2或时custom-1-6656,Dataflow运行作业但始终n1-standard-1为每个工作程序使用标准机器类型.
如果我做错了,有没有人知道?
其他主题(此处和此处)表明这应该是可能的,因此这可能是版本问题.
我的代码用于指定PipelineOptions(请注意,所有其他选项都可以正常工作,因此它应该识别worker_machine_type参数):
def get_cloud_pipeline_options(project):
options = {
'runner': 'DataflowRunner',
'job_name': ('converter-ml6-{}'.format(
datetime.now().strftime('%Y%m%d%H%M%S'))),
'staging_location': os.path.join(BUCKET, 'staging'),
'temp_location': os.path.join(BUCKET, 'tmp'),
'project': project,
'region': 'europe-west1',
'zone': 'europe-west1-d',
'autoscaling_algorithm': 'THROUGHPUT_BASED',
'save_main_session': True,
'setup_file': './setup.py',
'worker_machine_type': 'custom-1-6656',
'max_num_workers': 3,
}
return beam.pipeline.PipelineOptions(flags=[], **options)
def main(argv=None):
args = parse_arguments(sys.argv if argv is None else argv)
pipeline_options = get_cloud_pipeline_options(args.project_id
pipeline = beam.Pipeline(options=pipeline_options)
Run Code Online (Sandbox Code Playgroud) 我正在使用Chicago Traffic Tracker数据集,每15分钟发布一次新数据.当新数据可用时,它表示距离"实时"10-15分钟的记录(例如,查找_last_updt).
例如,在00:20,我得到数据时间戳00:10; 在00:35,我从00:20开始; 在00:50,我从00:40开始.因此,我可以获得新数据"固定"(每15分钟)的时间间隔,尽管时间戳上的间隔略有变化.
我试图在Dataflow(Apache Beam)上使用这些数据,为此我正在使用Sliding Windows.我的想法是收集和处理4个连续的数据点(4 x 15min = 60min),并且一旦新的数据点可用,理想情况下更新我的和/平均值的计算.为此,我开始使用代码:
PCollection<TrafficData> trafficData = input
.apply("MapIntoSlidingWindows", Window.<TrafficData>into(
SlidingWindows.of(Duration.standardMinutes(60)) // (4x15)
.every(Duration.standardMinutes(15))) . // interval to get new data
.triggering(AfterWatermark
.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
Run Code Online (Sandbox Code Playgroud)
不幸的是,看起来当我从输入中收到一个新的数据点时,我没有得到一个新的(更新的)结果GroupByKey.
我的SlidingWindows有问题吗?还是我错过了别的什么?
我尝试为Dataflow设置控制器服务帐户。在我的数据流选项中,我有:
options.setGcpCredential(GoogleCredentials.fromStream(new FileInputStream("key.json")).createScoped(someArrays));
options.setServiceAccount("xxx@yyy.iam.gserviceaccount.com");
Run Code Online (Sandbox Code Playgroud)
但我得到:
WARNING: Request failed with code 403, performed 0 retries due to IOExceptions, performed 0 retries due to unsuccessful status codes, HTTP framework says request can be retried, (caller responsible for retrying): https://dataflow.googleapis.com/v1b3/projects/MYPROJECT/locations/MYLOCATION/jobs
Exception in thread "main" java.lang.RuntimeException: Failed to create a workflow job: (CODE): Current user cannot act as service account "xxx@yyy.iam.gserviceaccount.com. Causes: (CODE): Current user cannot act as service account "xxx@yyy.iam.gserviceaccount.com.
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:791)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:173)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
Run Code Online (Sandbox Code Playgroud)
...
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 403 …Run Code Online (Sandbox Code Playgroud) dataflow google-cloud-platform google-cloud-dataflow google-cloud-iam
我正在建立一个读取Avro通用记录的管道。要在阶段之间传递GenericRecord,我需要注册AvroCoder。该文档说,如果我使用通用记录,则架构参数可以是任意的:https : //beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/coders/AvroCoder.html#of -java.lang.Class-org.apache.avro.Schema-
但是,当我将空模式传递给该方法时,AvroCoder.of(Class, Schema)它将在运行时引发异常。有没有一种方法可以为GenericRecord创建不需要模式的AvroCoder?就我而言,每个GenericRecord都有一个嵌入式模式。
异常和堆栈跟踪:
Exception in thread "main" java.lang.NullPointerException
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.checkIndexedRecord(AvroCoder.java:562)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:430)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.check(AvroCoder.java:409)
at org.apache.beam.sdk.coders.AvroCoder.<init>(AvroCoder.java:260)
at org.apache.beam.sdk.coders.AvroCoder.of(AvroCoder.java:141)
Run Code Online (Sandbox Code Playgroud) SDK:Apache Beam SDK for Go 0.5.0
我们在Google Cloud Data Flow中运行Apache Beam Go SDK作业.他们一直工作正常,直到最近他们间歇性地停止工作(没有对代码或配置进行任何更改).发生的错误是:
Failed to retrieve staged files: failed to retrieve worker in 3 attempts: bad MD5 for /var/opt/google/staged/worker: ..., want ; bad MD5 for /var/opt/google/staged/worker: ..., want ;
(注意:好像它在错误消息消息中缺少第二个哈希值.)
我最好能猜出工人有什么问题 - 似乎是在尝试比较工人的md5哈希值并错过其中一个值?我不知道究竟是什么比较.
有谁知道可能导致这个问题的原因是什么?
在努力使Java的KafkaIOIT适应大型数据集时,我遇到了一个问题。我想通过一个Kafka主题推送1亿条记录,验证数据的正确性,同时检查KafkaIO.Write和KafkaIO.Read的性能。
为了执行测试,我使用了来自Beam回购(此处)的Kubernetes上的Kafka集群。
预期的结果是,首先以确定性的方式生成记录,然后将它们写入Kafka-这结束了写入管道。至于读取和正确性检查-首先,从主题读取数据,然后将其解码为String表示形式,然后计算整个PCollection的哈希码(有关详细信息,请检查KafkaIOIT.java)。
在测试期间,我遇到了几个问题:
从Kafka主题读取预定的记录数时,每次的哈希值都不同。
有时并非所有记录都被读取,并且Dataflow任务会无限期地等待输入,偶尔会引发异常。
我相信有两种可能的原因导致此现象:
Kafka集群配置有问题
或KafkaIO在大数据量上表现不正常,从而重复和/或删除记录。
我找到了一个我认为可以解释第一个行为的Stack答案: 链接 -如果消息多次传递,则整个集合的哈希值显然会发生变化。
在这种情况下,我真的不知道如何在Beam中配置KafkaIO.Write来产生一次。
这就留下了无法解决的消息问题。你能帮我吗?
bigdata apache-kafka kubernetes google-cloud-dataflow apache-beam
我对Big Query 中的WRITE_TRUNCATE行为有疑问。
我有一个很大的查询表(T1),我定期将日志数据附加到它(每条日志行一行)。我想要一个数据流作业 ( D1 ) 从这个表中读取,删除任何重复的行并执行其他数据清理操作,然后将其输出到另一个大查询表 ( T2 ),替换可能已经存在于此的任何数据桌子。我相信我可以通过在数据流作业中的 BigQuery.IO 接收器中使用WRITE_TRUNCATE 写入配置来做到这一点。
问题是,如果我有另一个数据流作业 ( D2 ) 从表T2读取,而作业D1正在写入截断该表的中间,则D2看到什么数据,即它是否看到表处于它所处的状态在截断之前或截断完成之后。或者它可以在截断期间的任何步骤中看到表格(例如通过附加新数据的一部分)?
apache-beam ×8
python ×2
apache-kafka ×1
avro ×1
bigdata ×1
dataflow ×1
go ×1
java ×1
kubernetes ×1