假设您处理时间序列数据。您想要的结果依赖于具有不同窗口规格的多个窗口函数。结果可能类似于单个火花列表达式,例如间隔标识符。
通常,我不使用df.withColumn链式/堆栈列表达式来存储中间结果,并且相信 Spark 会找到最有效的 DAG(在处理 DataFrame 时)。
但是,在以下示例(PySpark 2.4.4 独立版)中,存储中间结果df.withColumn降低了 DAG 的复杂性。让我们考虑以下测试设置:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
dfp = pd.DataFrame(
{
"col1": np.random.randint(0, 5, size=100),
"col2": np.random.randint(0, 5, size=100),
"col3": np.random.randint(0, 5, size=100),
"col4": np.random.randint(0, 5, size=100),
}
)
df = spark.createDataFrame(dfp)
df.show(5)
Run Code Online (Sandbox Code Playgroud)
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, Window …Run Code Online (Sandbox Code Playgroud) python dataframe directed-acyclic-graphs apache-spark pyspark
在最近的SO-post 中,我发现withColumn在处理堆叠/链列表达式以及不同的窗口规范时,使用可能会改进 DAG。然而,在这个例子中,withColumn实际上使 DAG 变得更糟,并且与使用select相反的结果不同。
首先,一些测试数据(PySpark 2.4.4 独立版):
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
dfp = pd.DataFrame(
{
"col1": np.random.randint(0, 5, size=100),
"col2": np.random.randint(0, 5, size=100),
"col3": np.random.randint(0, 5, size=100),
"col4": np.random.randint(0, 5, size=100),
"col5": np.random.randint(0, 5, size=100),
}
)
df = spark.createDataFrame(dfp)
df.show(5)
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
| 0| 3| 2| 2| 2|
| …Run Code Online (Sandbox Code Playgroud) python dataframe directed-acyclic-graphs apache-spark pyspark
我在我的hive表中在pyspark中创建了两个数据帧:
data1 = spark.sql("""
SELECT ID, MODEL_NUMBER, MODEL_YEAR ,COUNTRY_CODE
from MODEL_TABLE1 where COUNTRY_CODE in ('IND','CHN','USA','RUS','AUS')
""");
Run Code Online (Sandbox Code Playgroud)
每个国家/地区都有数百万字母数字格式的唯一ID.
data2 = spark.sql("""
SELECT ID,MODEL_NUMBER, MODEL_YEAR, COUNTRY_CODE
from MODEL_TABLE2 where COUNTRY_CODE in ('IND','CHN')
""");
Run Code Online (Sandbox Code Playgroud)
我想在ID列上使用pyspark加入这两个数据帧.
我们如何重新划分数据,使其在分区中均匀分布.
我可以使用下面的数据来修复我的数据吗?
newdf1 = data2.repartition(100, "ID")
newdf2 = data2.repartition(100, "ID")
Run Code Online (Sandbox Code Playgroud)
什么是分区的最佳方式,以便加快工作?
在读取大型csv文件(几百万行混合数据)时,我使用了converterspandas read_csv方法的参数来方便地传递将字符串转换为日期时间对象等的函数。
但是,与手动转换相应的列相比,使用converters参数非常慢。
为了说明起见,让我们使用3种不同的方法将字符串转换为datetime对象:
注意,这里从字符串到日期时间的转换是任意的。可以用其他函数代替(除了没有特定的parse_dates / date_parser参数外)。
import pandas as pd # 0.19.2 with python 3.5
# create dummy data
rows = 100000
data = {"dates": pd.date_range("2017-02-27 20:44:23", periods=rows, freq="S")}
# save as temporary file for timeit
pd.DataFrame(data).to_csv("dummy")
# define converters
def convert_datetime(series):
return pd.to_datetime(series, format="%Y-%m-%d %H:%M:%S")
Run Code Online (Sandbox Code Playgroud)
现在,让我们看看timeit(Ipython)的比较:
%%timeit
df = pd.read_csv("dummy", converters={"dates": convert_datetime})
# 1 loop, best of 3: 7.76 s per loop
Run Code Online (Sandbox Code Playgroud)
%%timeit
df = pd.read_csv("dummy", parse_dates=["dates"], date_parser=convert_datetime) …Run Code Online (Sandbox Code Playgroud) 我正在尝试编写一个只有一列 2k 行的 Pandas 数据框。我想要一个 csv 文件,其中包含没有索引的所有 2k 行数据帧。
my_df = pd.DataFrame(test_label)
my_df.columns = ['Names']
my_df.to_csv('/Users/neeru/Desktop/dats/test1.csv', index = False, header = True)
Run Code Online (Sandbox Code Playgroud)
在使用上面的代码时,我在保存的 csv 文件中得到了大约 1.7k 行,而当我制作 . 时index = True,我得到了所有 2k 行以及索引。
我应该怎么做才能获得一个包含所有没有索引的行的单列的 csv 文件?
PS:我刚刚开始使用熊猫。