标签: google-cloud-dataflow

谷歌数据流 - 无效的区域端点 - 无法在来自 nodejs 客户端的模板上设置区域

我有一个存储为模板的管道。我正在使用 node.js 客户端从云函数运行此管道。一切正常,但是当我需要从不同区域运行此模板时,我会出错。

根据文档,我可以通过payload中的location参数进行设置

{
  projectId: 123,
  resource: {
    location: "europe-west1",
    jobName: `xxx`,
    gcsPath: 'gs://xxx'
  }
}
Run Code Online (Sandbox Code Playgroud)

这给了我以下错误:

The workflow could not be created, since it was sent to an invalid regional endpoint (europe-west1). 
Please resubmit to a valid Cloud Dataflow regional endpoint.
Run Code Online (Sandbox Code Playgroud)

如果我将位置参数移出资源节点,我会得到同样的错误,例如:

{
  projectId: 123,
  location: "europe-west1",
  resource: {
    jobName: `xxx`,
    gcsPath: 'gs://xxx'
  }
}
Run Code Online (Sandbox Code Playgroud)

如果我在环境中设置区域并删除位置,例如:

{
  projectId: 123,
  resource: {
    jobName: `xxx`,
    gcsPath: 'gs://xxx',
    environment: {
        zone: "europe-west1-b"
    }
   }
}
Run Code Online (Sandbox Code Playgroud)

我不再收到任何错误,但数据流 UI 告诉我作业正在运行 us-east1

如何运行此模板并提供区域/区域 I

google-cloud-platform google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
1583
查看次数

使用 PYTHON 运行 Google 数据流模板

我想使用 PYTHON 执行 Google 数据流模板。实际上,我一直在使用Dataflow REST APICloud Functions集成执行数据流模板。这是我在 Postman 中执行的 Dataflow 模板:

网址: https://dataflow.googleapis.com/v1b3/projects/{{my-project-id}}/templates:launch?gcsPath=gs://{{my-cloud-storage-bucket}}/temp/cloud-dataprep-template

    {
    "jobName": "test-datfalow-job",
    "parameters": {
        "inputLocations" : "{\"location1\":\"gs://{{my-cloud-storage-bucket}}/my-folder/**/*\"}",
        "outputLocations": "{\"location1\":\"gs://{{my-cloud-storage-bucket}}/my-output/output.csv\"}"
    },
    "environment": {
        "tempLocation": "gs://{{my-cloud-storage-bucket}}/tmp",
        "zone": "us-central1-f"
    }
}
Run Code Online (Sandbox Code Playgroud)

我不知道是否有机会使用 google-api-python-client 或者我必须使用 python 的 requests.post 和 Google Cloud Authentication 执行此 HTTP POST

python post google-cloud-dataflow

1
推荐指数
1
解决办法
2287
查看次数

Apache Beam Python SDK:如何访问元素的时间戳?

我正在通过ReadFromPubSubwith阅读消息timestamp_attribute=None,它应该将时间戳设置为发布时间。

这样,我最终得到 a PCollectionofPubsubMessage元素。

如何按顺序访问这些元素的时间戳,例如将它们保存到数据库?我能看到的唯一属性是dataand attributes,并且attributes只有来自 Pub/Sub 的键。

编辑:示例代码

with beam.Pipeline(options=pipeline_options) as p:
    items = (p
        | ReadFromPubSub(topic=args.read_topic, with_attributes=True)
        | beam.WindowInto(beam.window.FixedWindows(args.time_window))
        | 'FormatMessage' >> beam.Map(format_message)
        | 'WriteRaw' >> WriteToBigQuery(args.raw_table, args.dataset,
            args.project, write_disposition='WRITE_APPEND')
    )
Run Code Online (Sandbox Code Playgroud)

whereformat_message将使用 aPubsubMessage并返回一个字典,表示要附加到表中的行:

def format_message(message):
    formatted_message = {
        'data': base64.b64encode(message.data),
        'attributes': str(message.attributes)
    }
    return formatted_message
Run Code Online (Sandbox Code Playgroud)

python google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
1261
查看次数

应用不适用于使用 Apache Beam 的 ParDo 和 DoFn

我正在实施 Pub/Sub 到 BigQuery 管道。它看起来类似于How to create read transform using ParDo and DoFn in Apache Beam,但在这里,我已经创建了一个 PCollection。

我正在遵循Apache Beam 文档中描述的内容来实现 ParDo 操作以使用以下管道准备表行:

static class convertToTableRowFn extends DoFn<PubsubMessage, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        PubsubMessage message = c.element();
        // Retrieve data from message
        String rawData = message.getData();
        Instant timestamp = new Instant(new Date());
        // Prepare TableRow
        TableRow row = new TableRow().set("message", rawData).set("ts_reception", timestamp);
        c.output(row);
    }
}

// Read input from Pub/Sub
pipeline.apply("Read from …
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
2255
查看次数

无法使用 Dataflow + Beam + Python 创建模板

python -m main \ --setup_file setup.py \ --runner DataflowRunner \ --project my-test \ --staging_location gs://my-test/staging \ --temp_location gs://my-test/temp \  --template_location gs://my-test/templates/test --output gs://my-test/output
Run Code Online (Sandbox Code Playgroud)

上面的命令只在本地运行(本地安装的需要依赖项)并且不创建模板。这是 main.py 中的管道选项:

pipeline_options = {
    'project': 'my-test',
    'staging_location': 'gs://my-test/staging',
    'runner': 'DataflowRunner',
    'job_name': 'test',
    'temp_location': 'gs://my-test/temp',
    'save_main_session': True,
    'setup_file':'setup.py',
    'output': 'gs://my-test/output',
    'template_location': 'gs://my-test/templates/test'
}
options = PipelineOptions.from_dictionary(pipeline_options)
with beam.Pipeline(options=options) as p:
Run Code Online (Sandbox Code Playgroud)

这是 setup.py:

import subprocess

import setuptools
from setuptools.command.bdist_egg import bdist_egg as _bdist_egg


class bdist_egg(_bdist_egg): 
   def run(self):
      self.run_command('CustomCommands')
      _bdist_egg.run(self)
CUSTOM_COMMANDS = [
   ['apt-get', 'update'],
   ['apt-get', '--assume-yes', 'install', …
Run Code Online (Sandbox Code Playgroud)

python templates google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
647
查看次数

无法将beam python pcollection转换为列表

TypeError: 'PCollection' object does not support indexing
Run Code Online (Sandbox Code Playgroud)

上述错误是由于尝试将 Pcollection 转换为列表而导致的:

filesList = (files | beam.combiners.ToList())

lines = (p | 'read' >> beam.Create(ReadSHP().ReadSHP(filesList))
            | 'map' >> beam.Map(_to_dictionary))
Run Code Online (Sandbox Code Playgroud)

和:

def ReadSHP(self, filesList):
    """
    """
    sf = shp.Reader(shp=filesList[1], dbf=filesList[2])  
Run Code Online (Sandbox Code Playgroud)

如何解决这个问题?任何帮助表示赞赏。

python google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
3456
查看次数

在 Google Dataflow 中使用 FireStore

我想在带有 python 的数据流模板中使用 FireStore。

我做了这样的事情:

with beam.Pipeline(options=options) as p:
(p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(sub).with_output_types(bytes)
| 'String to dictionary' >> beam.Map(firestore_update_multiple)
)
Run Code Online (Sandbox Code Playgroud)

这是使用它的适当方式吗?


额外的信息

def firestore_update_multiple(row):
    from google.cloud import firestore
    db = firestore.Client()
    doc_ref = db.collection(u'data').document(u'one')

    doc_ref.update({
        u'arrayExample': u'DataflowRunner',
        u'booleanExample': True
    })
Run Code Online (Sandbox Code Playgroud)

python google-cloud-platform google-cloud-dataflow google-cloud-firestore

1
推荐指数
1
解决办法
2141
查看次数

Apache Beam:触发固定窗口

根据以下文档,如果您没有明确指定触发器,您将获得如下描述的行为:

如果未指定,默认行为是在水印通过窗口末尾时首先触发,然后在每次有迟到数据时再次触发。

这种行为对于 FixedWindow 也是如此吗?例如,您会假设固定窗口应该具有在水印通过窗口结束后重复触发的默认触发器,并丢弃所有延迟数据,除非明确处理延迟数据。另外,在源代码中的何处可以看到触发器的定义,例如 FixedWindow 对象?

google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
2331
查看次数

使用python在数据流中为每个窗口写入一个文件

从像 pub/sub 这样的无限源读取数据后,我正在应用窗口化。我需要将属于一个窗口的所有记录写入一个单独的文件。我在 Java 中找到了这个,但在 python 中找不到任何东西。

google-cloud-platform google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
934
查看次数

如何在 Python 中使用 Apache Beam 读取和操作 Json 文件

我有一个 .txt 文件,它具有 JSON 格式。我想读取、操作和重组文件(更改字段名称...) 如何使用 Apache Beam 在 Python 中执行此操作?

python google-cloud-platform google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
2694
查看次数