Wil*_*ebb 6 python hadoop numpy apache-spark spark-dataframe
我正在尝试将一些代码从 Pandas 移植到 (py)Spark。不幸的是,我已经在输入部分失败了,我想在其中读取二进制数据并将其放入 Spark Dataframe。
到目前为止,我正在使用fromfilenumpy:
dt = np.dtype([('val1', '<i4'),('val2','<i4'),('val3','<i4'),('val4','f8')])
data = np.fromfile('binary_file.bin', dtype=dt)
data=data[1:] #throw away header
df_bin = pd.DataFrame(data, columns=data.dtype.names)
Run Code Online (Sandbox Code Playgroud)
但是对于 Spark,我找不到如何去做。到目前为止,我的解决方法是使用 csv-Files 而不是二进制文件,但这不是理想的解决方案。我知道我不应该将 numpyfromfile与 spark 一起使用。如何读取已加载到 hdfs 中的二进制文件?
我试过类似的东西
fileRDD=sc.parallelize(['hdfs:///user/bin_file1.bin','hdfs:///user/bin_file2.bin])
fileRDD.map(lambda x: ???)
Run Code Online (Sandbox Code Playgroud)
但它给了我一个No such file or directory错误。
我见过这个问题: spark in python: create an rdd by loading binary data with numpy.fromfile 但这只适用于我将文件存储在驱动程序节点的家中。
所以,对于像我这样从 Spark 开始并偶然发现二进制文件的人。这是我解决它的方法:
dt=np.dtype([('idx_metric','>i4'),('idx_resource','>i4'),('date','>i4'),
('value','>f8'),('pollID','>i2')])
schema=StructType([StructField('idx_metric',IntegerType(),False),
StructField('idx_resource',IntegerType(),False),
StructField('date',IntegerType),False),
StructField('value',DoubleType(),False),
StructField('pollID',IntegerType(),False)])
filenameRdd=sc.binaryFiles('hdfs://nameservice1:8020/user/*.binary')
def read_array(rdd):
#output=zlib.decompress((bytes(rdd[1])),15+32) # in case also zipped
array=np.frombuffer(bytes(rdd[1])[20:],dtype=dt) # remove Header (20 bytes)
array=array.newbyteorder().byteswap() # big Endian
return array.tolist()
unzipped=filenameRdd.flatMap(read_array)
bin_df=sqlContext.createDataFrame(unzipped,schema)
Run Code Online (Sandbox Code Playgroud)
现在你可以用你的数据框在 Spark 中做任何你想做的事情。