我目前正在使用Apache Arrow的java API(尽管我在Scala中使用它来获取代码示例)以熟悉这个工具.
作为练习,我选择将CSV文件加载到箭头向量中,然后将这些文件保存到箭头文件中.第一部分似乎很容易,我试过这样:
val csvLines: Stream[Array[String]] = <open stream from CSV parser>
// There are other types of allocator, but things work with this one...
val allocator = new RootAllocator(Int.MaxValue)
// Initialize the vectors
val vectors = initVectors(csvLines.head, allocator)
// Put their mutators into an array for easy access
val mutators = vectors.map(_.getMutator)
// Work on the data, zipping it with its index
Stream.from(0)
.zip(csvLines.tail) // Work on the tail (head contains the headers)
.foreach(rowTup => // …Run Code Online (Sandbox Code Playgroud) 我正在尝试从 pandas_udf 返回特定结构。它在一个集群上工作,但在另一个集群上失败。我尝试在组上运行 udf,这要求返回类型为数据框。
from pyspark.sql.functions import pandas_udf
import pandas as pd
import numpy as np
from pyspark.sql.types import *
schema = StructType([
StructField("Distance", FloatType()),
StructField("CarId", IntegerType())
])
def haversine(lon1, lat1, lon2, lat2):
#Calculate distance, return scalar
return 3.5 # Removed logic to facilitate reading
@pandas_udf(schema)
def totalDistance(oneCar):
dist = haversine(oneCar.Longtitude.shift(1),
oneCar.Latitude.shift(1),
oneCar.loc[1:, 'Longitude'],
oneCar.loc[1:, 'Latitude'])
return pd.DataFrame({"CarId":oneCar['CarId'].iloc[0],"Distance":np.sum(dist)},index = [0])
## Calculate the overall distance made by each car
distancePerCar= df.groupBy('CarId').apply(totalDistance)
Run Code Online (Sandbox Code Playgroud)
这是我得到的例外:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py …Run Code Online (Sandbox Code Playgroud) 我正在尝试将数据帧保存为 .arrow 格式,主要是为了获得比 CSV 更好的大小,以便将该文件用于 vega-lite
我正在使用Python
import pandas
import pyarrow as pa
csv="C:/Users/mimoune.djouallah/data.csv"
arrow ="C:/Users/mimoune.djouallah/file.arrow"
dataset = pandas.read_csv(csv)
table = pa.Table.from_pandas(dataset)
writer = pa.RecordBatchFileWriter(arrow, table.schema)
writer.write(table)
writer.close()
Run Code Online (Sandbox Code Playgroud)
我原以为箭头文件会小于 csv,但现在箭头稍大一些
我尝试使用 parquet 导出,结果符合预期
原始 csv:4.4 MB 箭头:4.9 MB parquet:1.6 MB PowerBI(仅供参考):1.7 MB
我正在读取一组箭头文件并将它们写入镶木地板文件:
import pathlib
from pyarrow import parquet as pq
from pyarrow import feather
import pyarrow as pa
base_path = pathlib.Path('../mydata')
fields = [
pa.field('value', pa.int64()),
pa.field('code', pa.dictionary(pa.int32(), pa.uint64(), ordered=False)),
]
schema = pa.schema(fields)
with pq.ParquetWriter('sample.parquet', schema) as pqwriter:
for file_path in base_path.glob('*.arrow'):
table = feather.read_table(file_path)
pqwriter.write_table(table)
Run Code Online (Sandbox Code Playgroud)
我的问题是code箭头文件中的字段是用索引int8而不是int32. 然而范围int8还不够。因此,我定义了一个模式,其中包含parquet 文件中int32字段的索引。code
但是,将箭头表写入 parquet 现在会抱怨架构不匹配。
如何更改箭头列的数据类型?我检查了 pyarrow API,没有找到更改架构的方法。这可以在不往返熊猫的情况下完成吗?
在打开大型 csv 文件时,{arrow} 的列类型自动检测给我带来了一些麻烦。特别是,它会删除某些标识符的前导零,并执行其他一些不幸的操作。由于数据集相当宽(几百列)并且我不想手动设置所有架构值,因此我想以某种方式以编程方式设置它。
一个好的开始是在使用 . 打开数据集时将所有arrow::open_dataset列转换为字符。或者更正datase_connection$schema特定列的现有对象。
但是,我不知道该怎么做。
SL No: Customer Month Amount
1 A1 12-Jan-04 495414.75
2 A1 3-Jan-04 245899.02
3 A1 15-Jan-04 259490.06
Run Code Online (Sandbox Code Playgroud)
我的Df在上面
代码
import findspark
findspark.init('/home/mak/spark-3.0.0-preview2-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('mak').getOrCreate()
import numpy as np
import pandas as pd
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pdf3 = pd.read_csv('Repayment.csv')
df_repay = spark.createDataFrame(pdf3)
Run Code Online (Sandbox Code Playgroud)
仅加载df_repay有问题,其他数据帧加载成功。当我将上面的代码转移到下面的代码时,它成功运行了
df4 = (spark.read.format("csv").options(header="true")
.load("Repayment.csv"))
spark.createDataFrame(pdf3)而类似的数据帧加载成功两者都是语言中立和平台中立的数据交换库。我想知道它们有什么区别,哪个库适合哪种情况。
以下是对我有用的解决方案,但不确定这是否是最好的方法。我用brew来安装它。不幸的是,vcpkg 目前无法工作。我不喜欢这个解决方案的是我需要单独设置Parquet_DIR和find_package(Parquet)。
set(Parquet_DIR /usr/local/lib/cmake/arrow)
find_package(Arrow CONFIG REQUIRED)
find_package(Parquet CONFIG REQUIRED)
target_link_libraries(database PRIVATE arrow_shared parquet_shared)
Run Code Online (Sandbox Code Playgroud) 根据文档:
class arrow::StringType : public arrow::BinaryType
#include <arrow/type.h>
Concrete type class for variable-size string data, utf8-encoded.
Run Code Online (Sandbox Code Playgroud)
class arrow::LargeStringType : public arrow::LargeBinaryType
#include <arrow/type.h>
Concrete type class for large variable-size string data, utf8-encoded.
Run Code Online (Sandbox Code Playgroud)
多大才算“大”?
这两种数据类型有什么区别?为什么我们需要 2 而不是 1?
我有一个名为的数据集,df其中有年、月和日变量。我想使用该write_dataset函数输出具有标准箭头数据集语法的文件夹,如下图所示:
每个文件夹内将有month=1、month=2,依此类推。
现在,为了创建它,我使用了以下代码:
df <- df %>% group_by(year, month, day)
output_folder = "my/path"
arrow::write_dataset(df,
output_folder,
format = "parquet",
)
Run Code Online (Sandbox Code Playgroud)
但是,我的数据集太大,我想利用data.table快速分组的优势。我做同样的事情的方法如下:
grouping_cols = c("year", "month", "day")
setkeyv(df, grouping_cols)
arrow::write_dataset(df,
output_folder,
format = "parquet",
)
Run Code Online (Sandbox Code Playgroud)
但是,现在结果未分组,并且返回单个 .parquet 文件(未充分利用 的潜力arrow::write_dataset)。
有没有办法让相同的数据集按指定列进行分组,但基于而data.table不是dplyr分组?