我有一个顶级函数,它获取一个包含镶木地板文件路径和列名称的元组.
该函数只加载文件中的列,转换为pandas,而不是将其打包/序列化为标准格式.就像是:
import pyarrow as pa
import pyarrow.parquet as pq
from multiprocessing import Pool
def binarizer(file_data_tuple):
''' Read a Parquet column a file, binarize and return'''
path, col_name, col_meta, native = file_data_tuple
if not native:
# Either this or using a top level hdfs_con
hdfs_con = pa.hdfs.connect(params)
read_pq = pq.read_table if native else hdfs_con.read_parquet
arrow_col = read_pq(filepath, columns = (col_name,))
bin_col = imported_binarizng_function(arrow_col)
return bin_col
def read_binarize_parallel(filepaths):
''' Setup parallel reading and binarizing of a parquet file'''
# list of …Run Code Online (Sandbox Code Playgroud) 我有一个 Pandas 数据框,我试图将它作为镶木地板文件保存到 S3 中:
dftest = pd.DataFrame({'field': [1,2,3]})
dftest.to_parquet("s3://bucket_name/test.parquet", engine='pyarrow',
compression='gzip')
Run Code Online (Sandbox Code Playgroud)
我得到:“FileNotFoundError:bucket_name/test.parquet”
我开始在本地玩火花,发现这个奇怪的问题
1)点安装pyspark == 2.3.1
2)pyspark>
将熊猫作为pd导入
从pyspark.sql.functions导入pandas_udf,PandasUDFType,udf
df = pd.DataFrame({'x':[1,2,3],'y':[1.0,2.0,3.0]})
sp_df = spark.createDataFrame(df)
@pandas_udf('long',PandasUDFType.SCALAR)
def pandas_plus_one(v):
返回v + 1
sp_df.withColumn('v2',pandas_plus_one(sp_df.x))。show()
从这里以这个例子https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
知道为什么我会不断收到此错误吗?
py4j.protocol.Py4JJavaError:调用o108.showString时发生错误。
:org.apache.spark.SparkException:作业由于阶段失败而中止:阶段3.0中的任务0失败1次,最近一次失败:阶段3.0中的任务0.0(TID 8,本地主机,执行程序驱动程序)丢失:org.apache.spark .SparkException:Python worker意外退出(崩溃)
在org.apache.spark.api.python.BasePythonRunner $ ReaderIterator $$ anonfun $ 1.applyOrElse(PythonRunner.scala:333)
在org.apache.spark.api.python.BasePythonRunner $ ReaderIterator $$ anonfun $ 1.applyOrElse(PythonRunner.scala:322)中
在scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
在org.apache.spark.sql.execution.python.ArrowPythonRunner $$ anon $ 1.read(ArrowPythonRunner.scala:177)
在org.apache.spark.sql.execution.python.ArrowPythonRunner $$ anon $ 1.read(ArrowPythonRunner.scala:121)
在org.apache.spark.api.python.BasePythonRunner $ ReaderIterator.hasNext(PythonRunner.scala:252)
在org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
在org.apache.spark.sql.execution.python.ArrowEvalPythonExec $$ anon $ 2。(ArrowEvalPythonExec.scala:90)
在org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:88)
在org.apache.spark.sql.execution.python.EvalPythonExec $$ anonfun $ doExecute $ 1.apply(EvalPythonExec.scala:131)
在org.apache.spark.sql.execution.python.EvalPythonExec $$ anonfun $ doExecute $ 1.apply(EvalPythonExec.scala:93)
在org.apache.spark.rdd.RDD $$ anonfun … 我正在尝试使用 Dask 从 google 存储桶读取和写入。使用一堆csv文件可以,但不方便(速度较慢,无法压缩,无法仅读取某些列),所以我尝试使用该apache parquet格式。
看起来写得很好:
import dask.dataframe as dd
pandas_df = pd.DataFrame({'x' : [2,3, 2], 'y': [1, 0, 0]})
dask_df = dd.from_pandas(pandas_df, npartitions=2)
dask_df.to_parquet("gcs://my_google_bucket/test/")
Run Code Online (Sandbox Code Playgroud)
但当我尝试读回来时
read_again_df = dd.read_parquet("gcs://my_google_bucket/test/")
Run Code Online (Sandbox Code Playgroud)
我收到一个未实现的错误:
AttributeError Traceback (most recent call last)
~/miniconda3/envs/env1/lib/python3.6/site-packages/dask/bytes/core.py in get_pyarrow_filesystem(fs)
520 try:
--> 521 return fs._get_pyarrow_filesystem()
522 except AttributeError:
AttributeError: 'DaskGCSFileSystem' object has no attribute '_get_pyarrow_filesystem'
During handling of the above exception, another exception occurred:
NotImplementedError Traceback (most recent call last)
<ipython-input-42-ef1fc41d04d5> in <module>()
----> 1 …Run Code Online (Sandbox Code Playgroud) 我尝试阅读文档,但我仍然很困惑。pyarrow 表能够执行 groupbys 和所有精彩的 pandas 功能吗?
import pyarrow as pa
import pandas as pd
df = pd.DataFrame({"a": [1, 2, 3]})
table = pa.Table.from_pandas(df)
Run Code Online (Sandbox Code Playgroud)
但现在是虎头蛇尾:
table["a"]
# ---------------------------------------------------------------------------
# TypeError Traceback (most recent call last)
# <ipython-input-18-fb884245e2de> in <module>()
# ----> 1 table["a"]
# table.pxi in pyarrow.lib.Table.__getitem__()
# TypeError: an integer is required
table[0]
# <pyarrow.lib.Column object at 0x111306330>
# chunk 0: <pyarrow.lib.Int64Array object at 0x11728d1d8>
# [
# 1,
# 2,
# 3
# ]
c = table[0]
c[c>2]
# …Run Code Online (Sandbox Code Playgroud) 我正在阅读一个 CSV 文件,pandas.read_csv它会自动检测架构,就像
Column1: string
Column2: string
Column3: string
Column4: int64
Column5: double
Column6: double
__index_level_0__: int64
Run Code Online (Sandbox Code Playgroud)
然后,我试图将它pyarrow.parquet.write_table 写成 Parquet 表。但是,我想对新的镶木地板文件使用以下架构
Column1: string
Column2: string
Column3: string
Column4: string
Column5: string
Column6: string
__index_level_0__: int64
Run Code Online (Sandbox Code Playgroud)
但是我收到一条错误消息,说“表架构与用于创建文件的架构不匹配”。这是我用来将 CSV 文件转换为从这里借来的 Parquet 文件的一段代码
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
csv_file = 'C:/input.csv'
parquet_file = 'C:/putput.parquet'
chunksize = 100_000
csv_stream = pd.read_csv(csv_file, sep=',', chunksize=chunksize, low_memory=False, encoding="ISO-8859-1")
for i, chunk in enumerate(csv_stream):
print("Chunk", i) …Run Code Online (Sandbox Code Playgroud) 有几种方法可以将熊猫转换为镶木地板。例如 pyarrow.Table.from_pandas 或 dataframe.to_parquet 。它们的共同点是,它们将 df.parquet 应存储的文件路径作为参数获取。
我需要将写入的镶木地板文件的内容放入一个变量中,但还没有看到这一点。主要是我想要与 pandas.to_csv 相同的行为,如果没有提供路径,它将结果作为字符串返回。
当然,我可以只写文件并使用标准的文件读取操作从 python 中读取到字符串中。由于我正在写入大量数据,这会在文件系统上产生大量负载......
我正在尝试遵循在本地进行构建pyarrow的文档。具体来说,使用conda说明:
conda create -y -n pyarrow-dev -c conda-forge \
--file arrow/ci/conda_env_unix.yml \
--file arrow/ci/conda_env_cpp.yml \
--file arrow/ci/conda_env_python.yml \
compilers \
python=3.7
conda activate pyarrow-dev
export ARROW_HOME=$CONDA_PREFIX
git clone https://github.com/apache/arrow.git
mkdir arrow/cpp/build
pushd arrow/cpp/build
cmake -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \
-DCMAKE_INSTALL_LIBDIR=lib \
-DARROW_FLIGHT=ON \
-DARROW_GANDIVA=ON \
-DARROW_ORC=ON \
-DARROW_PARQUET=ON \
-DARROW_PYTHON=ON \
-DARROW_PLASMA=ON \
-DARROW_BUILD_TESTS=ON \
..
Run Code Online (Sandbox Code Playgroud)
该cmake指令失败,并显示以下错误消息:
-- Building using CMake version: 3.12.3
-- The C compiler identification is Clang 4.0.1
-- The CXX …Run Code Online (Sandbox Code Playgroud) 我试图做这样的事情这样,阅读从S3存储文件的列表到pyarrow表。
如果我指定文件名,我可以这样做:
from pyarrow.parquet import ParquetDataset
import s3fs
dataset = ParquetDataset(
"s3://path/to/file/myfile.snappy.parquet,
filesystem=s3fs.S3FileSystem(),
)
Run Code Online (Sandbox Code Playgroud)
一切都按预期进行。但是,如果我这样做:
dataset = ParquetDataset(
"s3://path/to/file,
filesystem=s3fs.S3FileSystem(),
)
Run Code Online (Sandbox Code Playgroud)
我得到:
pyarrow/_parquet.pyx:1036: in pyarrow._parquet.ParquetReader.open
pyarrow.lib.ArrowIOError: Invalid Parquet file size is 0 bytes
Run Code Online (Sandbox Code Playgroud) 我有一个形状为 6132,7 的 pyarrow 表名称 final_table 我想向该表添加列
list_ = ['IT'] * 6132
final_table.append_column('COUNTRY_ID', list_)
Run Code Online (Sandbox Code Playgroud)
但我收到以下错误 ArrowInvalid:添加的列的长度必须与表的长度匹配。预期长度 6132 但得到长度 12264