小编pan*_*sen的帖子

具有不同窗口规格的链式火花列表达式会产生低效的 DAG

语境

假设您处理时间序列数据。您想要的结果依赖于具有不同窗口规格的多个窗口函数。结果可能类似于单个火花列表达式,例如间隔标识符。

现状

通常,我不使用df.withColumn链式/堆栈列表达式来存储中间结果,并且相信 Spark 会找到最有效的 DAG(在处理 DataFrame 时)。

可重现的例子

但是,在以下示例(PySpark 2.4.4 独立版)中,存储中间结果df.withColumn降低了 DAG 的复杂性。让我们考虑以下测试设置:

import pandas as pd
import numpy as np

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dfp = pd.DataFrame(
    {
        "col1": np.random.randint(0, 5, size=100),
        "col2": np.random.randint(0, 5, size=100),
        "col3": np.random.randint(0, 5, size=100),
        "col4": np.random.randint(0, 5, size=100),        
    }
)

df = spark.createDataFrame(dfp)
df.show(5)
Run Code Online (Sandbox Code Playgroud)
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession, Window …
Run Code Online (Sandbox Code Playgroud)

python dataframe directed-acyclic-graphs apache-spark pyspark

9
推荐指数
1
解决办法
229
查看次数

Spark DAG 与“withColumn”和“select”不同

语境

在最近的SO-post 中,我发现withColumn在处理堆叠/链列表达式以及不同的窗口规范时,使用可能会改进 DAG。然而,在这个例子中,withColumn实际上使 DAG 变得更糟,并且与使用select相反的结果不同。

可重现的例子

首先,一些测试数据(PySpark 2.4.4 独立版):

import pandas as pd
import numpy as np

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dfp = pd.DataFrame(
    {
        "col1": np.random.randint(0, 5, size=100),
        "col2": np.random.randint(0, 5, size=100),
        "col3": np.random.randint(0, 5, size=100),
        "col4": np.random.randint(0, 5, size=100),      
        "col5": np.random.randint(0, 5, size=100),        

    }
)

df = spark.createDataFrame(dfp)
df.show(5)

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|   0|   3|   2|   2|   2|
| …
Run Code Online (Sandbox Code Playgroud)

python dataframe directed-acyclic-graphs apache-spark pyspark

8
推荐指数
1
解决办法
3019
查看次数

如何在Pyspark中检查Spark分区中的特定分区数据

我在我的hive表中在pyspark中创建了两个数据帧:

data1 = spark.sql("""
   SELECT ID, MODEL_NUMBER, MODEL_YEAR ,COUNTRY_CODE
   from MODEL_TABLE1 where COUNTRY_CODE in ('IND','CHN','USA','RUS','AUS')
""");
Run Code Online (Sandbox Code Playgroud)

每个国家/地区都有数百万字母数字格式的唯一ID.

data2 = spark.sql("""
   SELECT ID,MODEL_NUMBER, MODEL_YEAR, COUNTRY_CODE
   from MODEL_TABLE2 where COUNTRY_CODE in ('IND','CHN')
""");
Run Code Online (Sandbox Code Playgroud)

我想在ID列上使用pyspark加入这两个数据帧.

我们如何重新划分数据,使其在分区中均匀分布.

我可以使用下面的数据来修复我的数据吗?

newdf1 = data2.repartition(100, "ID")
newdf2 = data2.repartition(100, "ID")
Run Code Online (Sandbox Code Playgroud)

什么是分区的最佳方式,以便加快工作?

hadoop-partitioning pyspark

6
推荐指数
1
解决办法
1791
查看次数

熊猫read_csv转换器性能问题

描述

在读取大型csv文件(几百万行混合数据)时,我使用了converterspandas read_csv方法的参数来方便地传递将字符串转换为日期时间对象等的函数。

但是,与手动转换相应的列相比,使用converters参数非常慢。

范例程式码

为了说明起见,让我们使用3种不同的方法将字符串转换为datetime对象:

  • 转换器参数
  • parse_dates / date_parser参数
  • 加载csv后手动

注意,这里从字符串到日期时间的转换是任意的。可以用其他函数代替(除了没有特定的parse_dates / date_parser参数外)。

import pandas as pd # 0.19.2 with python 3.5

# create dummy data
rows = 100000
data = {"dates": pd.date_range("2017-02-27 20:44:23", periods=rows, freq="S")}

# save as temporary file for timeit
pd.DataFrame(data).to_csv("dummy")

# define converters
def convert_datetime(series):
    return pd.to_datetime(series, format="%Y-%m-%d %H:%M:%S")
Run Code Online (Sandbox Code Playgroud)

现在,让我们看看timeit(Ipython)的比较:

%%timeit
df = pd.read_csv("dummy", converters={"dates": convert_datetime})
# 1 loop, best of 3: 7.76 s per loop
Run Code Online (Sandbox Code Playgroud)
%%timeit
df = pd.read_csv("dummy", parse_dates=["dates"], date_parser=convert_datetime) …
Run Code Online (Sandbox Code Playgroud)

python csv performance dataframe pandas

5
推荐指数
1
解决办法
4897
查看次数

在python中使用to_csv和index = False将pandas数据帧写入CSV时丢失数据

我正在尝试编写一个只有一列 2k 行的 Pandas 数据框。我想要一个 csv 文件,其中包含没有索引的所有 2k 行数据帧。

my_df = pd.DataFrame(test_label)
my_df.columns = ['Names']
my_df.to_csv('/Users/neeru/Desktop/dats/test1.csv', index = False, header = True)
Run Code Online (Sandbox Code Playgroud)

在使用上面的代码时,我在保存的 csv 文件中得到了大约 1.7k 行,而当我制作 . 时index = True,我得到了所有 2k 行以及索引。

我应该怎么做才能获得一个包含所有没有索引的行的单列的 csv 文件?

PS:我刚刚开始使用熊猫。

python csv dataframe pandas

5
推荐指数
1
解决办法
2803
查看次数