Tha*_*gor 8 dataframe python-3.x apache-spark apache-spark-sql pyspark
我正在寻找一个谜.我b"I'm a byte string"在RDD中有一堆长文档可用作Python bytestrings().现在我将此RDD转换为a DataFrame以将其连接到另一个DataFrame.我这样做:
Data_RDD = Paths_RDD.map(open_paths).flatMap(split_files)
Data_schema = StructType([
StructField("URI", StringType(), True),
StructField("Content", StringType(), True),
])
Data_DF = sqlContext.createDataFrame(Data_RDD, schema=Data_schema)
print(Data_DF.show(5))
+--------------------+-----------+
| URI| Content|
+--------------------+-----------+
|http://01storytel...|[B@10628e42|
|http://05yxgs.com...|[B@36699775|
|http://1.lhcmaima...|[B@4e569e3b|
|http://100100.ove...|[B@18ae5bab|
|http://1015theriv...|[B@5f044435|
+--------------------+-----------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)
这些短"[B@10628e42"字符串对我来说似乎毫无用处,可能是某种指针.RDD中的字节串仍然是"完整的",因为我仍然可以访问它们.所以在从RDD转换到DataFrame问题时会发生.现在我尝试将字节串存储在其他类型的字段中,即ByteType()和BinaryType().两者都不起作用,因为这些错误消息不接受字节串:
TypeError: ByteType can not accept object b'some string' in type <class 'bytes'>
TypeError: BinaryType can not accept object b'some string' in type <class 'bytes'>
Run Code Online (Sandbox Code Playgroud)
但它变得更加怪异.当我设置一个小规模的实验时:
ByteStrings = [b'one',b'two',b'three']
rdd_ByteStrings = sc.parallelize(ByteStrings)
print(rdd_ByteStrings.take(3))
DF2_schema = StructType([
StructField("ByteString", StringType(), True),
])
DF_ByteStrings = sqlContext.createDataFrame(rdd_ByteStrings,schema=DF2_schema)
print(DF_ByteStrings.show())
Run Code Online (Sandbox Code Playgroud)
也不允许使用小字节串,因为在StringType列中也是如此.当我尝试运行此时,我收到此错误消息:
StructType can not accept object b'one' in type <class 'bytes'>
Run Code Online (Sandbox Code Playgroud)
当我尝试让spark推断出一个类型时,它也会失败并显示以下消息:
TypeError: Can not infer schema for type: <class 'bytes'>
Run Code Online (Sandbox Code Playgroud)
所以任何想法如何我可以存储DataFrame没有.decode()他们的字节串.这是我在加入两者之后才能做的事情DataFrames,因为另一个持有解码信息.
我使用Python 3.5和Spark 2.0.1
提前致谢!
这不是一个谜.一步步:
bytes就是byte[]这相当于Array[Byte]在Scala中.StringType因此Array[Byte]将String在存储之前转换为DataFrame.Arrays在Scala中是丑陋的Java工件,除了其他问题之外没有任何有用的toString方法:
Array(192, 168, 1, 1).map(_.toByte)
Run Code Online (Sandbox Code Playgroud)
Array[Byte] = Array(-64, -88, 1, 1)
Run Code Online (Sandbox Code Playgroud)
Array(192, 168, 1, 1).map(_.toByte).toString
Run Code Online (Sandbox Code Playgroud)
String = [B@6c9fe061
Run Code Online (Sandbox Code Playgroud)
这是您获取列内容的方式.
Spark SQL中没有直接映射到Python的类型bytes.我个人会加入joinRDD,但如果你真的想用,DataFrames你可以使用中间BinaryType表示.
from collections import namedtuple
Record = namedtuple("Record", ["url", "content"])
rdd = sc.parallelize([Record("none://", b"foo"), Record("none://", b"bar")])
df = rdd.map(lambda rec: Record(rec.url, bytearray(rec.content))).toDF()
df.printSchema()
Run Code Online (Sandbox Code Playgroud)
root
|-- url: string (nullable = true)
|-- content: binary (nullable = true)
Run Code Online (Sandbox Code Playgroud)
它不会为您提供本机使用(JVM)或有意义的字符串表示:
+-------+----------+
| url| content|
+-------+----------+
|none://|[66 6F 6F]|
|none://|[62 61 72]|
+-------+----------+
Run Code Online (Sandbox Code Playgroud)
但是无损:
df.rdd.map(lambda row: bytes(row.content)).first()
Run Code Online (Sandbox Code Playgroud)
b'foo'
Run Code Online (Sandbox Code Playgroud)
并且可以在Python中访问udf:
from pyspark.sql.functions import udf
from pyspark.sql import Column
from typing import Union
def decode(col: Union[str, Column], enc: str="utf-8") -> Column:
def decode_(bs: Union[bytearray, None]) -> Union[str, None]:
if bs is not None:
return bytes(bs).decode(enc)
except UnicodeDecodeError:
pass
return udf(decode_)(col)
df.withColumn("decoded", decode("content")).show()
Run Code Online (Sandbox Code Playgroud)
+-------+----------+-------+
| url| content|decoded|
+-------+----------+-------+
|none://|[66 6F 6F]| foo|
|none://|[62 61 72]| bar|
+-------+----------+-------+
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3823 次 |
| 最近记录: |