小编Kim*_*oll的帖子

Pandas to_csv()减慢了保存大型数据帧的速度

我猜这是一个简单的解决方案,但是我遇到了一个问题,即使用to_csv()函数将pandas数据帧保存到csv文件需要将近一个小时.我正在使用带有pandas(0.19.1)的anaconda python 2.7.12.

import os
import glob
import pandas as pd

src_files = glob.glob(os.path.join('/my/path', "*.csv.gz"))

# 1 - Takes 2 min to read 20m records from 30 files
for file_ in sorted(src_files):
    stage = pd.DataFrame()
    iter_csv = pd.read_csv(file_
                     , sep=','
                     , index_col=False
                     , header=0
                     , low_memory=False
                     , iterator=True
                     , chunksize=100000
                     , compression='gzip'
                     , memory_map=True
                     , encoding='utf-8')

    df = pd.concat([chunk for chunk in iter_csv])
    stage = stage.append(df, ignore_index=True)

# 2 - Takes 55 min to write 20m records from …
Run Code Online (Sandbox Code Playgroud)

python csv dataframe pandas

11
推荐指数
3
解决办法
9215
查看次数

Airflow - BashOperator:获取“Dag 运行已为 DAG 死锁:...”错误

真的很喜欢 Airflow 工作流调度程序,但在运行一个简单的 DAG 时遇到了错误:“ {jobs.py:538} ERROR - Dag running is deadlocked for DAG: TEST_SCHEDULER_DAG ”。

这是一个新的气流安装 (v1.7.1.3),我已经能够很好地运行其他预定的 dag 文件。我的环境是 Linux (ubuntu 16.04)、python 2.7.12 (anaconda)、postgresql 9.5.5,并且使用 LocalExecutor。

我遇到死锁错误的 DAG 是:

from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'owner.name',
    'depends_on_past': True,
    'start_date': datetime(2016, 11, 30, 8, 0, 0),
    'retries': 0,
    'retry_delay': timedelta(seconds=60),
}

tst_dag = DAG(dag_id='TEST_SCHEDULER_DAG',
              default_args=default_args,
              schedule_interval='10 * * * *')

t1 = BashOperator(
    task_id='task_1',
    bash_command='sleep 10',
    dag=tst_dag) …
Run Code Online (Sandbox Code Playgroud)

python concurrency workflow airflow

2
推荐指数
1
解决办法
4761
查看次数

标签 统计

python ×2

airflow ×1

concurrency ×1

csv ×1

dataframe ×1

pandas ×1

workflow ×1