我有一个大约90GB的大型导入文件,由我用Java编写的数据流处理.使用PipelineOptionsFactory的默认设置,我的工作需要很长时间才能完成.如何增加工人数量以提高绩效?
谢谢
我正在使用Dataflow 0.5.5 Python.在非常简单的代码中遇到以下错误:
print(len(row_list))
Run Code Online (Sandbox Code Playgroud)
row_list是一个清单.完全相同的代码,相同的数据和相同的管道在DirectRunner上运行完全正常,但在DataflowRunner上抛出以下异常.它是什么意思以及我如何解决它?
job name: `beamapp-root-0216042234-124125`
(f14756f20f567f62): Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 544, in do_work
work_executor.execute()
File "dataflow_worker/executor.py", line 973, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30547)
with op.scoped_metrics_container:
File "dataflow_worker/executor.py", line 974, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30495)
op.start()
File "dataflow_worker/executor.py", line 302, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12149)
def start(self):
File "dataflow_worker/executor.py", line 303, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12053)
with self.scoped_start_state:
File "dataflow_worker/executor.py", line 316, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11968)
with self.shuffle_source.reader() as reader:
File "dataflow_worker/executor.py", line 320, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11912)
self.output(windowed_value)
File "dataflow_worker/executor.py", line 152, …Run Code Online (Sandbox Code Playgroud) 通过此链接,我发现 Google Cloud Dataflow 为其工作人员使用 Docker 容器:Google Cloud Dataflow 实例的映像
我看到可以找出 docker 容器的图像名称。
但是,有没有办法获取这个 docker 容器(即我应该从哪个存储库获取它?),修改它,然后指示我的 Dataflow 作业使用这个新的 docker 容器?
我问的原因是我们需要在我们的 docker 上安装各种 C++ 和 Fortran 以及其他库代码,以便 Dataflow 作业可以调用它们,但是这些安装非常耗时,所以我们不想使用“资源”属性df 中的选项。
我正在为github中的repo编写自述文件,我想添加对文章的引用.在引文中编码最合适的方法是什么?例如,作为一个块引用,作为代码,作为简单的文本等?
建议?
Python为这样的实例支持@property装饰器:
class MyClass(object):
def __init__(self):
self._friend_stack = [1]
@property
def current_friend(self):
return self._friend_stack[0]
myobj = MyClass()
myobj.current_friend # 1
Run Code Online (Sandbox Code Playgroud)
是否有可能为类具有类似的东西,以便行为是这样的(例如,与setter和getter方法一起):
class MyClass(object):
_friend_stack = [1]
@property
def current_friend(cls):
return cls._friend_stack[0]
MyClass.current_friend # 1
Run Code Online (Sandbox Code Playgroud) 我想阅读一个csv文件,并使用apache beam dataflow将其写入BigQuery.为了做到这一点,我需要以字典的形式向BigQuery提供数据.如何使用apache beam转换数据才能执行此操作?
我的输入csv文件有两列,我想在BigQuery中创建一个后续的两列表.我知道如何在BigQuery中创建数据,这是直接的,我不知道的是如何将csv转换为字典.下面的代码不正确,但应该知道我正在尝试做什么.
# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
beam.io.BigQuerySink(
output_table,
schema='month:INTEGER, tornado_count:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
Run Code Online (Sandbox Code Playgroud) python csv google-bigquery google-cloud-dataflow apache-beam
我mvn org.apache.maven.plugins:maven-dependency-plugin:3.1.1:copy-dependencies在我的项目中运行,我看到以下错误:
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-dependency-plugin:3.1.1:copy-dependencies (default-cli) on project beam-sdks-java-core: Some problems were encountered while processing the POMs:
[ERROR] [ERROR] Unknown packaging: bundle @ line 6, column 16: 1 problem was encountered while building the effective model for org.xerial.snappy:snappy-java:1.1.4
[ERROR] [ERROR] Unknown packaging: bundle @ line 6, column 16
Run Code Online (Sandbox Code Playgroud)
查看 Snappy 的 pom 文件,它看起来像这样:
<?xml version='1.0' encoding='UTF-8'?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0">
<modelVersion>4.0.0</modelVersion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<packaging>bundle</packaging>
<description>snappy-java: A fast compression/decompression library</description>
<version>1.1.4</version>
<name>snappy-java</name>
....
Run Code Online (Sandbox Code Playgroud)
具体来说,这<packaging>bundle</packaging> …
我有一个非常基本的 Python Dataflow 作业,它从 Pub/Sub 读取一些数据,应用 FixedWindow 并写入 Google Cloud Storage。
transformed = ...
transformed | beam.io.WriteToText(known_args.output)
Run Code Online (Sandbox Code Playgroud)
输出写入--output中特定的位置,但只是临时阶段,即
gs://MY_BUCKET/MY_DIR/beam-temp-2a5c0e1eec1c11e8b98342010a800004/...some_UUID...
Run Code Online (Sandbox Code Playgroud)
该文件永远不会使用分片模板放入正确命名的位置。
在本地和 DataFlow 运行器上测试。
在进一步测试时,我注意到 streaming_wordcount 示例有相同的问题,但是标准 wordcount 示例写得很好。也许问题在于窗口,或从 pubsub 阅读?
WriteToText 似乎与 PubSub 的流媒体源不兼容。可能有解决方法,或者 Java 版本可能兼容,但我选择完全使用不同的解决方案。
google-cloud-storage google-cloud-pubsub google-cloud-dataflow apache-beam
我正在建立一个读取Avro通用记录的管道。要在阶段之间传递GenericRecord,我需要注册AvroCoder。该文档说,如果我使用通用记录,则架构参数可以是任意的:https : //beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/coders/AvroCoder.html#of -java.lang.Class-org.apache.avro.Schema-
但是,当我将空模式传递给该方法时,AvroCoder.of(Class, Schema)它将在运行时引发异常。有没有一种方法可以为GenericRecord创建不需要模式的AvroCoder?就我而言,每个GenericRecord都有一个嵌入式模式。
异常和堆栈跟踪:
Exception in thread "main" java.lang.NullPointerException
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.checkIndexedRecord(AvroCoder.java:562)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:430)
at org.apache.beam.sdk.coders.AvroCoder$AvroDeterminismChecker.check(AvroCoder.java:409)
at org.apache.beam.sdk.coders.AvroCoder.<init>(AvroCoder.java:260)
at org.apache.beam.sdk.coders.AvroCoder.of(AvroCoder.java:141)
Run Code Online (Sandbox Code Playgroud) 我们有一个需要使用 Celery 获取大量数据的 Django 应用程序。每隔几分钟就有 20 个左右的 celery 工人在运行。我们在 Google Kubernetes Engine 上运行,并使用 Cloud memorystore 使用 Redis 队列。
根据 Flower 的说法,即使队列为空,我们用于 celery 的 Redis 实例也已满。这会导致 Redis DB 最终被填满并且 Celery 抛出错误。
在 Flower 中,我看到任务进进出出,并且我已经增加了工作人员,现在队列总是空的。
如果我运行,redis-cli --bigkeys我会看到:
# Scanning the entire keyspace to find biggest keys as well as
# average sizes per key type. You can use -i 0.1 to sleep 0.1 sec
# per 100 SCAN commands (not usually needed).
[00.00%] Biggest set found so far '_kombu.binding.my-queue-name-queue' with …Run Code Online (Sandbox Code Playgroud)