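I am trying to run a job on Google Dataflow using the following flow:

Essentially, it takes a single data source, filters it by certain values in the dictionaries, and creates a separate output for each filter condition.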
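As a rough illustration of the intended flow, here is a minimal plain-Python sketch (it does not use Beam). The sample lines, the key names `id` and `kind`, and the helper `split_by_value` are all hypothetical, and it assumes that a dictionary "contains" a value when the value appears among the dictionary's values.

```python
import json

# Hypothetical sample input: newline-delimited JSON, one dictionary per line.
lines = [
    '{"id": 1, "kind": "a"}',
    '{"id": 2, "kind": "b"}',
    '{"id": 3, "kind": "a"}',
    '{"id": 1, "kind": "c"}',
]

# List of values to filter by (same as in the question).
x_list = [1, 2, 3]


def split_by_value(raw_lines, values):
    """Parse each line once, then build one filtered output per value.

    Assumption: a record matches a value when the value appears
    among the record's dictionary values.
    """
    records = [json.loads(line) for line in raw_lines]
    return {v: [r for r in records if v in r.values()] for v in values}


outputs = split_by_value(lines, x_list)

# Each key of `outputs` stands in for one separate output file.
for value, matched in outputs.items():
    print(value, matched)
```

Each entry of `outputs` corresponds to one of the separate outputs described above; in a real pipeline each would be written to its own sink.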
I wrote the following code:
# List of values to filter by
x_list = [1, 2, 3]
with beam.Pipeline(options=PipelineOptions().from_dictionary(pipeline_params)) as p:
    # Read in newline JSON data - each line is a dictionary
    log_data = (
        p
        | "Create " + input_file >> beam.io.textio.ReadFromText(input_file)
        | "Load " + input_file >> beam.FlatMap(lambda x: json.loads(x))
    )
    # For each value in x_list, filter log_data for dictionaries containing the value & write out to separate file
    for i in x_list:
        # Return …

I have a dataset with multiple columns, one of which contains list entries:
DT = data.table(
x = c(1:5),
y = seq(2, 10, 2),
z = list(list("a","b","a"), list("a","c"), list("b","c"), list("a","b","c"), list("b","c","b"))
)
Essentially, I am trying to unlist a, b, and c from column z and aggregate the data by the x and y values.
Desired output:
    z x sum(y)
 1: a 1      4
 2: b 1      2
 3: a 2      4
 4: c 2      4
 5: b 3      6
 6: c 3      6
 7: a 4      8
 8: b 4      8
 9: c 4      8
10: b 5     20
11: c 5     10
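To make the desired aggregation explicit, here is a minimal plain-Python sketch of the same logic. It is not a data.table solution; the variable names `totals` and `rows` are introduced only for illustration. Each list in z is flattened, and y is added once per occurrence of a label within its (label, x) group, which is why b at x = 5 sums to 20.

```python
from collections import defaultdict

# Same data as the data.table in the question.
x = [1, 2, 3, 4, 5]
y = [2, 4, 6, 8, 10]
z = [
    ["a", "b", "a"],
    ["a", "c"],
    ["b", "c"],
    ["a", "b", "c"],
    ["b", "c", "b"],
]

# Accumulate y for every (label, x) pair; a label repeated within
# one row contributes y once per occurrence.
totals = defaultdict(int)
for xi, yi, labels in zip(x, y, z):
    for label in labels:
        totals[(label, xi)] += yi

# Order by x, then by label, to match the desired output.
rows = sorted(
    ((label, xi, total) for (label, xi), total in totals.items()),
    key=lambda row: (row[1], row[0]),
)

for label, xi, total in rows:
    print(label, xi, total)
```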
My current approach is fairly roundabout; with z, I …