mpt*_*ach 2 python docker docker-compose airflow
I am trying to run an Airflow DAG that queries the dag table in the Airflow Postgres database. Here is the DAG code:

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import PythonOperator
    from airflow.hooks.postgres_hook import PostgresHook
    from datetime import datetime

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(year=2019, month=10, day=1),
        'retries': 0
    }

    def get_dag_table():
        query = 'SELECT * FROM dag LIMIT 5;'
        hook = PostgresHook(postgres_conn_id='postgres_default',
                            host='localhost',
                            database='airflow',
                            user='airflow',
                            password='airflow',
                            port=5432)
        connection = hook.get_conn()
        # COMMENTED OUT FOR DEBUGGING
        # cursor = connection.cursor()
        # cursor.execute(query)
        # return cursor.fetchall()

    dag = DAG(
        "custom_postgres_tutorial",
        default_args=default_args,
        schedule_interval=None
    )

    start_task = DummyOperator(task_id='start_task', dag=dag)
    postgres_task = PythonOperator(task_id='query_dag_table',
                                   python_callable=get_dag_table,
                                   dag=dag)
    start_task >> postgres_task

These are the steps I followed:
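
1) I cloned the Puckel docker-airflow repository (https://github.com/puckel/docker-airflow).

2) I then ran the command $ docker-compose -f docker-compose-LocalExecutor.yml up -d to start the Airflow webserver and the Postgres database.

3) I created a custom connection, as shown below:

4) When I trigger the DAG, I get the following error: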

    [2019-10-07 14:51:11,034] {{taskinstance.py:1078}} INFO - Marking task as FAILED.
    [2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table Traceback (most recent call last):
    [2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/bin/airflow", line 32, in <module>
    [2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     args.func(args)
    [2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
    [2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     return f(*args, **kwargs)
    [2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 522, in run
    [2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     _run(args, dag, ti)
    [2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 440, in _run
    [2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     pool=args.pool,
    [2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    [2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     return func(*args, **kwargs)
    [2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 922, in _run_raw_task
    [2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     result = task_copy.execute(context=context)
    [2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    [2019-10-07 14:51:11,050] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     return_value = self.execute_callable()
    [2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    [2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     return self.python_callable(*self.op_args, **self.op_kwargs)
    [2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/airflow/dags/tutorial-postgres.py", line 23, in get_dag_table
    [2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     connection = hook.get_conn()
    [2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/airflow/hooks/postgres_hook.py", line 75, in get_conn
    [2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     self.conn = psycopg2.connect(**conn_args)
    [2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table   File "/usr/local/lib/python3.7/site-packages/psycopg2/__init__.py", line 130, in connect
    [2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table     conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
    [2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table psycopg2.OperationalError: could not connect to server: Connection refused
    [2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table  Is the server running on host "localhost" (127.0.0.1) and accepting
    [2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table  TCP/IP connections on port 5432?
    [2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table could not connect to server: Cannot assign requested address
    [2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table  Is the server running on host "localhost" (::1) and accepting
    [2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table  TCP/IP connections on port 5432?
    [2019-10-07 14:51:11,051] {{base_task_runner.py:115}} INFO - Job 5229: Subtask query_dag_table

I have tried every suggestion I could find online, but none of them solved the problem. I am confused, because I can connect to the database from PyCharm:

In addition, when I run $ docker container ls, I get the following output, which shows the Postgres container publishing port 5432:

    CONTAINER ID        IMAGE                          COMMAND                  CREATED             STATUS                 PORTS                                        NAMES
    xxxxxxxxxxxx        puckel/docker-airflow:1.10.4   "/entrypoint.sh webs…"   2 hours ago         Up 2 hours (healthy)   5555/tcp, 8793/tcp, 0.0.0.0:8080->8080/tcp   docker-airflow_webserver_1
    xxxxxxxxxxxx        postgres:9.6                   "docker-entrypoint.s…"   2 days ago          Up 2 hours             0.0.0.0:5432->5432/tcp                       docker-airflow_postgres_1
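
One way to narrow this down is to check TCP reachability from the place where the task actually runs. The traceback paths (/usr/local/airflow/dags/...) suggest the task executes inside the webserver container, where "localhost" would refer to that container itself rather than to the Docker host on which port 5432 is published. The sketch below is only a diagnostic under that assumption: `can_connect` and `check_hosts` are hypothetical helper names, and the host name "postgres" mentioned afterwards is an assumed compose service name inferred from the container name docker-airflow_postgres_1.

```python
import socket


def can_connect(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection resolves the name and tries each address in turn.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False


def check_hosts(hosts, port):
    """Map each candidate host name to whether port is reachable on it."""
    return {host: can_connect(host, port) for host in hosts}
```

Running `check_hosts(["localhost", "postgres"], 5432)` once on the host machine and once inside the webserver container (for example through `docker exec` into docker-airflow_webserver_1) would show which host name is reachable from each side. If only "postgres" is reachable from inside the container, the host stored in the Airflow connection is the likely culprit.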