How to store Python byte strings in a Spark DataFrame

Tha*_*gor 8 dataframe python-3.x apache-spark apache-spark-sql pyspark

I am puzzling over a mystery. I have a bunch of long documents available as Python byte strings (b"I'm a byte string") in an RDD. Now I convert this RDD to a DataFrame in order to join it to another DataFrame. I do it like this:

Data_RDD = Paths_RDD.map(open_paths).flatMap(split_files)

Data_schema = StructType([
    StructField("URI", StringType(), True),
    StructField("Content", StringType(), True),
])

Data_DF = sqlContext.createDataFrame(Data_RDD, schema=Data_schema)

print(Data_DF.show(5))

+--------------------+-----------+
|                 URI|    Content|
+--------------------+-----------+
|http://01storytel...|[B@10628e42|
|http://05yxgs.com...|[B@36699775|
|http://1.lhcmaima...|[B@4e569e3b|
|http://100100.ove...|[B@18ae5bab|
|http://1015theriv...|[B@5f044435|
+--------------------+-----------+
only showing top 5 rows

These short "[B@10628e42" strings seem pretty useless to me and are probably some kind of pointer. The byte strings in the RDD are still intact, since I can still access them there, so the problem happens in the conversion from RDD to DataFrame. I have now tried storing the byte strings in fields of other types, namely ByteType() and BinaryType(). Neither works; the byte strings are rejected with these error messages:

TypeError: ByteType can not accept object b'some string' in type <class 'bytes'>
TypeError: BinaryType can not accept object b'some string' in type <class 'bytes'>
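Roughly, the attempt looked like this (a reconstructed sketch; the name Binary_schema is just for illustration):

from pyspark.sql.types import StructType, StructField, StringType, BinaryType

# Same schema as above, but with the Content field switched to BinaryType
Binary_schema = StructType([
    StructField("URI", StringType(), True),
    StructField("Content", BinaryType(), True),
])

# Raises: TypeError: BinaryType can not accept object b'...' in type <class 'bytes'>
Data_DF = sqlContext.createDataFrame(Data_RDD, schema=Binary_schema)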

But it gets even weirder. When I set up a small-scale experiment:

ByteStrings = [b'one',b'two',b'three']
rdd_ByteStrings = sc.parallelize(ByteStrings)
print(rdd_ByteStrings.take(3))

DF2_schema = StructType([
    StructField("ByteString", StringType(), True),
])
DF_ByteStrings = sqlContext.createDataFrame(rdd_ByteStrings,schema=DF2_schema)

print(DF_ByteStrings.show())

even small byte strings are not accepted into a StringType column. When I try to run this I get the following error message:

StructType can not accept object b'one' in type <class 'bytes'>

When I try to let Spark infer a type, it also fails, with the following message:

TypeError: Can not infer schema for type: <class 'bytes'>
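That inference attempt is simply the same call without an explicit schema (sketched here for completeness; the name DF_inferred is just for illustration):

# Letting Spark infer the schema from plain bytes elements fails
DF_inferred = sqlContext.createDataFrame(rdd_ByteStrings)
# TypeError: Can not infer schema for type: <class 'bytes'>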

So, any ideas how I can store byte strings in a DataFrame without .decode()-ing them? That is something I can only do after joining the two DataFrames, because the other one holds the decoding information.

I am using Python 3.5 and Spark 2.0.1.

Thanks in advance!

use*_*411 8

This is not a mystery. Step by step:

Spark SQL has no type that maps directly to Python bytes. Personally I would join the RDDs (a rough sketch of that is at the end of this answer), but if you really want to use DataFrames you can use an intermediate BinaryType representation:

from collections import namedtuple

Record = namedtuple("Record", ["url", "content"])

rdd = sc.parallelize([Record("none://", b"foo"), Record("none://", b"bar")])
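# BinaryType accepts bytearray, while a raw bytes object is rejected, hence the wrapping below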
df = rdd.map(lambda rec: Record(rec.url, bytearray(rec.content))).toDF()

df.printSchema()
root
 |-- url: string (nullable = true)
 |-- content: binary (nullable = true)

It won't give you native (JVM) usability or a meaningful string representation:

+-------+----------+
|    url|   content|
+-------+----------+
|none://|[66 6F 6F]|
|none://|[62 61 72]|
+-------+----------+

but it is lossless:

df.rdd.map(lambda row: bytes(row.content)).first()
b'foo'

and it can be accessed from Python with a udf:

from pyspark.sql.functions import udf
from pyspark.sql import Column
from typing import Union

def decode(col: Union[str, Column], enc: str="utf-8") -> Column:
    def decode_(bs: Union[bytearray, None]) -> Union[str, None]:
        # Convert back to bytes and decode; return None for undecodable content
        if bs is not None:
            try:
                return bytes(bs).decode(enc)
            except UnicodeDecodeError:
                pass
    return udf(decode_)(col)

df.withColumn("decoded", decode("content")).show()
+-------+----------+-------+
|    url|   content|decoded|
+-------+----------+-------+
|none://|[66 6F 6F]|    foo|
|none://|[62 61 72]|    bar|
+-------+----------+-------+
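As mentioned at the start, a plain RDD join avoids the problem entirely. A minimal sketch, assuming a hypothetical second RDD meta_rdd keyed by URL (for example holding the decoding information):

# Hypothetical second dataset keyed by URL
meta_rdd = sc.parallelize([("none://", "utf-8")])

# Key the byte-string records by URL and join; the bytes objects are never
# touched because everything stays as plain Python objects.
joined = rdd.map(lambda rec: (rec.url, rec.content)).join(meta_rdd)

joined.first()  # e.g. ('none://', (b'foo', 'utf-8'))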