为 WritetoFiles 设置文件名

Thi*_*ijs 2 python-3.x google-cloud-dataflow apache-beam

我的流程将文件存储在磁盘上,我需要设置文件名,以便我可以找回东西。

默认命名是窗口时间戳和计数器,这对我没有帮助。文档对我来说不够清楚。(https://beam.apache.org/releases/pydoc/2.16.0/apache_beam.io.fileio.html?highlight=default_file_naming

fileio.WriteToFiles(archive_storage, file_naming=beam.io.fileio.destination_prefix_naming())
Run Code Online (Sandbox Code Playgroud)

我想命名文件<HASH>.json中的 HASH 是文件中数据的文件。

Gui*_*ins 5

多亏了这个例子,我才能得到一个工作片段。在这种情况下,我们将destination根据每个记录的散列为每个记录指定一个不同的,因为我们要将每个元素写入不同的文件。此外,我们将传递名为 的自定义命名函数hash_naming

data = [{'id': 0, 'message': 'hello'},
        {'id': 1, 'message': 'world'}]

(p
  | 'Create Events' >> beam.Create(data) \
  | 'JSONify' >> beam.Map(json.dumps) \
  | 'Print Hashes' >> beam.ParDo(PrintHashFn()) \
  | 'Write Files' >> fileio.WriteToFiles(
      path='./output',
      destination=lambda record: hash(record),
      sink=lambda dest: JsonSink(),
      file_naming=hash_naming))
Run Code Online (Sandbox Code Playgroud)

PrintHashFn我们将记录每个哈希每个元素:

logging.info("Element: %s with hash %s", element, hash(element))
Run Code Online (Sandbox Code Playgroud)

这样,对于我们的数据,我们将得到:

INFO:root:Element: {"message": "hello", "id": 0} with hash -1885604661473532601
INFO:root:Element: {"message": "world", "id": 1} with hash 9144125507731048840
Run Code Online (Sandbox Code Playgroud)

可能有更好的方法,但我发现调用fileio.destination_prefix_naming()(*args)我们可以-1885604661473532601从默认命名方案 ( -1885604661473532601----00000-00001) 中检索目标( ):

def hash_naming(*args):
  file_name = fileio.destination_prefix_naming()(*args)  # -1885604661473532601----00000-00001
  destination = file_name.split('----')[0]  # -1885604661473532601
  return '{}.json'.format(destination)  # -1885604661473532601.json
Run Code Online (Sandbox Code Playgroud)

请注意,如果您在混合中添加窗口,则在您的情况下获取子字符串的拆分可能会有所不同。

使用 2.16.0 SDK 运行脚本,DirectRunner我得到以下输出:

$ ls output/
-1885604661473532601.json  9144125507731048840.json
$ cat output/-1885604661473532601.json 
"{\"message\": \"hello\", \"id\": 0}"
Run Code Online (Sandbox Code Playgroud)

在此处更新了完整代码。