Multiprocessing with PyArrow's HdfsClient

Jay*_*Jay · score 2 · tags: python, multiprocessing, parquet, pyarrow

I have a top-level function that takes a tuple containing the path to a Parquet file and a column name.

The function loads only that column from the file, converts it to pandas, and then packs/serializes it into a standard format. Something like:

import pyarrow as pa
import pyarrow.parquet as pq
from multiprocessing import Pool

# hdfs_params, imported_binarizing_function and send_over_socket are defined
# elsewhere (HDFS connection settings, binarizing helper, socket sender).

def binarizer(file_data_tuple):
    '''Read one Parquet column from a file, binarize it and return the bytes.'''

    path, col_name, col_meta, native = file_data_tuple
    if native:
        read_pq = pq.read_table
    else:
        # Either this or using a top-level hdfs_con
        hdfs_con = pa.hdfs.connect(**hdfs_params)
        read_pq = hdfs_con.read_parquet

    arrow_col = read_pq(path, columns=[col_name])
    bin_col = imported_binarizing_function(arrow_col)
    return bin_col

def read_binarize_parallel(filepaths, col_specs, native):
    '''Set up parallel reading and binarizing of Parquet files.'''

    pool = Pool()
    for path in filepaths:
        # One task per column: (filepath, column name, meta, mode)
        pool_params = [(path, col_name, col_meta, native)
                       for col_name, col_meta in col_specs]
        bin_cols = pool.map(binarizer, pool_params)
        chunk = b''.join(bin_cols)
        send_over_socket(chunk)
    pool.close()
    pool.join()
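The fan-out above sends one task per column of the same file and then joins the results with `b''.join`. That only yields a stable layout because `Pool.map` returns results in the order of its input, not in completion order. Below is a minimal, standard-library-only sketch of that pattern; `fake_binarize` and `fan_out` are hypothetical stand-ins (no Parquet is read), and the `with Pool(...)` form assumes Python 3.3+, whereas the traceback in the question comes from Python 2.7.

```python
import struct
from multiprocessing import Pool

def fake_binarize(task):
    # Hypothetical stand-in for binarizer(): instead of reading a Parquet
    # column, pack the column index and the name length as two
    # little-endian uint32 values so the output is easy to check.
    path, col_index, col_name = task
    return struct.pack('<II', col_index, len(col_name))

def fan_out(path, col_names, processes=2):
    # One task per column of the same file, as in read_binarize_parallel().
    tasks = [(path, i, name) for i, name in enumerate(col_names)]
    with Pool(processes) as pool:
        # Pool.map returns results in the same order as `tasks`, so the
        # joined chunk always lists the columns in their original order.
        bin_cols = pool.map(fake_binarize, tasks)
    return b''.join(bin_cols)

if __name__ == '__main__':
    chunk = fan_out('example.parquet', ['a', 'bb', 'ccc'])
    print(struct.unpack('<6I', chunk))  # prints (0, 1, 1, 2, 2, 3)
```

If the order did not matter, `pool.imap_unordered` would let faster columns be consumed first, but the receiver would then need to know which column each chunk belongs to.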

This works when I use native mode, i.e. when reading the files from the local file system.

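However, if I try to read from HDFS, I get strange (to me) Arrow errors, both when I open a connection in each process and when I try to share the same connection. Here is a condensed version of the error: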

[libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
[libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
[libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
[libprotobuf ERROR google/protobuf/message_lite.cc:123] Can't parse message of type "Hdfs.Internal.RpcResponseHeaderProto" because it is missing required fields: callId, status
2018-01-09 21:41:47.939006, p10007, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp:703: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot find pending call: id = 3.
    @   Unknown
    @   Unknown
    @   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
    @   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
    @   __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_15isfile(_object*, _object*)
    @   Unknown
    @   Unknown
    @   Unknown
2018-01-09 21:41:47.939103, p10007, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.939357, p10010, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp:780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
    @   Unknown
    @   Unknown
    @   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
    @   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
    @   __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
    @   Unknown
    @   Unknown
    @   Unknown
2018-01-09 21:41:47.939406, p10008, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp:780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
2018-01-09 21:41:47.939422, p10013, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp:780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
2018-01-09 21:41:47.939431, p10009, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp:780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
2018-01-09 21:41:47.939457, p10012, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp:780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
binarizing process filepath: /parquet_430mb/5e6.parquet
2018-01-09 21:41:47.939854, p10010, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.939864, p10013, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.939866, p10008, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.939868, p10012, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.939868, p10009, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.940813, p10014, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp:780: HdfsRpcException: RPC channel to "192.168.0.101:9000" got protocol mismatch: RPC channel cannot parse response header.
2018-01-09 21:41:47.940937, p10014, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
2018-01-09 21:41:47.944352, p10011, th139965275871040, ERROR Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000": RpcChannel.cpp:393: HdfsRpcException: Failed to invoke RPC call "getFileInfo" on server "192.168.0.101:9000"
    @   Unknown
    @   Unknown
    @   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
    @   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
    @   __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
    @   Unknown
Caused by
TcpSocket.cpp: 127: HdfsNetworkException: Write 124 bytes failed to "192.168.0.101:9000": (errno: 32) Broken pipe
    @   Unknown
    @   Unknown
    @   arrow::io::HadoopFileSystem::GetPathInfo(std::string const&, arrow::io::HdfsPathInfo*)
    @   __pyx_f_7pyarrow_3lib_16HadoopFileSystem__path_info(__pyx_obj_7pyarrow_3lib_HadoopFileSystem*, _object*, arrow::io::HdfsPathInfo*)
    @   __pyx_pw_7pyarrow_3lib_16HadoopFileSystem_13isdir(_object*, _object*)
    @   Unknown
    @   Unknown
2018-01-09 21:41:47.944519, p10011, th139965275871040, INFO Retry idempotent RPC call "getFileInfo" on server "192.168.0.101:9000"
---------------------------------------------------------------------------
ArrowIOError                              Traceback (most recent call last)
/home/parquet_sender.pyc in insert_files_parallel(self)
    374         # print('372 sqparquet filepath:', filepath)
    375         params_with_path_and_mode = [col_params + (filepath, native) for col_params in pool_params]
--> 376         bin_col = self.pool.map(read_binarize, params_with_path_and_mode)
    377         get('map complete')
    378         num_rows = bin_col[0][2]

/usr/lib/python2.7/multiprocessing/pool.pyc in map(self, func, iterable, chunksize)
    249         '''
    250         assert self._state == RUN
--> 251         return self.map_async(func, iterable, chunksize).get()
    252
    253     def imap(self, func, iterable, chunksize=1):

/usr/lib/python2.7/multiprocessing/pool.pyc in get(self, timeout)
    556             return self._value
    557         else:
--> 558             raise self._value
    559
    560     def _set(self, i, obj):

ArrowIOError: HDFS: GetPathInfo failed

I would appreciate any feedback on what causes this error, and on how I should go about loading Parquet files in parallel.

Answer: Wes*_*ney · score 14

This is a bug related to the details of how multiprocessing serializes objects. I opened a bug report here: https://issues.apache.org/jira/browse/ARROW-1986
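For context: `Pool.map` pickles every task argument it sends to a worker, and with the `fork` start method the workers also inherit the parent's memory and open file descriptors, including sockets. One general pattern that avoids shipping or sharing a live connection is to send only the connection parameters and let each worker process connect on its own. The sketch below uses only the standard library and is not the fix tracked in the bug report; `open_connection`, `LazyConnection`, `worker` and the host `'namenode'` are hypothetical names, and `open_connection` merely stands in for a real connect call such as `pa.hdfs.connect`.

```python
import os
import pickle
from multiprocessing import Pool

def open_connection(host, port):
    # Hypothetical stand-in for a real connect call (e.g. pa.hdfs.connect);
    # it only records which process opened the "connection".
    return {'host': host, 'port': port, 'pid': os.getpid()}

class LazyConnection:
    """Picklable handle that ships connection parameters, never the live connection."""

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self._conn = None
        self._pid = None

    def __getstate__(self):
        # Called when pickling (e.g. when Pool.map sends a task to a worker):
        # drop the live connection so the receiver must open its own.
        state = self.__dict__.copy()
        state['_conn'] = None
        state['_pid'] = None
        return state

    @property
    def conn(self):
        # Connect on first use, and again if this object was inherited through
        # fork from a process that had already connected.
        if self._conn is None or self._pid != os.getpid():
            self._conn = open_connection(self.host, self.port)
            self._pid = os.getpid()
        return self._conn

def worker(task):
    handle, path = task
    # Every task uses a connection opened inside its own worker process.
    return path, handle.conn['pid']

if __name__ == '__main__':
    handle = LazyConnection('namenode', 9000)  # placeholder host/port
    parent_pid = handle.conn['pid']            # the parent is connected too
    with Pool(2) as pool:
        results = pool.map(worker, [(handle, p) for p in ['a', 'b', 'c']])
    print(all(pid != parent_pid for _, pid in results))  # prints True
    print(pickle.loads(pickle.dumps(handle))._conn)      # prints None
```

Because the handle reconnects only when `_conn` is missing or the process ID has changed, each worker opens at most one connection per unpickled handle instead of reusing the parent's socket.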