我怀疑
airflow run dag_id task_id execution_date
会运行所有上游任务,但事实并非如此.当它看到并非所有依赖任务都运行时,它将失败.如何运行特定任务及其所有依赖项?我猜这是不可能的,因为气流设计的决定,但有没有办法解决这个问题?
-- 编辑 我注意到我输入的时间不是我想要的。我将下午 12 点之后的时间转换为 24 小时制。不过,unutbu 的回答应该还是很清楚的。
-- 第二次编辑。我改变了数据来做一个更好的例子。
下面是按日期索引的时间序列。我想从 start_datetime 开始进行聚合,并根据下面的 timedelta(9.5 小时 = 34200 秒)继续聚合。
def main():
# start_datetime = datetime.datetime(2013, 1, 1, 8)
# end_datetime = datetime.datetime(2013, 1, 1, 5, 30)
s = pd.Series(
np.arange(2, 10),
pd.to_datetime([
'20130101 7:34:04', '20130101 8:34:08', '20130101 10:34:08',
'20130101 12:34:15', '20130101 13:34:28', '20130101 12:34:54',
'20130101 14:34:55', '20130101 17:29:12']))
print(s)
bar_size = datetime.timedelta(seconds=60*60*9.5)
time_group = pd.Grouper(
freq=pd.Timedelta(bar_size), closed='left', label='right')
foobar = s.groupby(time_group).agg(np.sum)
print(foobar)
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
运行上面的代码将输出以下内容:
2013-01-01 09:30:00 5 …Run Code Online (Sandbox Code Playgroud) asyncio模块的描述是:
该模块提供了使用协同程序编写单线程并发代码,通过套接字和其他资源多路复用I/O访问,运行网络客户端和服务器以及其他相关原语的基础结构.
我一直在阅读有关新的和非凡的asyncio python模块/包/无论如何.我知道有python GIL,因此asyncio非常适合GIL,因为(目的是)你在一个线程上用事件循环来管理事物.什么是并发?好吧,似乎I/O似乎是并发的.我相信I/O操作由操作系统处理.那么在asyncio的内部,它是否编写了一个利用操作系统给出的函数的并发C扩展?