有没有办法覆盖数据流中的处理时间(而不是事件时间)?
我试图推理故障场景,以及如何重新计算管道的输出。
假设我有一个管道,它只是对收到的事件进行计数,固定窗口为 1 小时,允许延迟 2 小时。假设我对窗口 [t0, t0+1h) 感兴趣,并假设我有:
然后对事件A进行计数,并丢弃事件B。
现在,假设几天后我发现代码中存在错误,并且我想重新运行管道以重新计算过去同一窗口 [t0, t0+1h) 中的事件。如果现在的处理时间 = t0 + 几天,则所有事件都将被丢弃。
如果我忽略允许的迟到(假设无限),则事件 A 和 B 都会被计数。
通过覆盖处理时间(假设我第一次存储它),我可以确保事件 A 被计数而事件 B 不被计数。有没有办法做到这一点?谢谢!
我是 Pub/Sub 和 Dataflow/Beam 的新手。我已经在 Spark 和 Kafka 中完成了一项任务,我想使用 Pub/Sub 和 Dataflow/Beam 做同样的事情。据我目前的理解,Kafka 类似于 Pub/Sub,Spark 类似于 Dataflow/Beam。
任务是获取 JSON 文件并写入 Pub/Sub 主题。然后使用 Beam/Dataflow 我需要将该数据放入 PCollection。我将如何实现这一目标?
我有以下 BigTable 结构作为示例:
Table1 : column_family_1 : column_1 : value
Run Code Online (Sandbox Code Playgroud)
这value是一个数字。这是由数据流管理的,我想每次都更新该值。
该值可能是一个金额,我想在每次用户购买时更新它(以维持迄今为止的总支出),因此我在购买事件侦听器数据流中执行以下操作(每当遇到购买事件时):
Put更新值的请求尽管这种方法有一些网络延迟,但它似乎有效。失败的情况是,当数据流有多个工作人员时,用户进行多次购买,并且事件会发送给多个工作人员,例如:
Put请求,但他们都被覆盖了为了防止这种情况,我试图提出一个仅以纯文本形式表示的请求,add 10 to the spent amount value. 这是我们可以在数据流中做的事情吗?
bigtable google-cloud-dataflow google-cloud-bigtable apache-beam
目前使用 Google Dataflow 和 Python 进行批处理。这工作得很好,但是,我有兴趣在不处理 Java 的情况下提高我的数据流作业的速度。
使用 Go SDK,我实现了一个简单的管道,它从 Google Storage读取一系列100-500mbtextio.Read文件(使用),进行一些聚合并使用结果更新 CloudSQL。正在读取的文件数量可以从数十个到数百个不等。
当我运行管道时,我可以从日志中看到文件是串行读取的,而不是并行读取的,因此作业需要更长的时间。使用 Python SDK 执行的同一进程会触发自动缩放并在几分钟内运行多次读取。
我尝试使用 指定工作人员数量--num_workers=,但是,Dataflow 在几分钟后将作业缩减为一个实例,并且从日志来看,在实例运行期间不会发生并行读取。
如果我删除textio.Read并实现自定义 DoFn 以从 GCS 读取,则会发生类似的情况。读取过程仍然串行运行。
我知道当前的 Go SDK 是实验性的,缺乏许多功能,但是,我还没有找到对并行处理限制的直接参考,在这里。Go SDK 的当前版本是否支持 Dataflow 上的并行处理?
提前致谢
我无法按照以下说明使用 wordcount 示例创建自定义 Google Cloud Dataflow 模板:https : //cloud.google.com/dataflow/docs/guides/templates/creating-templates
我收到与无法访问 RuntimeValueProvider 相关的错误。我究竟做错了什么?
我的主要功能wordcount.py:
"""A word-counting workflow."""
from __future__ import absolute_import
import argparse
import logging
import re
from past.builtins import unicode
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.options.pipeline_options import SetupOptions
class WordExtractingDoFn(beam.DoFn):
"""Parse each line of input text into words."""
def __init__(self):
self.words_counter = Metrics.counter(self.__class__, 'words')
self.word_lengths_counter = …Run Code Online (Sandbox Code Playgroud) 我正在尝试从 Apache Beam 读取 Pubsub 消息的时间戳值。
p.apply("Read PubSub messages", PubsubIO.readMessagesWithAttributes()
.withIdAttribute("msg_id")
.withTimestampAttribute("timestamp")
.fromSubscription(options.getPubsubSubscription()))
Run Code Online (Sandbox Code Playgroud)
但不幸的是,我收到了以下错误,这真的让我感到惊讶,因为我认为每条消息都有一个默认时间戳。
An exception occured while executing the Java class.
PubSub message is missing a value for timestamp attribute timestamp
Run Code Online (Sandbox Code Playgroud)
为什么我的消息没有时间戳?是因为我通过 Pubsub UI 发布了它吗?
google-cloud-platform google-cloud-pubsub google-cloud-dataflow apache-beam
Apache Beam 2.12.0 是否支持 Java 11,还是我现在应该继续使用稳定的 Java 8 SDK?
我看到该站点根据文档推荐使用 Python 3.5 和 Beam 2.12.0,与其他更高的 Python 版本相比。这个时候它和 Java 11 有多少可比性。因此,与 Apache Beam 2.12.0 一起使用的稳定版本仍然是 Java 8。在 Java 11 中使用 Beam 2.12.0 时,我遇到的构建问题很少。
我正在尝试在 GCP Dataflow 上运行一个简单的 Beam 脚本,以便将 scikit-learn 模型应用于某些数据。应用模型之前和之后都需要对数据进行处理。这就是textExtraction 和translationDictionary 的作用。我不断收到错误AttributeError: module 'google.cloud' has no attribute 'storage'(下面是完整的堆栈跟踪)。正如您所看到的,我尝试在新的虚拟环境中运行新的安装。知道如何修复吗?
我的脚本也在下面给出。
预测_DF_class.py
import apache_beam as beam
import argparse
from google.cloud import storage
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp.bigquery import parse_table_schema_from_json
import pandas as pd
import pickle as pkl
import json
import joblib
query = """
SELECT index, product_title
FROM `project.dataset.table`
LIMIT 1000
"""
class ApplyDoFn(beam.DoFn):
def __init__(self):
self._model = None
self._textExtraction = None
self._translationDictionary = None
self._storage = …Run Code Online (Sandbox Code Playgroud) 我们在 Elastic Cloud 上托管 elatsicsearch 集群并从数据流 (GCP) 调用它。工作在开发中运行良好,但当我们部署到产品时,我们在客户端看到大量连接超时。
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "main.py", line 159, in process
File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/utils.py", line 152, in _wrapped
return func(*args, params=params, headers=headers, **kwargs)
File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/__init__.py", line 1617, in search
body=body,
File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 390, in perform_request
raise e
File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 365, in perform_request
timeout=timeout,
File "/usr/local/lib/python3.7/site-packages/elasticsearch/connection/http_urllib3.py", line 258, in perform_request
raise ConnectionError("N/A", str(e), e)
elasticsearch.exceptions.ConnectionError: ConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fe5d04e5690>: Failed …Run Code Online (Sandbox Code Playgroud) python elasticsearch google-cloud-platform google-cloud-dataflow apache-beam
GCP 管道服务之间有什么区别:Cloud Dataflow 和 Cloud Data fusion...您何时选择哪一个?
我使用数据融合中的 Basic 进行了 10 个实例的高级定价。Dataflow 中有 10 个实例集群 (n1-standard-8)。
Datafusion 的定价是其两倍多。
相互之间有什么优点和缺点