我按照PEP420隐式命名空间打包方法创建了多个包。其中两个分发包是dende-github-api和dende-gitlab-api。每个发行版都包含一个模块,我希望可以在dende.api命名空间下访问该模块。可以在这里找到一个最小的工作示例: https ://github.com/dende/example-monorepositry
这是包含两个发行版的存储库的文件夹结构:
\nexample-monorepository\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 dende-github-api\n\xe2\x94\x82 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 dende\n\xe2\x94\x82 \xe2\x94\x82 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 api\n\xe2\x94\x82 \xe2\x94\x82 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 github.py\n\xe2\x94\x82 \xe2\x94\x82 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x82 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 setup.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 dende-gitlab-api\n\xe2\x94\x82 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 dende\n\xe2\x94\x82 \xe2\x94\x82 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 api\n\xe2\x94\x82 \xe2\x94\x82 \xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 gitlab.py\n\xe2\x94\x82 \xe2\x94\x82 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x82 \xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 setup.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 requirements.dev.txt\n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 requirements.txt\nRun Code Online (Sandbox Code Playgroud)\n\看起来setup.py像这样(仅显示 dende-github-api):
from setuptools import setup\n\nsetup(\n name=\'dende-github-api\',\n packages=[\'dende.api\'],\n install_requires=[\n \'PyGithub\'\n ]\n)\nRun Code Online (Sandbox Code Playgroud)\n我可以安装这两个发行版并很好地使用它们:
\n$ pip install ./dende-github-api\n$ pip install ./dende-gitlab-api\n$ python -m dende.api.github\nHi from dende-github-api from /home/c/git/example-monorepositry/venv/lib/python3.8/site-packages/dende/api/github.py\n$ python -m dende.api.gitlab\nHi from …Run Code Online (Sandbox Code Playgroud) 我在气流中编写了以下代码,在dag本地测试了脚本,它像梦一样工作。现在,我试图使它在气流中运转,dag但没有任何运气,我尝试了多种运算,但无济于事
def fill_nulls (ds,file_in):
csv_file = glob.glob(os.path.join(r'/tmp/', file_in))
df = pd.read_csv(csv_file, sep='\t',header=None,error_bad_lines=False, index_col=False, dtype='unicode')
df = df.fillna(r'\N')
df.loc[:,df.dtypes==object].apply(lambda s:s.str.replace(" ", r'\N'))
df.to_csv(csv_file,sep='\t',header=None,index=False, quoting=csv.QUOTE_NONE)
fill_nulls = PythonOperator(
task_id='fill_nulls',
python_callable=fill_nulls,
provide_context=True,
templates_dict = {'file_in':'apollo_export_{{macros.ds_format(macros.ds_add( ds, -2),\'%Y-%m-%d\',\'%Y%m%d\')}}.csv'},
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
: Traceback (most recent call last):
[2018-01-25 10:11:50,016] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/bin/airflow", line 27, in <module>
[2018-01-25 10:11:50,017] {base_task_runner.py:98} INFO - Subtask: args.func(args)
[2018-01-25 10:11:50,017] {base_task_runner.py:98} INFO - Subtask: File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-01-25 10:11:50,017] …Run Code Online (Sandbox Code Playgroud) 我已经建立了一个气流工作流程,该流程将一些文件从s3提取到Google Cloud存储,然后运行sql查询的工作流程以在Big Query上创建新表。在工作流结束时,我需要将一个最终的Big Query表的输出推送到Google Cloud Storage,再从那里推送到S3。
我已经使用BigQueryToCloudStorageOperatorpython运算符成功地将Big Query表传输到Google Cloud Storage 。但是,从Google Cloud Storage到S3的迁移似乎并不那么困难,我无法找到可以在我的Airflow工作流程中实现自动化的解决方案。
我知道rsync哪个是其中的一部分,gsutil并且已经完成了这项工作(请参阅将数据从Google Cloud Storage导出到Amazon S3),但是我无法将其添加到我的工作流程中。
我有一个在计算引擎实例上运行的泊坞窗式气流容器。
非常感谢您帮助解决此问题。
非常感谢!
python amazon-s3 google-cloud-storage google-cloud-platform airflow