我有一个存储为模板的管道。我正在使用 node.js 客户端从云函数运行此管道。一切正常,但是当我需要从不同区域运行此模板时,我会出错。
根据文档,我可以通过payload中的location参数进行设置
{
projectId: 123,
resource: {
location: "europe-west1",
jobName: `xxx`,
gcsPath: 'gs://xxx'
}
}
Run Code Online (Sandbox Code Playgroud)
这给了我以下错误:
The workflow could not be created, since it was sent to an invalid regional endpoint (europe-west1).
Please resubmit to a valid Cloud Dataflow regional endpoint.
Run Code Online (Sandbox Code Playgroud)
如果我将位置参数移出资源节点,我会得到同样的错误,例如:
{
projectId: 123,
location: "europe-west1",
resource: {
jobName: `xxx`,
gcsPath: 'gs://xxx'
}
}
Run Code Online (Sandbox Code Playgroud)
如果我在环境中设置区域并删除位置,例如:
{
projectId: 123,
resource: {
jobName: `xxx`,
gcsPath: 'gs://xxx',
environment: {
zone: "europe-west1-b"
}
}
}
Run Code Online (Sandbox Code Playgroud)
我不再收到任何错误,但数据流 UI 告诉我作业正在运行 us-east1
如何运行此模板并提供区域/区域 I
我想使用 PYTHON 执行 Google 数据流模板。实际上,我一直在使用Dataflow REST API或Cloud Functions集成执行数据流模板。这是我在 Postman 中执行的 Dataflow 模板:
网址: https://dataflow.googleapis.com/v1b3/projects/{{my-project-id}}/templates:launch?gcsPath=gs://{{my-cloud-storage-bucket}}/temp/cloud-dataprep-template
{
"jobName": "test-datfalow-job",
"parameters": {
"inputLocations" : "{\"location1\":\"gs://{{my-cloud-storage-bucket}}/my-folder/**/*\"}",
"outputLocations": "{\"location1\":\"gs://{{my-cloud-storage-bucket}}/my-output/output.csv\"}"
},
"environment": {
"tempLocation": "gs://{{my-cloud-storage-bucket}}/tmp",
"zone": "us-central1-f"
}
}
Run Code Online (Sandbox Code Playgroud)
我不知道是否有机会使用 google-api-python-client 或者我必须使用 python 的 requests.post 和 Google Cloud Authentication 执行此 HTTP POST
我正在通过ReadFromPubSubwith阅读消息timestamp_attribute=None,它应该将时间戳设置为发布时间。
这样,我最终得到 a PCollectionofPubsubMessage元素。
如何按顺序访问这些元素的时间戳,例如将它们保存到数据库?我能看到的唯一属性是dataand attributes,并且attributes只有来自 Pub/Sub 的键。
编辑:示例代码
with beam.Pipeline(options=pipeline_options) as p:
items = (p
| ReadFromPubSub(topic=args.read_topic, with_attributes=True)
| beam.WindowInto(beam.window.FixedWindows(args.time_window))
| 'FormatMessage' >> beam.Map(format_message)
| 'WriteRaw' >> WriteToBigQuery(args.raw_table, args.dataset,
args.project, write_disposition='WRITE_APPEND')
)
Run Code Online (Sandbox Code Playgroud)
whereformat_message将使用 aPubsubMessage并返回一个字典,表示要附加到表中的行:
def format_message(message):
formatted_message = {
'data': base64.b64encode(message.data),
'attributes': str(message.attributes)
}
return formatted_message
Run Code Online (Sandbox Code Playgroud) 我正在实施 Pub/Sub 到 BigQuery 管道。它看起来类似于How to create read transform using ParDo and DoFn in Apache Beam,但在这里,我已经创建了一个 PCollection。
我正在遵循Apache Beam 文档中描述的内容来实现 ParDo 操作以使用以下管道准备表行:
static class convertToTableRowFn extends DoFn<PubsubMessage, TableRow> {
@ProcessElement
public void processElement(ProcessContext c) {
PubsubMessage message = c.element();
// Retrieve data from message
String rawData = message.getData();
Instant timestamp = new Instant(new Date());
// Prepare TableRow
TableRow row = new TableRow().set("message", rawData).set("ts_reception", timestamp);
c.output(row);
}
}
// Read input from Pub/Sub
pipeline.apply("Read from …Run Code Online (Sandbox Code Playgroud) python -m main \ --setup_file setup.py \ --runner DataflowRunner \ --project my-test \ --staging_location gs://my-test/staging \ --temp_location gs://my-test/temp \ --template_location gs://my-test/templates/test --output gs://my-test/output
Run Code Online (Sandbox Code Playgroud)
上面的命令只在本地运行(本地安装的需要依赖项)并且不创建模板。这是 main.py 中的管道选项:
pipeline_options = {
'project': 'my-test',
'staging_location': 'gs://my-test/staging',
'runner': 'DataflowRunner',
'job_name': 'test',
'temp_location': 'gs://my-test/temp',
'save_main_session': True,
'setup_file':'setup.py',
'output': 'gs://my-test/output',
'template_location': 'gs://my-test/templates/test'
}
options = PipelineOptions.from_dictionary(pipeline_options)
with beam.Pipeline(options=options) as p:
Run Code Online (Sandbox Code Playgroud)
这是 setup.py:
import subprocess
import setuptools
from setuptools.command.bdist_egg import bdist_egg as _bdist_egg
class bdist_egg(_bdist_egg):
def run(self):
self.run_command('CustomCommands')
_bdist_egg.run(self)
CUSTOM_COMMANDS = [
['apt-get', 'update'],
['apt-get', '--assume-yes', 'install', …Run Code Online (Sandbox Code Playgroud) TypeError: 'PCollection' object does not support indexing
Run Code Online (Sandbox Code Playgroud)
上述错误是由于尝试将 Pcollection 转换为列表而导致的:
filesList = (files | beam.combiners.ToList())
lines = (p | 'read' >> beam.Create(ReadSHP().ReadSHP(filesList))
| 'map' >> beam.Map(_to_dictionary))
Run Code Online (Sandbox Code Playgroud)
和:
def ReadSHP(self, filesList):
"""
"""
sf = shp.Reader(shp=filesList[1], dbf=filesList[2])
Run Code Online (Sandbox Code Playgroud)
如何解决这个问题?任何帮助表示赞赏。
我想在带有 python 的数据流模板中使用 FireStore。
我做了这样的事情:
with beam.Pipeline(options=options) as p:
(p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(sub).with_output_types(bytes)
| 'String to dictionary' >> beam.Map(firestore_update_multiple)
)
Run Code Online (Sandbox Code Playgroud)
这是使用它的适当方式吗?
额外的信息
def firestore_update_multiple(row):
from google.cloud import firestore
db = firestore.Client()
doc_ref = db.collection(u'data').document(u'one')
doc_ref.update({
u'arrayExample': u'DataflowRunner',
u'booleanExample': True
})
Run Code Online (Sandbox Code Playgroud) python google-cloud-platform google-cloud-dataflow google-cloud-firestore
根据以下文档,如果您没有明确指定触发器,您将获得如下描述的行为:
如果未指定,默认行为是在水印通过窗口末尾时首先触发,然后在每次有迟到数据时再次触发。
这种行为对于 FixedWindow 也是如此吗?例如,您会假设固定窗口应该具有在水印通过窗口结束后重复触发的默认触发器,并丢弃所有延迟数据,除非明确处理延迟数据。另外,在源代码中的何处可以看到触发器的定义,例如 FixedWindow 对象?
从像 pub/sub 这样的无限源读取数据后,我正在应用窗口化。我需要将属于一个窗口的所有记录写入一个单独的文件。我在 Java 中找到了这个,但在 python 中找不到任何东西。
我有一个 .txt 文件,它具有 JSON 格式。我想读取、操作和重组文件(更改字段名称...) 如何使用 Apache Beam 在 Python 中执行此操作?
python google-cloud-platform google-cloud-dataflow apache-beam