当我研究流程调度时,这个问题浮现在我脑海中.
操作系统如何执行和控制二进制文件和已编译文件的执行?我想OS可能会将二进制文件的一部分复制到某个内存位置,跳转到那里,在执行该块后返回并执行下一个块.但是它对它没有任何控制(例如程序可以在任何地方跳转而不会回来).
在JVM的情况下,它很有意义,VM正在解释每条指令.但在二进制文件的情况下,指令是真正的CPU可执行指令,所以我不认为操作系统像VM一样.
nginx代理有一个指令,proxy_cache_min_uses但是我找不到使用的时间窗口或设置时间的方法。因为如果它不使用任何时间窗口,而是仅等待请求到达某个计数器,那么如果您将nginx保持足够长的运行时间,最终所有请求都将执行。
还是由于最近使用最少的策略而很快从缓存中撤出了一个相对罕见的请求,我对此不必太担心吗?
谢谢
我的想法是有一个foo生成输入列表(用户、报告、日志文件等)的任务,并为输入列表中的每个元素启动一个任务。目标是利用 Airflow 的重试和其他逻辑,而不是重新实现它。
这里唯一的变量是生成的任务数量。在完成所有这些任务后,我想再做一些任务,因此为每个任务启动一个新的 DAG 似乎并不合适。
这是我的代码:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1)
}
dag = DAG('dynamic_dag_generator', schedule_interval=None, default_args=default_args)
foo_operator = BashOperator(
task_id='foo',
bash_command="echo '%s'" % json.dumps(range(0, random.randint(40,60))),
xcom_push=True,
dag=dag)
def gen_nodes(**kwargs):
ti = kwargs['ti']
workers = json.loads(ti.xcom_pull(task_ids='foo'))
for wid in workers:
print("Iterating worker %s" % wid)
op = PythonOperator(
task_id='test_op_%s' % wid,
python_callable=lambda: print("Dynamic task!"),
dag=dag
)
op.set_downstream(bar_operator)
op.set_upstream(dummy_op)
gen_subdag_node_op = PythonOperator(
task_id='gen_subdag_nodes',
python_callable=gen_nodes,
provide_context=True,
dag=dag
)
gen_subdag_node_op.set_upstream(foo_operator)
dummy_op …Run Code Online (Sandbox Code Playgroud)