我在不是很大的 DataFrame 上使用 toPandas() ,但出现以下异常:
18/10/31 19:13:19 ERROR Executor: Exception in task 127.2 in stage 13.0 (TID 2264)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
process()
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 261, in dump_stream
batch = _create_batch(series, self._timezone)
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 239, in _create_batch
arrs = [create_array(s, t) for s, t in series]
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 239, in <listcomp>
arrs = [create_array(s, t) for s, t in series] …Run Code Online (Sandbox Code Playgroud) 将镶木地板文件转换为数据框时,我遇到了文件类型问题。
我愿意
bucket = 's3://some_bucket/test/usages'
import pyarrow.parquet as pq
import s3fs
s3 = s3fs.S3FileSystem()
read_pq = pq.ParquetDataset(bucket, filesystem=s3).read_pandas()
Run Code Online (Sandbox Code Playgroud)
当我这样做时read_pq,我得到
pyarrow.Table
_COL_0: decimal(9, 0)
_COL_1: decimal(9, 0)
_COL_2: decimal(9, 0)
_COL_3: decimal(9, 0)
Run Code Online (Sandbox Code Playgroud)
当我这样做时df = read_pd.to_pandas(); df.dtypes,我得到
_COL_0 object
_COL_1 object
_COL_2 object
_COL_3 object
dtype: object
Run Code Online (Sandbox Code Playgroud)
原始数据都是整数。当我对 pandas 数据帧中的对象进行操作时,操作非常缓慢。
pd.to_numeric或类似的方法?decimal(9, 0)?还是最好直接在熊猫数据帧上进行转换?
我试过:read_pq.column('_COL_0').cast('int32')抛出一个错误,如
No cast implemented from decimal(9, 0) to int32
Run Code Online (Sandbox Code Playgroud) 我一直在寻找在 Java 中进行转换的方法,arrow反之亦然parquet。
尽管 Python 库arrow完全支持上述转换,但我在 Java 中几乎找不到任何相同的文档。
arrow/parquet 有人在Java 库中遇到过这样的功能吗?
我使用以下方式将一个示例数据帧转换为.arrow文件pyarrow
import numpy as np
import pandas as pd
import pyarrow as pa
df = pd.DataFrame({"a": [10, 2, 3]})
df['a'] = pd.to_numeric(df['a'],errors='coerce')
table = pa.Table.from_pandas(df)
writer = pa.RecordBatchFileWriter('test.arrow', table.schema)
writer.write_table(table)
writer.close()
Run Code Online (Sandbox Code Playgroud)
这将创建一个文件 test.arrow
df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 1 columns):
a 3 non-null int64
dtypes: int64(1)
memory usage: 104.0 bytes
Run Code Online (Sandbox Code Playgroud)
然后在NodeJS中,使用arrowJS加载文件。 https://arrow.apache.org/docs/js/
const fs = require('fs');
const arrow = require('apache-arrow');
const data = fs.readFileSync('test.arrow');
const table = arrow.Table.from(data);
console.log(table.schema.fields.map(f => …Run Code Online (Sandbox Code Playgroud) 问题归结为以下几点:我想使用现有的并行输入集合和一个函数在 pyspark 中生成一个 DataFrame,该函数给定一个输入可以生成一批相对较大的行。在下面的示例中,我想使用 1000 个执行器生成 10^12 行数据帧:
def generate_data(one_integer):
import numpy as np
from pyspark.sql import Row
M = 10000000 # number of values to generate per seed, e.g. 10M
np.random.seed(one_integer)
np_array = np.random.random_sample(M) # generates an array of M random values
row_type = Row("seed", "n", "x")
return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]
N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(list_of_integers_rdd) …Run Code Online (Sandbox Code Playgroud) 我正在尝试在 C# Spark 中实现 Vector Udf。
我按照Spark .Net创建了 .Net Spark 环境。Vector Udf(Apache arrow 和 Microsoft.Data.Analysis 两者)都为我的 IntegerType 列工作。现在,尝试将 Integer 数组类型列发送到 Vector Udf 并找不到实现此目的的方法。
用途
using System;
using System.Linq;
using Microsoft.Data.Analysis;
using Microsoft.Spark.Sql;
using func = Microsoft.Spark.Sql.Functions;
using DataFrame = Microsoft.Spark.Sql.DataFrame;
using Arrow = Apache.Arrow;
Run Code Online (Sandbox Code Playgroud)
程序
SparkSession spark = SparkSession
.Builder()
.AppName("sample")
.GetOrCreate();
DataFrame dataFrame = spark.Range(0, 100).Repartition(4);
Func<Column, Column> array20 = func.Udf<int, int[]>(
(col1) => Enumerable.Range(0, col1).ToArray());
dataFrame = dataFrame.WithColumn("array", array20(dataFrame["id"]));
// Apache Arrow
var arrowVectorUdf = ArrowFunctions.VectorUdf<Arrow.UInt64Array, …Run Code Online (Sandbox Code Playgroud) c# user-defined-functions apache-spark apache-arrow .net-spark
我在 SparkRDataframe 上运行 gapply 函数,如下所示
df<-gapply(sp_Stack, function(key,e) {
Sys.setlocale('LC_COLLATE','C')
suppressPackageStartupMessages({
library(Rcpp)
library(Matrix)
library(reshape)
require(parallel)
require(lubridate)
library(plyr)
library(reticulate)
library(stringr)
library(data.table)
})
calcDecsOnly(e,RequestNumber=RequestNumber,
...)
},cols="udim",schema=schema3)
Run Code Online (Sandbox Code Playgroud)
如果我们设置 spark.sql.execution.arrow.sparkr.enabled = "false" 上面的代码运行没有任何错误,但如果我设置 spark.sql.execution.arrow.sparkr.enabled = "true" 火花作业失败低于错误
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.sql.execution.r.ArrowRRunner$$anon$2.read(ArrowRRunner.scala:154)
Run Code Online (Sandbox Code Playgroud)
环境:Google Cloud Dataproc Spark 版本:3.1.1 Dataproc 版本:基于 2.0.9-debian10 构建的自定义映像
感谢这里的任何帮助,提前致谢
有人可以给我一个提示,告诉我如何使用PyArrow 的新文件系统接口(即 upload、copyFromLocal)将文件从本地文件系统复制到 HDFS 文件系统吗?
我已经反复阅读了文档,并尝试了一些方法(使用 copy_file() 和 FS URI),但似乎都不起作用。旧版 HDFS API的使用很简单,但它已被弃用,尽管新 API 似乎不完整。当然,在文件描述符之间移动数据块是一种解决方案,但是为什么copy_file()存在呢?
Spark 提供了几种不同的方法来实现使用和返回 Pandas DataFrame 的 UDF。我目前正在使用联合版本,该版本采用两个(联合分组)Pandas DataFrame 作为输入并返回第三个。
为了在 Spark DataFrame 和 Pandas DataFrame 之间进行高效转换,Spark 使用 Apache Arrow 内存布局,但是仍然需要在 Arrow 和 Pandas 之间进行转换。我真的很想直接访问 Arrow 数据,因为这就是我最终处理 UDF 中的数据的方式(使用Polars)。
来时从 Spark -> Arrow -> Pandas -> Arrow (Polars) 走,返回时相反,似乎很浪费。
user-defined-functions pandas apache-spark apache-arrow python-polars
apache-arrow ×10
apache-spark ×5
pyarrow ×5
parquet ×3
node.js ×2
pandas ×2
pyspark ×2
python ×2
.net-spark ×1
c# ×1
hdfs ×1
java ×1
javascript ×1
parquet-mr ×1
sparkr ×1