我有 10 个非常大的 CSV 文件(可能有也可能没有相同的标题),我正在使用“readr”包 read_csv_chunked() 连续读取和处理这些文件。目前,我可以使用 10 个内核并行读取 10 个文件。该过程仍需要一个小时。我有128个核心。我可以将每个 CSV 分成 10 个块,以便对每个文件并行处理,从而利用 100 个核心吗?这是我目前拥有的(创建两个示例文件仅用于测试):
library(doParallel)
library(foreach)
# Create a list of two sample CSV files and a filter by file
df_1 <- data.frame(matrix(sample(1:300), ncol = 3))
df_2 <- data.frame(matrix(sample(1:200), ncol = 4))
filter_by_df <- data.frame(X1 = 1:100)
write.csv(df_1, "df_1.csv", row.names = FALSE)
write.csv(df_2, "df_2.csv", row.names = FALSE)
files <- c("df_1.csv", "df_2.csv")
# Create a function to read and filter each file in chunks
my_function <-
function(file) { …Run Code Online (Sandbox Code Playgroud) 我创建了一个没有标题的 PySpark RDD(从 XML 转换为 CSV)。我需要将其转换为带有标头的 DataFrame,以便对其执行一些 SparkSQL 查询。我似乎找不到添加标题的简单方法。大多数示例都从已有标题的数据集开始。
df = spark.read.csv('some.csv', header=True, schema=schema)
Run Code Online (Sandbox Code Playgroud)
但是,我需要附加标题。
headers = ['a', 'b', 'c', 'd']
Run Code Online (Sandbox Code Playgroud)
这似乎是一个微不足道的问题,我不确定为什么我找不到有效的解决方案。谢谢。