小编str*_*der的帖子

ModuleNotFoundError - 导入 Python 文件时出现气流错误

我创建了一个非常简单的 DAG 来使用 PythonOperator 执行 Python 文件。我正在使用 docker 映像来运行 Airflow,但它无法识别我拥有 .py 文件的模块

结构是这样的:

main_dag.py
plugins/__init__.py
plugins/njtransit_scrapper.py
plugins/sql_queries.py
plugins/config/config.cfg
Run Code Online (Sandbox Code Playgroud)

cmd 运行 docker Airflow 镜像:

docker run -p 8080:8080 -v /My/Path/To/Dags:/usr/local/airflow/dags  puckel/docker-airflow webserver
Run Code Online (Sandbox Code Playgroud)

我已经尝试过airflow initdb重新启动网络服务器,但它一直显示错误ModuleNotFoundError: No module named 'plugins'

对于我正在使用的导入语句:

from plugins import njtransit_scrapper
Run Code Online (Sandbox Code Playgroud)

这是我的 PythonOperator:

tweets_load = PythonOperator(
    task_id='Tweets_load',
    python_callable=njtransit_scrapper.main,
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

我的 njtransit_scrapper.py 文件只是一个收集推特帐户的所有推文并将结果保存在 Postgres 数据库中的文件。

如果我删除 PythonOperator 代码并导入代码,则代码可以正常工作。我已经测试了几乎所有内容,但我不太确定这是一个错误还是其他什么。

有可能当我为 docker 映像创建一个卷时,它只是导入主 dag 并停在那里,导致不导入整个包?

python airflow

7
推荐指数
1
解决办法
6738
查看次数

在 DataFrame 中实现自动增量列

我正在尝试在 DataFrame 中实现自动增量列。我已经找到了解决方案,但我想知道是否有更好的方法来做到这一点。

我正在使用monotonically_increasing_id()来自 的函数pyspark.sql.functions。问题是从 0 开始,而我希望它从 1 开始。

所以,我做了以下工作并且工作正常:

(F.monotonically_increasing_id()+1).alias("songplay_id")

dfLog.join(dfSong, (dfSong.artist_name == dfLog.artist) & (dfSong.title == dfLog.song))\
                    .select((F.monotonically_increasing_id()+1).alias("songplay_id"), \
                               dfLog.ts.alias("start_time"), dfLog.userId.alias("user_id"), \
                               dfLog.level, \
                               dfSong.song_id, \
                               dfSong.artist_id, \
                               dfLog.sessionId.alias("session_id"), \
                               dfLog.location, \
                               dfLog.userAgent.alias("user_agent"))
Run Code Online (Sandbox Code Playgroud)

有没有更好的方法来实现我想做的事情?我认为,为此实现 udf 函数的工作量太大,还是只有我一个人?

谢谢。-

python apache-spark pyspark

2
推荐指数
1
解决办法
5537
查看次数

标签 统计

python ×2

airflow ×1

apache-spark ×1

pyspark ×1