标签: spark-avro

在Spark中创建带有架构的配置单元外部表

我正在使用spark 1.6,我的目标是像在蜂巢脚本中一样创建外部蜂巢表。为此,我首先读取分区的avro文件并获取该文件的架构。现在我在这里停止了,我不知道如何将该模式应用于创建表。我使用scala。需要帮助的人。

hive apache-spark apache-spark-sql spark-avro

5
推荐指数
1
解决办法
1万
查看次数

如何使用spark将avro写入多个输出目录

嗨,有一个主题是使用MultipleTextOutputFormat在一个spark作业中将文本数据写入多个输出目录

通过键Spark写入多个输出 - 一个Spark作业

我会问是否有类似的方法将avro数据写入多个目录

我想要的是将avro文件中的数据写入不同的目录(基于时间戳字段,时间戳中的同一天转到同一目录)

avro apache-spark spark-avro

5
推荐指数
1
解决办法
509
查看次数

如何将字节从Kafka转换为其原始对象?

我正在从Kafka提取数据,然后Array[Byte]使用默认的解码器反序列化,然后我的RDD元素如下所示(null,[B@406fa9b2)(null,[B@21a9fe0)但是我想要具有模式的原始数据,那么如何实现呢?

我以Avro格式序列化邮件。

apache-kafka apache-spark spark-streaming spark-avro

5
推荐指数
1
解决办法
2833
查看次数

如何将嵌套的Avro GenericRecord转换为行

我有一个代码可以使用功能将我的avro记录转换为Row avroToRowConverter()

directKafkaStream.foreachRDD(rdd -> {
        JavaRDD<Row> newRDD= rdd.map(x->{

            Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(SchemaRegstryClient.getLatestSchema("poc2"));
            return avroToRowConverter(recordInjection.invert(x._2).get());
            });
Run Code Online (Sandbox Code Playgroud)

此功能不适用于嵌套模式(TYPE= UNION)

private static Row avroToRowConverter(GenericRecord avroRecord) {
    if (null == avroRecord) {
        return null;
    }
    //GenericData
    Object[] objectArray = new Object[avroRecord.getSchema().getFields().size()];
    StructType structType = (StructType) SchemaConverters.toSqlType(avroRecord.getSchema()).dataType();
    for (Schema.Field field : avroRecord.getSchema().getFields()) {

        if(field.schema().getType().toString().equalsIgnoreCase("STRING") || field.schema().getType().toString().equalsIgnoreCase("ENUM")){
            objectArray[field.pos()] = ""+avroRecord.get(field.pos());
        }else {
            objectArray[field.pos()] = avroRecord.get(field.pos());
        }
    }

    return new GenericRowWithSchema(objectArray, structType);
}
Run Code Online (Sandbox Code Playgroud)

谁能建议我如何将复杂的架构转换为ROW?

java avro apache-spark spark-avro

5
推荐指数
1
解决办法
2230
查看次数

如何以avro格式查询数据集?

这适用于镶木地板

 val sqlDF = spark.sql("SELECT DISTINCT field FROM parquet.`file-path'")
Run Code Online (Sandbox Code Playgroud)

我用Avro尝试了相同的方法,但是即使使用,它也会一直给我一个错误com.databricks.spark.avro

当我执行以下查询时:

val sqlDF = spark.sql("SELECT DISTINCT Source_Product_Classification FROM avro.`file path`")
Run Code Online (Sandbox Code Playgroud)

我明白了AnalysisException。为什么?

org.apache.spark.sql.AnalysisException: Failed to find data source: avro. Please find an Avro package at http://spark.apache.org/third-party-projects.html;; line 1 pos 51
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.execution.datasources.ResolveDataSource$$anonfun$apply$1.applyOrElse(rules.scala:61)
  at org.apache.spark.sql.execution.datasources.ResolveDataSource$$anonfun$apply$1.applyOrElse(rules.scala:38)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:58)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)
  at …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql spark-avro

4
推荐指数
1
解决办法
3400
查看次数

Spark DataFrame:以 Avro 形式编写时如何指定架构

我想使用提供的 Avro 模式而不是 Spark 自动生成的模式编写 Avro 格式的 DataFrame。如何告诉 Spark 在写入时使用我的自定义架构?

apache-spark apache-spark-sql spark-avro

4
推荐指数
1
解决办法
1万
查看次数

无法实例化提供程序 org.apache.spark.sql.avro.AvroFileFormat

无法从 Spark 流应用程序向 Kafka 主题发送 avro 格式消息。关于 avro spark 流示例代码的在线信息非常少。“to_avro”方法不需要 avro 模式,那么它将如何编码为 avro 格式?

有人可以帮助解决以下异常吗?

依赖:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.12</artifactId>
    <version>2.4.4</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

下面是推送到kafka主题的代码

dataset.toDF.select(to_avro(struct(dataset.toDF.columns.map(column):_*))).alias("value").distinct.write.format("avro")
      .option(KafkaConstants.BOOTSTRAP_SERVER, priBootStrapServers)
      .option(ApplicationConstants.TOPIC_KEY, publishPriTopic)
      .save()
Run Code Online (Sandbox Code Playgroud)

低于异常。

Caused by: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated
    at java.util.ServiceLoader.fail(ServiceLoader.java:232)
    at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:614)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:241)
    at …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-avro spark-streaming-kafka

4
推荐指数
1
解决办法
5003
查看次数

在 Avro 架构中的嵌套字段中设置值

我正在尝试使用 GenericData.Record 将 avro 数据生成到 kafka 中,但出现以下异常:

线程“main”org.apache.avro.AvroRuntimeException 中出现异常:不是有效的架构字段:emailAddresses.email

这是我的架构:

{
 "namespace": "com.cloudurable.User",

 "type": "record",

 "name": "User",

 "fields": [

     {"name": "id", "type": "int", "default" : 0},
     {"name": "fname",  "type": "string", "default" : "EMPTY"},
     {"name": "lname",  "type": "string", "default" : "EMPTY"},
     {"name": "phone_number",  "type": "string", "default" : "EMPTY"},
     {"name": "age",  "type": "int", "default" : 0},

     {
      "name": "emailAddresses",
      "type": {
          "type": "record",
          "name": "EmailAddress",
          "fields": [
            {
              "name": "email",
              "type": "string",
              "default" : "EMPTY"
            },
            {
              "name": "address",
              "type": "boolean",
              "default": …
Run Code Online (Sandbox Code Playgroud)

java avro apache-kafka spark-avro

3
推荐指数
1
解决办法
6362
查看次数

如何在 Spark 中创建一个空的数据帧

我有一组基于 Avro 的配置单元表,我需要从中读取数据。由于 Spark-SQL 使用 hive serdes 从 HDFS 读取数据,因此比直接读取 HDFS 慢得多。所以我使用数据砖 Spark-Avro jar 从底层 HDFS 目录读取 Avro 文件。

一切正常,除非桌子是空的。我已设法使用以下命令从 hive 表的 .avsc 文件中获取架构,但出现错误“未找到 Avro 文件

val schemaFile = FileSystem.get(sc.hadoopConfiguration).open(new Path("hdfs://myfile.avsc"));

val schema = new Schema.Parser().parse(schemaFile);

spark.read.format("com.databricks.spark.avro").option("avroSchema", schema.toString).load("/tmp/myoutput.avro").show()
Run Code Online (Sandbox Code Playgroud)

解决方法:

我在该目录中放置了一个空文件,同样的事情也能正常工作。

有没有其他方法可以实现相同的目标?比如conf设置之类的?

scala avro apache-spark apache-spark-sql spark-avro

3
推荐指数
3
解决办法
2万
查看次数

无法使用Spark-Avro在DataProc Spark中读取Avro

我在Google DataProc上有一个群集(图像1.4),并且我想用Spark从Google云存储读取avro文件。我遵循此指南:Spark阅读avro

我运行的命令是:

gcloud dataproc jobs submit pyspark test.py \
--cluster $CLUSTER_NAME \
--region $REGION \
--properties spark.jars.packages='org.apache.spark:spark-avro_2.12:2.4.1'
Run Code Online (Sandbox Code Playgroud)

test.py非常简单,只是

from pyspark.sql import SparkSession

from pyspark.sql import SQLContext

spark = SparkSession.builder.appName('test').getOrCreate()

df = spark.read.format("avro").load("gs://mybucket/abc.avro")

df.show()
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

Py4JJavaError: An error occurred while calling o196.load.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated
    at java.util.ServiceLoader.fail(ServiceLoader.java:232)
    at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
    at …
Run Code Online (Sandbox Code Playgroud)

pyspark google-cloud-dataproc spark-avro

3
推荐指数
1
解决办法
1144
查看次数