我有一个包含数百万条记录的 SQL 表,我计划使用 pyarrow 库将其写入文件夹中的许多 parquet 文件。数据内容似乎太大,无法存储在单个 parquet 文件中。
但是,我似乎找不到 pyarrow 库的 API 或参数来允许我指定如下内容:
file_scheme="hive"
Run Code Online (Sandbox Code Playgroud)
由 fastparquet python 库支持。
这是我的示例代码:
#!/usr/bin/python
import pyodbc
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
conn_str = 'UID=username;PWD=passwordHere;' +
'DRIVER=FreeTDS;SERVERNAME=myConfig;DATABASE=myDB'
#----> Query the SQL database into a Pandas dataframe
conn = pyodbc.connect( conn_str, autocommit=False)
sql = "SELECT * FROM ClientAccount (NOLOCK)"
df = pd.io.sql.read_sql(sql, conn)
#----> Convert the dataframe to a pyarrow table and write it out
table = pa.Table.from_pandas(df) …Run Code Online (Sandbox Code Playgroud) 使用案例:
我正在使用 Google BigTable 来存储这样的计数:
| rowkey | columnfamily |
| | col1 | col2 | col3 |
|----------|------|------|------|
| row1 | 1 | 2 | 3 |
| row2 | 2 | 4 | 8 |
| row3 | 3 | 3 | 3 |
Run Code Online (Sandbox Code Playgroud)
我想读取给定范围的行键的所有行(在本例中假设所有行)并聚合每列的值。
一个简单的实现将查询行并在聚合计数时迭代行,如下所示:
from google.cloud.bigtable import Client
instance = Client(project='project').instance('my-instance')
table = instance.table('mytable')
col1_sum = 0
col2_sum = 0
col3_max = 0
table.read_rows()
row_data.consume_all()
for row in row_data.rows:
col1_sum += int.from_bytes(row['columnfamily']['col1'.encode('utf-8')][0].value(), byteorder='big')
col2_sum += int.from_bytes(row['columnfamily']['col2'.encode('utf-8')][0].value(), …Run Code Online (Sandbox Code Playgroud) 是否可以在 s3 中将 Parquet 文件从一个文件夹读取和写入另一个文件夹,而无需使用 pyarrow.
这是我的代码:
import pyarrow.parquet as pq
import pyarrow as pa
import s3fs
s3 = s3fs.S3FileSystem()
bucket = 'demo-s3'
pd = pq.ParquetDataset('s3://{0}/old'.format(bucket), filesystem=s3).read(nthreads=4).to_pandas()
table = pa.Table.from_pandas(pd)
pq.write_to_dataset(table, 's3://{0}/new'.format(bucket), filesystem=s3, use_dictionary=True, compression='snappy')
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 Pandas to_parquet 保存一个非常大的数据集,当超过某个限制时,它似乎失败了,无论是“pyarrow”还是“fastparquet”。我使用以下代码重现了我遇到的错误,并且很高兴听到有关如何克服该问题的想法:
使用 Pyarrow:
low = 3
high = 8
for n in np.logspace(low, high, high-low+1):
t0 = time()
df = pd.DataFrame.from_records([(f'ind_{x}', ''.join(['x']*50)) for x in range(int(n))], columns=['a', 'b']).set_index('a')
df.to_parquet(tmp_file, engine='pyarrow', compression='gzip')
pd.read_parquet(tmp_file, engine='pyarrow')
print(f'10^{np.log10(int(n))} read-write took {time()-t0} seconds')
10^3.0 read-write took 0.012851715087890625 seconds
10^4.0 read-write took 0.05722832679748535 seconds
10^5.0 read-write took 0.46846866607666016 seconds
10^6.0 read-write took 4.4494054317474365 seconds
10^7.0 read-write took 43.0602171421051 seconds
---------------------------------------------------------------------------
ArrowIOError Traceback (most recent call last)
<ipython-input-51-cad917a26b91> in <module>()
5 df = pd.DataFrame.from_records([(f'ind_{x}', …Run Code Online (Sandbox Code Playgroud) 我想使用 Apache Arrow 将数据从 Django 后端发送到 Angular 前端。我想使用数据帧/表的字典作为消息中的有效负载。pyarrow 可以在 python 微服务之间以这种方式共享数据,但我找不到箭头的 javascript 实现的方法。
有没有办法反序列化/序列化一个字典,字符串作为键,数据帧/表作为箭头在javascript端的值?
我正在尝试在我的服务器上运行一个简单的 Pandas UDF 示例。从这里
我创建了一个新的环境只是为了运行这个代码。
(PySparkEnv) $ conda list
# packages in environment at /home/shekhar/.conda/envs/PySparkEnv:
#
# Name Version Build Channel
arrow-cpp 0.10.0 py36h70250a7_0 conda-forge
blas 1.0 mkl
boost-cpp 1.67.0 h3a22d5f_0 conda-forge
bzip2 1.0.6 h470a237_2 conda-forge
ca-certificates 2018.8.24 ha4d7672_0 conda-forge
certifi 2018.8.24 py36_1 conda-forge
icu 58.2 hfc679d8_0 conda-forge
intel-openmp 2019.0 117
libffi 3.2.1 hfc679d8_5 conda-forge
libgcc-ng 7.2.0 hdf63c60_3 conda-forge
libgfortran-ng 7.2.0 hdf63c60_3 conda-forge
libstdcxx-ng 7.2.0 hdf63c60_3 conda-forge
mkl 2019.0 117
mkl_fft 1.0.6 py36_0 conda-forge
mkl_random 1.0.1 py36_0 conda-forge
ncurses …Run Code Online (Sandbox Code Playgroud) 我有一些大型 CSV 文件,我最终想将其转换为镶木地板。由于内存限制和处理 NULL 值的困难(这在我的数据中很常见),Pandas 不会提供帮助。我检查了 PyArrow 文档,有用于读取 parquet 文件的工具,但我没有看到任何有关读取 CSV 的信息。我是否错过了什么,或者这个功能是否与 PyArrow 不兼容?
我在不是很大的 DataFrame 上使用 toPandas() ,但出现以下异常:
18/10/31 19:13:19 ERROR Executor: Exception in task 127.2 in stage 13.0 (TID 2264)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
process()
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 261, in dump_stream
batch = _create_batch(series, self._timezone)
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 239, in _create_batch
arrs = [create_array(s, t) for s, t in series]
File "/home/hadoop/spark2.3.1/python/lib/pyspark.zip/pyspark/serializers.py", line 239, in <listcomp>
arrs = [create_array(s, t) for s, t in series] …Run Code Online (Sandbox Code Playgroud) 我知道有 pyarrow.parquet 用于将镶木地板文件读取为箭头表,但我正在寻找 avro 的等效项?
我有一个大约 10+GB 的镶木地板文件,列主要是字符串。加载到内存时,内存占用最高可达110G,加载完毕后,内存占用又回落至40G左右。
我正在使用分配了内存的高性能计算机,因此我确实可以访问大内存。不过,仅仅为了加载数据就必须申请128G内存,这对我来说似乎有点浪费,之后64G就足够了。另外,128G内存更容易出现故障。
我天真的猜想是,Python解释器将HPC上的512G物理内存误认为是总可用内存,因此它没有按照实际需要的频率进行垃圾收集。例如,当我用64G内存加载数据时,它从来没有抛出MemoryError,而是直接杀死内核并重新启动。
我想知道加载时内存使用率过高是否是 pyarrow 的正常行为,或者是由于我的环境的特殊设置所致。如果是后者,那么是否可以在加载过程中以某种方式限制可用内存?
pyarrow ×10
python ×6
apache-arrow ×3
parquet ×3
pandas ×2
pyspark ×2
apache-spark ×1
bigtable ×1
fastparquet ×1
hadoop ×1
ipc ×1
javascript ×1
python-3.x ×1