我创建了一个非常简单的 DAG 来使用 PythonOperator 执行 Python 文件。我正在使用 docker 映像来运行 Airflow,但它无法识别我拥有 .py 文件的模块
结构是这样的:
main_dag.py
plugins/__init__.py
plugins/njtransit_scrapper.py
plugins/sql_queries.py
plugins/config/config.cfg
Run Code Online (Sandbox Code Playgroud)
cmd 运行 docker Airflow 镜像:
docker run -p 8080:8080 -v /My/Path/To/Dags:/usr/local/airflow/dags puckel/docker-airflow webserver
Run Code Online (Sandbox Code Playgroud)
我已经尝试过airflow initdb
重新启动网络服务器,但它一直显示错误ModuleNotFoundError: No module named 'plugins'
对于我正在使用的导入语句:
from plugins import njtransit_scrapper
Run Code Online (Sandbox Code Playgroud)
这是我的 PythonOperator:
tweets_load = PythonOperator(
task_id='Tweets_load',
python_callable=njtransit_scrapper.main,
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
我的 njtransit_scrapper.py 文件只是一个收集推特帐户的所有推文并将结果保存在 Postgres 数据库中的文件。
如果我删除 PythonOperator 代码并导入代码,则代码可以正常工作。我已经测试了几乎所有内容,但我不太确定这是一个错误还是其他什么。
有可能当我为 docker 映像创建一个卷时,它只是导入主 dag 并停在那里,导致不导入整个包?
我正在尝试在 DataFrame 中实现自动增量列。我已经找到了解决方案,但我想知道是否有更好的方法来做到这一点。
我正在使用monotonically_increasing_id()
来自 的函数pyspark.sql.functions
。问题是从 0 开始,而我希望它从 1 开始。
所以,我做了以下工作并且工作正常:
(F.monotonically_increasing_id()+1).alias("songplay_id")
dfLog.join(dfSong, (dfSong.artist_name == dfLog.artist) & (dfSong.title == dfLog.song))\
.select((F.monotonically_increasing_id()+1).alias("songplay_id"), \
dfLog.ts.alias("start_time"), dfLog.userId.alias("user_id"), \
dfLog.level, \
dfSong.song_id, \
dfSong.artist_id, \
dfLog.sessionId.alias("session_id"), \
dfLog.location, \
dfLog.userAgent.alias("user_agent"))
Run Code Online (Sandbox Code Playgroud)
有没有更好的方法来实现我想做的事情?我认为,为此实现 udf 函数的工作量太大,还是只有我一个人?
谢谢。-