读取spark中的bytes列

pau*_*ult 6 encoding apache-spark apache-spark-sql pyspark

我有一个数据集,其中包含一个未知(非友好)编码的ID字段.我可以使用普通的python读取单列,并验证这些值在多个数据集中是不同的和一致的(即它可以用作连接的主键).

使用时加载文件spark.read.csv,似乎spark正在将列转换为utf-8.但是,某些多字节序列将转换为Unicode字符U+FFFD REPLACEMENT CHARACTER.(EF BF BD十六进制).

有没有办法强制Spark以字节而不是字符串的形式读取列?

以下是一些可用于重新创建问题的代码(让列为aID字段):

使用示例数据创建文件

data = [
    (bytes(b'\xba\xed\x85\x8e\x91\xd4\xc7\xb0'), '1', 'a'),
    (bytes(b'\xba\xed\x85\x8e\x91\xd4\xc7\xb1'), '2', 'b'),
    (bytes(b'\xba\xed\x85\x8e\x91\xd4\xc7\xb2'), '3', 'c')
]

with open('sample.csv', 'wb') as f:
    header = ["a", "b", "c"]
    f.write(",".join(header)+"\n")
    for d in data:
        f.write(",".join(d) + "\n")
Run Code Online (Sandbox Code Playgroud)

使用熊猫阅读

import pandas as pd
df = pd.read_csv("sample.csv", converters={"a": lambda x: x.encode('hex')})
print(df)
#                  a  b  c
#0  baed858e91d4c7b0  1  a
#1  baed858e91d4c7b1  2  b
#2  baed858e91d4c7b2  3  c
Run Code Online (Sandbox Code Playgroud)

尝试使用Spark读取相同的文件

spark_df = spark.read.csv("sample.csv", header=True)
spark_df.show()
#+-----+---+---+
#|a    |b  |c  |
#+-----+---+---+
#|?????|1  |a  |
#|?????|2  |b  |
#|?????|3  |c  |
#+-----+---+---+
Run Code Online (Sandbox Code Playgroud)

哎呀!好的,转换到怎么样hex

import pyspark.sql.functions as f
spark_df.withColumn("a", f.hex("a")).show(truncate=False)
#+----------------------------+---+---+
#|a                           |b  |c  |
#+----------------------------+---+---+
#|EFBFBDED858EEFBFBDEFBFBDC7B0|1  |a  |
#|EFBFBDED858EEFBFBDEFBFBDC7B1|2  |b  |
#|EFBFBDED858EEFBFBDEFBFBDC7B2|3  |c  |
#+----------------------------+---+---+
Run Code Online (Sandbox Code Playgroud)

(在此示例中,值是不同的,但在我的较大文件中不是这样)

如您所见,值很接近,但有些字节已被替换EFBFBD

有没有办法在Spark中读取文件(也许使用rdd?),以便我的输出看起来像pandas版本:

#+----------------+---+---+
#|a               |b  |c  |
#+----------------+---+---+
#|baed858e91d4c7b0|1  |a  |
#|baed858e91d4c7b1|2  |b  |
#|baed858e91d4c7b2|3  |c  |
#+----------------+---+---+
Run Code Online (Sandbox Code Playgroud)

我已经尝试过转换byte并指定架构,以便此列ByteType()不起作用.

编辑

我正在使用Spark v 2.1.

Sim*_*Sim 1

问题的根源在于分隔文件不太适合二进制数据。

如果文本有已知的一致编码,请使用该charset选项。请参阅https://github.com/databricks/spark-csv#features(我不知道 2.x 文档中描述分隔阅读选项的好地方,所以我仍然回到 1.x 文档) 。我建议尝试使用 8 位 ASCII,例如ISO-8859-1US-ASCII

如果没有这样的编码,您需要将输入转换为不同的格式,例如,对第一列进行 base64 编码,或者操作读取的数据以将其恢复为您需要的格式。