标签: google-cloud-dataflow

导入apache_beam元类冲突

当我尝试导入apache beam时,我收到以下错误.

>>> import apache_beam
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/toor/pfff/local/lib/python2.7/site-packages/apache_beam/__init__.py", line 78, in <module>
    from apache_beam import io
  File "/home/toor/pfff/local/lib/python2.7/site-packages/apache_beam/io/__init__.py", line 21, in <module>
    ...
    from apitools.base.protorpclite import messages
  File "/home/toor/pfff/local/lib/python2.7/site-packages/apitools/base/protorpclite/messages.py", line 1165, in <module>
    class Field(six.with_metaclass(_FieldMeta, object)):
TypeError: Error when calling the metaclass bases
    metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases
Run Code Online (Sandbox Code Playgroud)

我正在一个新的虚拟环境中工作,我已经通过安装带有pip的google-cloud-dataflow安装了apache_beam,因为我需要能够在google云平台上运行的版本.

pip install google-cloud-dataflow

我不知道如何解决这个错误.为了完整起见,我在Windows上的ubuntu上使用bash上的python 2.7.12.我的同事在Windows上的ubuntu上运行bash时遇到了同样的错误,而直接在Windows上运行工作正常.

安装的软件包版本是: …

python-2.7 google-cloud-dataflow apache-beam

12
推荐指数
1
解决办法
1420
查看次数

Apache Beam:DoFn与PTransform

二者DoFnPTransform是定义操作的装置PCollection.我们怎么知道何时使用?

google-cloud-dataflow apache-beam

12
推荐指数
1
解决办法
1514
查看次数

在Dataflow Worker上使用SSH密钥提取私有库

我正在设置一个数据流作业,为此作业,工作人员需要访问私有的bitbucket存储库以安装库来处理数据。为了授予对数据流工作者的访问权限,我设置了一对SSH密钥(公共和私有)。我设法将私钥添加到我的数据流工作者上。当尝试通过git + ssh pip安装软件包时,出现错误Host key verification failed

我试图.ssh/known_hosts在dataflow worker上查找文件,但是在普通VM上并不那么简单。

另外,我可以通过以下命令自行设置它,但效果不佳:

mkdir -p ~/.ssh
chmod 0700 ~/.ssh
ssh-keyscan bitbucket.org > ~/.ssh/known_hosts
Run Code Online (Sandbox Code Playgroud)

我仍然收到Host key verification failed错误。

建议的替代解决方案是运行此问题,ssh-keygen -R bitbucket.org但随后出现以下错误: mkstemp: No such file or directory

对于Dataflow Python SDK,您需要使用来打包代码setup.py。工人启动时要执行的所有命令都用编写subprocess.Popen。命令列表如下:

CUSTOM_COMMANDS = [
    # decrypt key encrypted key in repository via gcloud kms
    ['gcloud', '-v'],
    ['gcloud', 'kms', 'decrypt', '--location', 'global', '--keyring',
     'bitbucketpackages', '--key', 'package', '--plaintext-file',
     'bb_package_key_decrypted', '--ciphertext-file', 'bb_package_key'],
    ['chmod', '700', 'bb_package_key_decrypted'],
    # …
Run Code Online (Sandbox Code Playgroud)

bitbucket ssh-keys google-cloud-dataflow

12
推荐指数
1
解决办法
203
查看次数

谷歌数据流与Apache风暴

阅读Google的Dataflow API,我的印象是它与Apache Storm的功能非常相似.通过流水线流实时数据处理.除非我完全忽略了这一点,否则我不希望在如何执行彼此写入的管道上建立桥梁,而是期待与Google不同的东西,而不是重新发明轮子.Apache Storm已经很好地放置并可用于任何编程语言.做这样的事情的真正价值是什么?

distributed-computing apache-spark google-cloud-dataflow

11
推荐指数
1
解决办法
4061
查看次数

如何将流数据与Dataflow/Beam中的大型历史数据集相结合

我正在研究通过Google Dataflow/Apache Beam处理来自Web用户会话的日志,并且需要将用户的日志(流式传输)与上个月用户会话的历史记录相结合.

我看过以下方法:

  1. 使用30天的固定窗口:最有可能放入大窗口以适应内存,我不需要更新用户的历史记录,只需参考它
  2. 使用CoGroupByKey连接两个数据集,但这两个数据集必须具有相同的窗口大小(https://cloud.google.com/dataflow/model/group-by-key#join),这在我的案例(24小时对30天)
  3. 使用侧输入检索用户的会话历史对于一个给定elementprocessElement(ProcessContext processContext)

我的理解是,通过加载的数据.withSideInputs(pCollectionView)需要适合内存.我知道我可以将所有单个用户的会话历史记录放入内存,但不是所有会话历史记录.

我的问题是,是否有一种方法可以从仅与当前用户会话相关的侧输入加载/流式传输数据?

我想象一个parDo函数,它将通过指定用户的ID从侧面输入加载用户的历史会话.但只有当前用户的历史会话才适合内存; 通过侧输入加载所有历史会话将太大.

一些伪代码来说明:

public static class MetricFn extends DoFn<LogLine, String> {

    final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;

    public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
        this.pHistoryView = historyView;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);

        final LogLine currentLogLine = processContext.element();
        final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
        final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
        processContext.output(outputMetric);
    }
}
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow apache-flink apache-beam

11
推荐指数
1
解决办法
2137
查看次数

在GCD上运行的PubsubIO的水印启发式是什么?

嗨,我正在尝试运行一个管道,我正在计算发布到pubsub的消息与30秒心跳*之间的差异*(10K流,每个心跳每30秒).我不关心100%的数据完整性,但我想了解PubsubIO的水印启发式(以及我是否可以调整它),以确定我是否能够以足够低的损失忽略后期数据.

*注意,pubsub主题提供了[可能需要几天]持久性,以防我们不得不取消管道,因此启发式工作与积压订阅很有效.

有人可以解释如何计算水印(假设使用了timestamplabel()),以及如何调整水印(如果有的话)?

google-cloud-dataflow

11
推荐指数
1
解决办法
2225
查看次数

如何为数据流指定工作者数量?

我有一个大约90GB的大型导入文件,由我用Java编写的数据流处理.使用PipelineOptionsFactory的默认设置,我的工作需要很长时间才能完成.如何增加工人数量以提高绩效?

谢谢

google-cloud-dataflow apache-beam

10
推荐指数
0
解决办法
1065
查看次数

来自CSV的分区数据因此我可以处理更大的补丁而不是单独的行

我刚刚开始使用Google Data Flow,我编写了一个简单的流程,从云存储中读取CSV文件.其中一个步骤涉及调用Web服务以丰富结果.有问题的Web服务在批量发送多个100个请求时表现更好.

在查看API时,我没有看到将PCollection的100个元素聚合到单个Par.do执行中的好方法.然后需要拆分结果以处理写入BigQuery表的流的最后一步.

不确定我是否需要使用窗口是我想要的.我看到的大多数窗口示例都更适合在给定时间段内进行计数.

google-cloud-dataflow

10
推荐指数
1
解决办法
1004
查看次数

来源与PTransform

我是该项目的新手,我正在尝试在Dataflow和数据库之间创建一个连接器.

文档明确指出我应该使用Source和Sink但我看到很多人直接使用与PInput或PDone相关联的PTransform.

源/接收器API处于实验阶段(使用PTransform解释所有示例),但似乎更容易将其与自定义运行器集成(例如:spark).

如果我参考代码,则使用这两种方法.我看不到任何使用PTransform API会更有趣的用例.

Source/Sink API是否应该重新设计PTranform API?

我是否遗漏了能明确区分这两种方法的东西?

Source/Sink API是否足够稳定,被认为是编码输入和输出的好方法?

谢谢你的建议!

java google-cloud-dataflow

10
推荐指数
1
解决办法
308
查看次数

使用Apache Beam进行窗口化 - 固定Windows似乎不会关闭?

我们试图在Apache Beam管道上使用固定窗口(使用DirectRunner).我们的流程如下:

  1. 从pub/sub中提取数据
  2. 将JSON反序列化为Java对象
  3. 窗口事件w /固定窗口5秒
  4. 使用自定义CombineFn,将每个Events 窗口组合成一个List<Event>
  5. 为了测试,只需输出结果 List<Event>

管道代码:

    pipeline
                // Read from pubsub topic to create unbounded PCollection
                .apply(PubsubIO
                    .<String>read()
                    .topic(options.getTopic())
                    .withCoder(StringUtf8Coder.of())
                )

                // Deserialize JSON into Event object
                .apply("ParseEvent", ParDo
                    .of(new ParseEventFn())
                )

                // Window events with a fixed window size of 5 seconds
                .apply("Window", Window
                    .<Event>into(FixedWindows
                        .of(Duration.standardSeconds(5))
                    )
                )

                // Group events by window
                .apply("CombineEvents", Combine
                    .globally(new CombineEventsFn())
                    .withoutDefaults()
                )

                // Log grouped events
                .apply("LogEvent", ParDo
                    .of(new LogEventFn()) …
Run Code Online (Sandbox Code Playgroud)

java google-cloud-dataflow apache-beam

10
推荐指数
1
解决办法
4015
查看次数