小编Pra*_*ran的帖子

基于Webrequest在Airflow上运行作业

我想知道在通过HTTP获取请求时是否可以执行气流任务.我对Airflow的调度部分不感兴趣.我只想用它作为芹菜的替代品.

所以一个示例操作就是这样的.

  1. 用户提交请求报告的表单.
  2. 后端接收请求并向用户发送已收到请求的通知.
  3. 然后,后端使用Airflow安排作业立即运行.
  4. 然后,Airflow执行与DAG相关的一系列任务.例如,首先从redshift中提取数据,从MySQL提取数据,在两个结果集上进行一些操作,将它们组合,然后将结果上传到Amazon S3,发送电子邮件.

从我在线阅读的内容中,您可以通过airflow ...在命令行上执行来运行气流作业.我想知道是否有一个python api可以执行相同的事情.

谢谢.

python airflow

20
推荐指数
3
解决办法
1万
查看次数

初始化不同价值的芹菜工人

我正在使用芹菜在Hadoop上运行长时间运行的任务.每个任务在Hadoop上执行一个Pig脚本,运行大约30分钟 - 2小时.

我当前的Hadoop设置有4个队列a,b,c和默认值.所有任务当前都由单个工作程序执行,该工作程序将作业提交到单个队列.

我想再添加3个将作业提交到其他队列的工作者,每个队列一个工作者.

问题是队列目前是硬编码的,我希望每个工人都有这个变量.

我搜索了很多,但我无法找到一种方法来传递每个芹菜工作者不同的队列值并在我的任务中访问它.

我这样开始我的芹菜工人.

celery -A app.celery worker
Run Code Online (Sandbox Code Playgroud)

我希望在命令行本身传递一些额外的参数并在我的任务中访问它,但芹菜抱怨它不理解我的自定义参数.

我计划通过设置--concurrency=3参数来运行同一主机上的所有worker .有没有解决这个问题的方法?

谢谢!

编辑

目前的情况是这样的.我每次尝试执行任务print_something都说tasks.print_something.delay()它只打印队列C.

@celery.task()
def print_something():
    print "C"
Run Code Online (Sandbox Code Playgroud)

我需要让工作人员根据我在启动时传递给他们的值来打印一个可变字母.

@celery.task()
def print_something():
    print "<Variable Value Per Worker Here>"
Run Code Online (Sandbox Code Playgroud)

celery flask python-2.7

8
推荐指数
1
解决办法
640
查看次数

标签 统计

airflow ×1

celery ×1

flask ×1

python ×1

python-2.7 ×1