我猜这是一个简单的解决方案,但是我遇到了一个问题,即使用to_csv()函数将pandas数据帧保存到csv文件需要将近一个小时.我正在使用带有pandas(0.19.1)的anaconda python 2.7.12.
import os
import glob
import pandas as pd
src_files = glob.glob(os.path.join('/my/path', "*.csv.gz"))
# 1 - Takes 2 min to read 20m records from 30 files
for file_ in sorted(src_files):
stage = pd.DataFrame()
iter_csv = pd.read_csv(file_
, sep=','
, index_col=False
, header=0
, low_memory=False
, iterator=True
, chunksize=100000
, compression='gzip'
, memory_map=True
, encoding='utf-8')
df = pd.concat([chunk for chunk in iter_csv])
stage = stage.append(df, ignore_index=True)
# 2 - Takes 55 min to write 20m records from …Run Code Online (Sandbox Code Playgroud) 真的很喜欢 Airflow 工作流调度程序,但在运行一个简单的 DAG 时遇到了错误:“ {jobs.py:538} ERROR - Dag running is deadlocked for DAG: TEST_SCHEDULER_DAG ”。
这是一个新的气流安装 (v1.7.1.3),我已经能够很好地运行其他预定的 dag 文件。我的环境是 Linux (ubuntu 16.04)、python 2.7.12 (anaconda)、postgresql 9.5.5,并且使用 LocalExecutor。
我遇到死锁错误的 DAG 是:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'owner.name',
'depends_on_past': True,
'start_date': datetime(2016, 11, 30, 8, 0, 0),
'retries': 0,
'retry_delay': timedelta(seconds=60),
}
tst_dag = DAG(dag_id='TEST_SCHEDULER_DAG',
default_args=default_args,
schedule_interval='10 * * * *')
t1 = BashOperator(
task_id='task_1',
bash_command='sleep 10',
dag=tst_dag) …Run Code Online (Sandbox Code Playgroud)