如何在PySpark中阅读Avro文件

B.M*_*.W. 13 python avro apache-spark pyspark

我正在使用python编写一个spark作业.但是,我需要阅读一大堆avro文件.

是我在Spark的示例文件夹中找到的最接近的解决方案.但是,您需要使用spark-submit提交此python脚本.在spark-submit的命令行中,您可以指定驱动程序类,在这种情况下,将定位您的所有avrokey,avrovalue类.

avro_rdd = sc.newAPIHadoopFile(
        path,
        "org.apache.avro.mapreduce.AvroKeyInputFormat",
        "org.apache.avro.mapred.AvroKey",
        "org.apache.hadoop.io.NullWritable",
        keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
        conf=conf)
Run Code Online (Sandbox Code Playgroud)

在我的情况下,我需要在Python脚本中运行所有内容,我已经尝试创建一个环境变量来包含jar文件,手指交叉Python会将jar添加到路径但显然它不是,它给了我意想不到的类错误.

os.environ['SPARK_SUBMIT_CLASSPATH'] = "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar"
Run Code Online (Sandbox Code Playgroud)

任何人都可以帮我如何在一个python脚本中读取avro文件?

zer*_*323 7

火花> = 2.4.0

您可以使用内置的Avro支持。该API向后兼容该spark-avro软件包,并增加了一些附加功能(最引人注目的是from_avro/ to_avro函数)。

请注意,模块未与标准Spark二进制文件捆绑在一起,必须使用spark.jars.packages或等效机制将其包含在内。

另请参阅Pyspark 2.4.0,使用读取流从kafka读取avro-Python

Spark <2.4.0

您可以使用spark-avro库。首先让我们创建一个示例数据集:

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter

schema_string ='''{"namespace": "example.avro",
 "type": "record",
 "name": "KeyValue",
 "fields": [
     {"name": "key", "type": "string"},
     {"name": "value",  "type": ["int", "null"]}
 ]
}'''

schema = avro.schema.parse(schema_string)

with open("kv.avro", "w") as f, DataFileWriter(f, DatumWriter(), schema) as wrt:
    wrt.append({"key": "foo", "value": -1})
    wrt.append({"key": "bar", "value": 1})
Run Code Online (Sandbox Code Playgroud)

使用它读取spark-csv就像这样简单:

df = sqlContext.read.format("com.databricks.spark.avro").load("kv.avro")
df.show()

## +---+-----+
## |key|value|
## +---+-----+
## |foo|   -1|
## |bar|    1|
## +---+-----+ 
Run Code Online (Sandbox Code Playgroud)

  • 您能否提供一个带有“from_avro”的“pyspark”示例? (2认同)
  • 如果我错了,请纠正我,但看起来内置的“from_avro”和“to_avro”函数在 _PySpark_ 2.4.x 中尚不可用。看起来这些是在 PySpark 3.0 中添加的,根据“@since”标签[此处](https://github.com/apache/spark/blob/master/python/pyspark/sql/avro/functions.py# L29-L67)。 (2认同)

Rég*_* B. 5

前一种解决方案需要安装第三方Java依赖项,这不是大多数Python开发人员所满意的.但是如果你想要做的就是用给定的模式解析你的Avro文件,你真的不需要外部库.你可以只读取二进制文件并用你最喜欢的python Avro包解析它们.

例如,这是使用fastavro以下方法加载Avro文件的方法:

from io import BytesIO
import fastavro

schema = {
    ...
}

rdd = sc.binaryFiles("/path/to/dataset/*.avro")\
    .flatMap(lambda args: fastavro.reader(BytesIO(args[1]), reader_schema=schema))

print(rdd.collect())
Run Code Online (Sandbox Code Playgroud)