如何将二进制文件从 hdfs 读入 Spark 数据帧?

Wil*_*ebb 6 python hadoop numpy apache-spark spark-dataframe

我正在尝试将一些代码从 Pandas 移植到 (py)Spark。不幸的是,我已经在输入部分失败了,我想在其中读取二进制数据并将其放入 Spark Dataframe。

到目前为止,我正在使用fromfilenumpy:

dt = np.dtype([('val1', '<i4'),('val2','<i4'),('val3','<i4'),('val4','f8')])
data = np.fromfile('binary_file.bin', dtype=dt)
data=data[1:]                                           #throw away header
df_bin = pd.DataFrame(data, columns=data.dtype.names)
Run Code Online (Sandbox Code Playgroud)

但是对于 Spark,我找不到如何去做。到目前为止,我的解决方法是使用 csv-Files 而不是二进制文件,但这不是理想的解决方案。我知道我不应该将 numpyfromfile与 spark 一起使用。如何读取已加载到 hdfs 中的二进制文件?

我试过类似的东西

fileRDD=sc.parallelize(['hdfs:///user/bin_file1.bin','hdfs:///user/bin_file2.bin])
fileRDD.map(lambda x: ???)
Run Code Online (Sandbox Code Playgroud)

但它给了我一个No such file or directory错误。

我见过这个问题: spark in python: create an rdd by loading binary data with numpy.fromfile 但这只适用于我将文件存储在驱动程序节点的家中。

Wil*_*ebb 5

所以,对于像我这样从 Spark 开始并偶然发现二进制文件的人。这是我解决它的方法:

dt=np.dtype([('idx_metric','>i4'),('idx_resource','>i4'),('date','>i4'),
             ('value','>f8'),('pollID','>i2')])
schema=StructType([StructField('idx_metric',IntegerType(),False),
                   StructField('idx_resource',IntegerType(),False), 
                   StructField('date',IntegerType),False), 
                   StructField('value',DoubleType(),False), 
                   StructField('pollID',IntegerType(),False)])

filenameRdd=sc.binaryFiles('hdfs://nameservice1:8020/user/*.binary')

def read_array(rdd):
    #output=zlib.decompress((bytes(rdd[1])),15+32) # in case also zipped
    array=np.frombuffer(bytes(rdd[1])[20:],dtype=dt) # remove Header (20 bytes)
    array=array.newbyteorder().byteswap() # big Endian
    return array.tolist()

unzipped=filenameRdd.flatMap(read_array)
bin_df=sqlContext.createDataFrame(unzipped,schema)
Run Code Online (Sandbox Code Playgroud)

现在你可以用你的数据框在 Spark 中做任何你想做的事情。