标签: google-cloud-dataflow

数据流覆盖处理时间

有没有办法覆盖数据流中的处理时间(而不是事件时间)?

我试图推理故障场景,以及如何重新计算管道的输出。

假设我有一个管道,它只是对收到的事件进行计数,固定窗口为 1 小时,允许延迟 2 小时。假设我对窗口 [t0, t0+1h) 感兴趣,并假设我有:

  • 事件A,事件时间=t0+10m,处理时间=t0+30m
  • 事件B,事件时间=t0+10m,处理时间=t0+90m

然后对事件A进行计数,并丢弃事件B。

现在,假设几天后我​​发现代码中存在错误,并且我想重新运行管道以重新计算过去同一窗口 [t0, t0+1h) 中的事件。如果现在的处理时间 = t0 + 几天,则所有事件都将被丢弃。

如果我忽略允许的迟到(假设无限),则事件 A 和 B 都会被计数。

通过覆盖处理时间(假设我第一次存储它),我可以确保事件 A 被计数而事件 B 不被计数。有没有办法做到这一点?谢谢!

google-cloud-dataflow

0
推荐指数
1
解决办法
284
查看次数

如何使用 Python 将 Google Pub/Sub 与 Google Dataflow/Beam 结合使用?

我是 Pub/Sub 和 Dataflow/Beam 的新手。我已经在 Spark 和 Kafka 中完成了一项任务,我想使用 Pub/Sub 和 Dataflow/Beam 做同样的事情。据我目前的理解,Kafka 类似于 Pub/Sub,Spark 类似于 Dataflow/Beam。

任务是获取 JSON 文件并写入 Pub/Sub 主题。然后使用 Beam/Dataflow 我需要将该数据放入 PCollection。我将如何实现这一目标?

python google-cloud-pubsub google-cloud-dataflow

0
推荐指数
1
解决办法
4467
查看次数

Google Cloud BigTable:更新列值

我有以下 BigTable 结构作为示例:

Table1 : column_family_1 : column_1 : value
Run Code Online (Sandbox Code Playgroud)

value是一个数字。这是由数据流管理的,我想每次都更新该值。

该值可能是一个金额,我想在每次用户购买时更新它(以维持迄今为止的总支出),因此我在购买事件侦听器数据流中执行以下操作(每当遇到购买事件时):

  • 发出 BigTable 请求,通过 id 获取值
  • 将新购买的金额添加到 BigTable 搜索响应中显示的金额中
  • 发出Put更新值的请求

尽管这种方法有一些网络延迟,但它似乎有效。失败的情况是,当数据流有多个工作人员时,用户进行多次购买,并且事件会发送给多个工作人员,例如:

  • Worker 1 获取事件 1,获取金额并将花费的金额添加到其中
  • Worker 2 获取事件 2,获取金额并将花费的金额添加到其中
  • 两个工人都提出Put请求,但他们都被覆盖了

为了防止这种情况,我试图提出一个仅以纯文本形式表示的请求,add 10 to the spent amount value. 这是我们可以在数据流中做的事情吗?

bigtable google-cloud-dataflow google-cloud-bigtable apache-beam

0
推荐指数
1
解决办法
5818
查看次数

当前适用于 Google Dataflow 的 GoLang SDK 是否支持自动缩放和并行处理?

目前使用 Google Dataflow 和 Python 进行批处理。这工作得很好,但是,我有兴趣在不处理 Java 的情况下提高我的数据流作业的速度。

使用 Go SDK,我实现了一个简单的管道,它从 Google Storage读取一系列100-500mbtextio.Read文件(使用),进行一些聚合并使用结果更新 CloudSQL。正在读取的文件数量可以从数十个到数百个不等。

当我运行管道时,我可以从日志中看到文件是串行读取的,而不是并行读取的,因此作业需要更长的时间。使用 Python SDK 执行的同一进程会触发自动缩放并在几分钟内运行多次读取。

我尝试使用 指定工作人员数量--num_workers=,但是,Dataflow 在几分钟后将作业缩减为一个实例,并且从日志来看,在实例运行期间不会发生并行读取。

如果我删除textio.Read并实现自定义 DoFn 以从 GCS 读取,则会发生类似的情况。读取过程仍然串行运行。

我知道当前的 Go SDK 是实验性的,缺乏许多功能,但是,我还没有找到对并行处理限制的直接参考,在这里。Go SDK 的当前版本是否支持 Dataflow 上的并行处理?

提前致谢

go google-cloud-platform google-cloud-dataflow apache-beam

0
推荐指数
1
解决办法
2462
查看次数

如何在 Python 中创建 Google Cloud Dataflow Wordcount 自定义模板?

我无法按照以下说明使用 wordcount 示例创建自定义 Google Cloud Dataflow 模板:https : //cloud.google.com/dataflow/docs/guides/templates/creating-templates

我收到与无法访问 RuntimeValueProvider 相关的错误。我究竟做错了什么?

我的主要功能wordcount.py


"""A word-counting workflow."""

from __future__ import absolute_import

import argparse
import logging
import re

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.options.pipeline_options import SetupOptions


class WordExtractingDoFn(beam.DoFn):
    """Parse each line of input text into words."""

    def __init__(self):
        self.words_counter = Metrics.counter(self.__class__, 'words')
        self.word_lengths_counter = …
Run Code Online (Sandbox Code Playgroud)

python google-cloud-dataflow apache-beam

0
推荐指数
1
解决办法
1067
查看次数

Pubsub 消息缺少时间戳属性

我正在尝试从 Apache Beam 读取 Pubsub 消息的时间戳值。

p.apply("Read PubSub messages", PubsubIO.readMessagesWithAttributes()
    .withIdAttribute("msg_id")
    .withTimestampAttribute("timestamp")
    .fromSubscription(options.getPubsubSubscription()))
Run Code Online (Sandbox Code Playgroud)

但不幸的是,我收到了以下错误,这真的让我感到惊讶,因为我认为每条消息都有一个默认时间戳。

An exception occured while executing the Java class. 
PubSub message is missing a value for timestamp attribute timestamp
Run Code Online (Sandbox Code Playgroud)

为什么我的消息没有时间戳?是因为我通过 Pubsub UI 发布了它吗?

google-cloud-platform google-cloud-pubsub google-cloud-dataflow apache-beam

0
推荐指数
1
解决办法
1837
查看次数

Apache Beam 2.12.0 是否支持 Java 11?

Apache Beam 2.12.0 是否支持 Java 11,还是我现在应该继续使用稳定的 Java 8 SDK?

我看到该站点根据文档推荐使用 Python 3.5 和 Beam 2.12.0,与其他更高的 Python 版本相比。这个时候它和 Java 11 有多少可比性。因此,与 Apache Beam 2.12.0 一起使用的稳定版本仍然是 Java 8。在 Java 11 中使用 Beam 2.12.0 时,我遇到的构建问题很少。

google-cloud-dataflow apache-beam

0
推荐指数
1
解决办法
1724
查看次数

AttributeError:模块“google.cloud”没有属性“存储”

我正在尝试在 GCP Dataflow 上运行一个简单的 Beam 脚本,以便将 scikit-learn 模型应用于某些数据。应用模型之前和之后都需要对数据进行处理。这就是textExtraction 和translationDictionary 的作用。我不断收到错误AttributeError: module 'google.cloud' has no attribute 'storage'(下面是完整的堆栈跟踪)。正如您所看到的,我尝试在新的虚拟环境中运行新的安装。知道如何修复吗?

我的脚本也在下面给出。

预测_DF_class.py

import apache_beam as beam
import argparse
from google.cloud import storage
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
import pandas as pd
import pickle as pkl
import json
import joblib

query = """
    SELECT index, product_title
    FROM `project.dataset.table`
    LIMIT 1000
"""

class ApplyDoFn(beam.DoFn):
    def __init__(self):
        self._model = None
        self._textExtraction = None
        self._translationDictionary = None
        self._storage = …
Run Code Online (Sandbox Code Playgroud)

python google-cloud-dataflow apache-beam

0
推荐指数
1
解决办法
2094
查看次数

Elasticsearch/dataflow - 约 60 个并发连接后连接超时

我们在 Elastic Cloud 上托管 elatsicsearch 集群并从数据流 (GCP) 调用它。工作在开发中运行良好,但当我们部署到产品时,我们在客户端看到大量连接超时。

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "main.py", line 159, in process
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/utils.py", line 152, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/__init__.py", line 1617, in search
    body=body,
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 390, in perform_request
    raise e
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 365, in perform_request
    timeout=timeout,
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/connection/http_urllib3.py", line 258, in perform_request
    raise ConnectionError("N/A", str(e), e)
elasticsearch.exceptions.ConnectionError: ConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fe5d04e5690>: Failed …
Run Code Online (Sandbox Code Playgroud)

python elasticsearch google-cloud-platform google-cloud-dataflow apache-beam

0
推荐指数
1
解决办法
2084
查看次数

云数据融合与 GCP 上的 DataFlow 之间的区别

GCP 管道服务之间有什么区别:Cloud Dataflow 和 Cloud Data fusion...您何时选择哪一个?

我使用数据融合中的 Basic 进行了 10 个实例的高级定价。Dataflow 中有 10 个实例集群 (n1-standard-8)。

Datafusion 的定价是其两倍多。

相互之间有什么优点和缺点

google-cloud-dataflow google-cloud-data-fusion

0
推荐指数
1
解决办法
3797
查看次数