我正在尝试通过XCom将字符串列表从一个任务传递到另一个任务,但我似乎无法将推送的列表解释为列表。
例如,当我在某个blah
在 a 中运行的函数中执行此操作时ShortCircuitOperator
:
paths = ['gs://{}/{}'.format(bucket, obj) for obj in my_list]
kwargs['ti'].xcom_push(key='return_value', value=full_paths)
Run Code Online (Sandbox Code Playgroud)
然后我想使用这样的列表作为运算符的参数。例如,
run_task_after_blah = AfterBlahOperator(
task_id='run-task-after-blah',
...,
input_paths="{{ ti.xcom_pull(task_ids='find-paths') }}",
...,
)
Run Code Online (Sandbox Code Playgroud)
我希望input_paths
等于paths
但不是因为渲染首先发生然后分配,并且模板渲染在某种程度上将xcom_pull
返回转换为字符串化列表(此后我的AfterBlahOperator
插入将其分配为 JSON 中元素的值。
我尝试将其连接paths
成一个由某个分隔符分隔的字符串,然后将其推送到 XCom,然后在从 XCom 中拉出时将其拆分回来,但是由于 XCom 首先呈现,我得到了在模板中调用函数时的字符串化列表split
或者paths
如果split
函数应用于参数的原始连接字符串(如"{{ ti.xcom_pull(task_ids='find-paths') }}".split(';')
.
当提取的值可以进一步处理时,XCom 似乎非常适合作为任务参数的单个值或多个值,但不能将 multiple_values 转换为“一个”作为任务的参数。
有没有办法做到这一点而不必编写一个额外的函数来精确返回这样的字符串列表?或者也许我滥用 XCom 太多了,但是 Airflow 中有许多操作符将元素列表作为参数(例如,通常是多个文件的完整路径,这些文件是先前任务的结果,因此事先不知道)。
我们一直在努力在(标准 sql)BigQuery 中循环数据,但没有成功。
我不确定它是否是 sql 支持的功能,我们对问题的理解,或者我们想要在 BigQuery 中执行此操作的方式。
无论如何,假设我们有一个事件表,其中每个事件都由用户 ID 和日期描述(同一用户 ID 在同一日期可能有许多事件)
id STRING
dt DATE
Run Code Online (Sandbox Code Playgroud)
我们想知道的一件事是在给定的时间段内有多少不同的用户生成了事件。这是相当微不足道的,只是表上的一个 COUNT,以句点作为 WHERE 子句中的约束。例如,如果我们有四个月的时间段:
SELECT
COUNT(DISTINCT id) AS total
FROM
`events`
WHERE
dt BETWEEN DATE_ADD(CURRENT_DATE(), INTERVAL -4 MONTH)
AND CURRENT_DATE()
Run Code Online (Sandbox Code Playgroud)
但是,如果我们希望在相同的给定时间段内递归地获取其他天(或周)的历史记录,就会出现问题。例如,昨天,前天,等等......直到......例如,3个月前。所以这里的变量将是 CURRENT_DATE() ,它可以回溯一天或任何一个因素,但间隔保持不变(在我们的例子中是 4 个月)。我们期待这样的事情(一天的因素):
2017-07-14 2017-03-14 1760333
2017-07-13 2017-03-13 1856333
2017-07-12 2017-03-12 2031993
...
2017-04-14 2017-01-14 1999352
Run Code Online (Sandbox Code Playgroud)
这只是对同一张桌子上的每一天、每周等进行循环,然后对这段时间内发生的不同事件进行计数。但是我们不能在 BigQuery 中进行“循环”。
One way we thought was a JOIN, and then a COUNT on the GROUP BY intervals (taking advantage of the HAVING clause …
我想知道是否有可能在Google Dataflow中运行自定义的Apache Beam Python版本。公共存储库中不可用的版本(撰写本文时:0.6.0和2.0.0)。例如,来自Apache Beam官方存储库的HEAD版本,或与此相关的特定标签。
我知道有可能按照官方文档中所述打包自定义软件包(例如,本地私有软件包)。这里回答了有关如何对其他一些脚本执行此操作的问题。甚至还有GIST 对此进行指导。
但是我没有设法获得当前的Apache Beam开发版本(或带标签的版本),该版本可在其官方存储库的master分支中获得,并打包并通过脚本发送到Google Dataflow。例如,对于最新的可用标签,PiP要处理的链接为:git+https://github.com/apache/beam.git@v2.1.0-RC2#egg=apache_beam[gcp]&subdirectory=sdks/python
我得到如下信息:
INFO:root:Executing command: ['.../bin/python', '-m', 'pip', 'install', '--download', '/var/folders/nw/m_035l9d7f1dvdbd7rr271tcqkj80c/T/tmpJhCkp8', 'apache-beam==2.1.0', '--no-binary', ':all:', '--no-deps']
DEPRECATION: pip install --download has been deprecated and will be removed in the future. Pip now has a download command that should be used instead.
Collecting apache-beam==2.1.0
Could not find a version that satisfies the requirement apache-beam==2.1.0 (from versions: 0.6.0, 2.0.0)
No matching distribution found for apache-beam==2.1.0
Run Code Online (Sandbox Code Playgroud)
有任何想法吗?(我想知道是否有可能,因为Google Dataflow可能已经修复了可以运行到官方发布版本的Apache Beam版本)。