我正试图从读取csv平面文件切换到spark上的avro文件.关注https://github.com/databricks/spark-avro 我使用:
import com.databricks.spark.avro._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.avro("gs://logs.xyz.com/raw/2016/04/20/div1/div2/2016-04-20-08-28-35.UTC.blah-blah.avro")
Run Code Online (Sandbox Code Playgroud)
得到
java.lang.UnsupportedOperationException: This mix of union types is not supported (see README): ArrayBuffer(STRING)
Run Code Online (Sandbox Code Playgroud)
自述文件清楚地说明:
此库支持读取所有Avro类型,但复杂的联合类型除外.它使用以下从Avro类型到Spark SQL类型的映射:
当我尝试文本阅读相同的文件时,我可以看到架构
val df = sc.textFile("gs://logs.xyz.com/raw/2016/04/20/div1/div2/2016-04-20-08-28-35.UTC.blah-blah.avro")
df.take(2).foreach(println)
Run Code Online (Sandbox Code Playgroud)
{ "名称": "log_record", "类型": "记录", "字段":[{ "名称": "请求", "类型":{ "类型": "记录", "名称":"request_data " "字段":[{ "名称": "日期时间", "类型": "串"},{ "名称": "IP", "类型": "串"},{ "名称":" 主机" "类型": "串"},{ "名称": "URI", "类型": "串"},{ "名称": "REQUEST_URI", "类型": "串"},{" 名":" 引用者", "类型": "串"},{ "名称": "用户代理", "类型": "串"}]}} …
我正在尝试使用《Apache Avro数据源指南》中spark-avro所述的软件包。
当我提交以下命令时:
val df = spark.read.format("avro").load("~/foo.avro")
Run Code Online (Sandbox Code Playgroud)
我收到一个错误:
java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232)
at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:630)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:194)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
... 49 elided
Caused by: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.FileFormat.$init$(Lorg/apache/spark/sql/execution/datasources/FileFormat;)V
at org.apache.spark.sql.avro.AvroFileFormat.<init>(AvroFileFormat.scala:44)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
... 62 more
Run Code Online (Sandbox Code Playgroud)
我已经尝试过该org.apache.spark:spark-avro_2.12:2.4.0软件包的不同版本(2.4.0、2.4.1和2.4.2),并且我目前使用的是Spark 2.4.1,但都没有用。 …
我有一个用例,我想将结构字段转换为 Avro 记录。struct 字段最初映射到 Avro 类型。输入数据是avro文件,struct字段对应输入avro记录中的一个字段。
以下是我想用伪代码实现的目标。
DataSet<Row> data = loadInput(); // data is of form (foo, bar, myStruct) from avro data.
// do some joins to add more data
data = doJoins(data); // now data is of form (a, b, myStruct)
// transform DataSet<Row> to DataSet<MyType>
DataSet<MyType> myData = data.map(row -> myUDF(row), encoderOfMyType);
// method `myUDF` definition
MyType myUDF(Row row) {
String a = row.getAs("a");
String b = row.getAs("b");
// MyStruct is the generated avro class that corresponds to …Run Code Online (Sandbox Code Playgroud) 更新:spark-avro软件包已更新以支持此方案.https://github.com/databricks/spark-avro/releases/tag/v3.1.0
我有一个AVRO文件,由我控制之外的第三方创建,我需要使用spark处理.AVRO架构是一个记录,其中一个字段是混合联合类型:
{
"name" : "Properties",
"type" : {
"type" : "map",
"values" : [ "long", "double", "string", "bytes" ]
}
Run Code Online (Sandbox Code Playgroud)
spark-avro阅读器不支持此功能:
除了上面列出的类型之外,它还支持读取三种类型的联合类型:union(int,long)union(float,double)union(something,null),其中某些东西是上面列出的受支持的Avro类型之一,或者是支持的联合类型之一.
阅读AVRO的模式演变和解决方案,我希望能够通过指定省略此字段的不同读取器模式来跳过有问题的字段时读取文件.根据AVRO Schema Resolution文档,它应该工作:
如果作者的记录包含读者记录中不存在名称的字段,则忽略该作者对该字段的值.
所以我修改了使用
val df = sqlContext.read.option("avroSchema", avroSchema).avro(path)
Run Code Online (Sandbox Code Playgroud)
编写avroSchema器使用的完全相同的架构在哪里,但没有有问题的字段.
但是我仍然得到关于混合联合类型的相同错误.
AVRO支持这种架构演变的场景吗?与avro-spark?还有另一种方法来实现我的目标吗?
更新:我已经使用Apache Avro 1.8.1测试了相同的场景(实际上是相同的文件)并且它按预期工作.然后它必须具体spark-avro.有任何想法吗?
嗨,有一个主题是使用MultipleTextOutputFormat在一个spark作业中将文本数据写入多个输出目录
我会问是否有类似的方法将avro数据写入多个目录
我想要的是将avro文件中的数据写入不同的目录(基于时间戳字段,时间戳中的同一天转到同一目录)
我是 Spark 新手,我试图弄清楚是否有一种方法可以将复杂对象(嵌套)或复杂 json 保存为 Spark 中的 Parquet。我知道 Kite SDK,但我知道它使用 Map/Reduce。
我环顾四周,但找不到解决方案。
感谢您的帮助。
我知道在 HDFS 中读取大量小文件的问题一直是一个问题并被广泛讨论,但请耐心等待。处理此类问题的大多数 stackoverflow 问题都与读取大量 txt 文件有关。我正在尝试读取大量小 avro 文件
另外,这些读取txt文件解决方案讨论使用WholeTextFileInputFormat或CombineInputFormat(/sf/answers/3072911341/)它们是RDD实现,我使用Spark 2.4(HDFS 3.0.0)并且通常不鼓励使用RDD实现和数据框是首选。我更喜欢使用数据帧,但也对 RDD 实现持开放态度。
我已经按照 Murtaza 的建议尝试合并数据帧,但在大量文件上出现 OOM 错误(/sf/answers/2248236301/)
我正在使用以下代码
val filePaths = avroConsolidator.getFilesInDateRangeWithExtension //pattern:filePaths: Array[String]
//I do need to create a list of file paths as I need to filter files based on file names. Need this logic for some upstream process
//example : Array("hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1530.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1531.avro","hdfs://server123:8020/source/Avro/weblog/2019/06/03/20190603_1532.avro")
val df_mid = sc.read.format("com.databricks.spark.avro").load(filePaths: _*)
val df = df_mid
.withColumn("dt", date_format(df_mid.col("timeStamp"), "yyyy-MM-dd"))
.filter("dt != 'null'")
df
.repartition(partitionColumns(inputs.logSubType).map(new org.apache.spark.sql.Column(_)):_*)
.write.partitionBy(partitionColumns(inputs.logSubType): _*) …Run Code Online (Sandbox Code Playgroud) 我正在使用以下代码将数据流推送到 Azure EventHub Microsoft.Hadoop.Avro.. 此代码每 5 秒运行一次,并简单地插入相同的两个 Avro 序列化项目:
var strSchema = File.ReadAllText("schema.json");
var avroSerializer = AvroSerializer.CreateGeneric(strSchema);
var rootSchema = avroSerializer.WriterSchema as RecordSchema;
var itemList = new List<AvroRecord>();
dynamic record_one = new AvroRecord(rootSchema);
record_one.FirstName = "Some";
record_one.LastName = "Guy";
itemList.Add(record_one);
dynamic record_two = new AvroRecord(rootSchema);
record_two.FirstName = "A.";
record_two.LastName = "Person";
itemList.Add(record_two);
using (var buffer = new MemoryStream())
{
using (var writer = AvroContainer.CreateGenericWriter(strSchema, buffer, Codec.Null))
{
using (var streamWriter = new SequentialWriter<object>(writer, itemList.Count))
{
foreach (var item in …Run Code Online (Sandbox Code Playgroud) c# apache-spark azure-stream-analytics spark-avro azure-databricks
我使用 to_avro 将数据帧字段转换为 avro 字段结构,然后使用 from_avro 返回,如下所示。最终我想将 avro 有效负载流式传输到 kafka 写入/读取。当我尝试通过执行 df.show() 打印最终重新转换的数据帧时,出现 java.lang.ArrayIndexOutOfBoundsException 错误。df.printSchema 显示模式正确。
我正在使用 Spark 2.4.0 和 scala 2.11
我究竟做错了什么。字段中的空值是否会导致此问题?如果是这样,解决办法是什么?
这是代码
import org.apache.spark.sql.{ Row, SparkSession, Column, functions }
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.avro._
import java.nio.file.{Files, Paths}
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.master("local[*]")
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
val jsonDS = spark.createDataset(
"""{ "name": "Alyssa", "favorite_color": null, "favorite_numbers": [3, 9, 15, 20] } """ ::
"""{ "name": "Ben", "favorite_color": "red", "favorite_numbers": [] …Run Code Online (Sandbox Code Playgroud) 我有一个使用 JAVA api 创建的 avro 文件,当编写者在文件中写入数据时,程序由于机器重新启动而异常关闭。\n现在,当我尝试使用 Spark/hive 读取此文件时,它会读取一些数据并然后抛出以下错误 (org.apache.avro.AvroRuntimeException: java.io.IOException: 无效同步!)\xe2\x80\x93
\nINFO DAGScheduler: ShuffleMapStage 1 (count at DataReaderSpark.java:41) failed in 7.420 s due to Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 2, localhost, executor driver): org.apache.avro.AvroRuntimeException: java.io.IOException: Invalid sync!\n at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:210)\n at com.databricks.spark.avro.DefaultSource$$anonfun$buildReader$1$$anon$1.hasNext(DefaultSource.scala:215)\n at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)\n at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106)\n at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey$(Unknown Source)\n at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)\n at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)\n at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)\n at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)\n at …Run Code Online (Sandbox Code Playgroud) spark-avro ×10
apache-spark ×9
avro ×6
scala ×3
apache-kafka ×1
avro-tools ×1
c# ×1
hdfs ×1
hive ×1
java ×1
json ×1
parquet ×1