我正在使用spark 1.6,我的目标是像在蜂巢脚本中一样创建外部蜂巢表。为此,我首先读取分区的avro文件并获取该文件的架构。现在我在这里停止了,我不知道如何将该模式应用于创建表。我使用scala。需要帮助的人。
嗨,有一个主题是使用MultipleTextOutputFormat在一个spark作业中将文本数据写入多个输出目录
我会问是否有类似的方法将avro数据写入多个目录
我想要的是将avro文件中的数据写入不同的目录(基于时间戳字段,时间戳中的同一天转到同一目录)
我正在从Kafka提取数据,然后Array[Byte]使用默认的解码器反序列化,然后我的RDD元素如下所示(null,[B@406fa9b2):(null,[B@21a9fe0)但是我想要具有模式的原始数据,那么如何实现呢?
我以Avro格式序列化邮件。
我有一个代码可以使用功能将我的avro记录转换为Row avroToRowConverter()
directKafkaStream.foreachRDD(rdd -> {
JavaRDD<Row> newRDD= rdd.map(x->{
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(SchemaRegstryClient.getLatestSchema("poc2"));
return avroToRowConverter(recordInjection.invert(x._2).get());
});
Run Code Online (Sandbox Code Playgroud)
此功能不适用于嵌套模式(TYPE= UNION)。
private static Row avroToRowConverter(GenericRecord avroRecord) {
if (null == avroRecord) {
return null;
}
//GenericData
Object[] objectArray = new Object[avroRecord.getSchema().getFields().size()];
StructType structType = (StructType) SchemaConverters.toSqlType(avroRecord.getSchema()).dataType();
for (Schema.Field field : avroRecord.getSchema().getFields()) {
if(field.schema().getType().toString().equalsIgnoreCase("STRING") || field.schema().getType().toString().equalsIgnoreCase("ENUM")){
objectArray[field.pos()] = ""+avroRecord.get(field.pos());
}else {
objectArray[field.pos()] = avroRecord.get(field.pos());
}
}
return new GenericRowWithSchema(objectArray, structType);
}
Run Code Online (Sandbox Code Playgroud)
谁能建议我如何将复杂的架构转换为ROW?
这适用于镶木地板
val sqlDF = spark.sql("SELECT DISTINCT field FROM parquet.`file-path'")
Run Code Online (Sandbox Code Playgroud)
我用Avro尝试了相同的方法,但是即使使用,它也会一直给我一个错误com.databricks.spark.avro。
当我执行以下查询时:
val sqlDF = spark.sql("SELECT DISTINCT Source_Product_Classification FROM avro.`file path`")
Run Code Online (Sandbox Code Playgroud)
我明白了AnalysisException。为什么?
org.apache.spark.sql.AnalysisException: Failed to find data source: avro. Please find an Avro package at http://spark.apache.org/third-party-projects.html;; line 1 pos 51
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.execution.datasources.ResolveDataSource$$anonfun$apply$1.applyOrElse(rules.scala:61)
at org.apache.spark.sql.execution.datasources.ResolveDataSource$$anonfun$apply$1.applyOrElse(rules.scala:38)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:58)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:58)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)
at …Run Code Online (Sandbox Code Playgroud) 我想使用提供的 Avro 模式而不是 Spark 自动生成的模式编写 Avro 格式的 DataFrame。如何告诉 Spark 在写入时使用我的自定义架构?
无法从 Spark 流应用程序向 Kafka 主题发送 avro 格式消息。关于 avro spark 流示例代码的在线信息非常少。“to_avro”方法不需要 avro 模式,那么它将如何编码为 avro 格式?
有人可以帮助解决以下异常吗?
依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.12</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.4.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
下面是推送到kafka主题的代码
dataset.toDF.select(to_avro(struct(dataset.toDF.columns.map(column):_*))).alias("value").distinct.write.format("avro")
.option(KafkaConstants.BOOTSTRAP_SERVER, priBootStrapServers)
.option(ApplicationConstants.TOPIC_KEY, publishPriTopic)
.save()
Run Code Online (Sandbox Code Playgroud)
低于异常。
Caused by: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232)
at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:614)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:241)
at …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 GenericData.Record 将 avro 数据生成到 kafka 中,但出现以下异常:
线程“main”org.apache.avro.AvroRuntimeException 中出现异常:不是有效的架构字段:emailAddresses.email
这是我的架构:
{
"namespace": "com.cloudurable.User",
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int", "default" : 0},
{"name": "fname", "type": "string", "default" : "EMPTY"},
{"name": "lname", "type": "string", "default" : "EMPTY"},
{"name": "phone_number", "type": "string", "default" : "EMPTY"},
{"name": "age", "type": "int", "default" : 0},
{
"name": "emailAddresses",
"type": {
"type": "record",
"name": "EmailAddress",
"fields": [
{
"name": "email",
"type": "string",
"default" : "EMPTY"
},
{
"name": "address",
"type": "boolean",
"default": …Run Code Online (Sandbox Code Playgroud) 我有一组基于 Avro 的配置单元表,我需要从中读取数据。由于 Spark-SQL 使用 hive serdes 从 HDFS 读取数据,因此比直接读取 HDFS 慢得多。所以我使用数据砖 Spark-Avro jar 从底层 HDFS 目录读取 Avro 文件。
一切正常,除非桌子是空的。我已设法使用以下命令从 hive 表的 .avsc 文件中获取架构,但出现错误“未找到 Avro 文件”
val schemaFile = FileSystem.get(sc.hadoopConfiguration).open(new Path("hdfs://myfile.avsc"));
val schema = new Schema.Parser().parse(schemaFile);
spark.read.format("com.databricks.spark.avro").option("avroSchema", schema.toString).load("/tmp/myoutput.avro").show()
Run Code Online (Sandbox Code Playgroud)
解决方法:
我在该目录中放置了一个空文件,同样的事情也能正常工作。
有没有其他方法可以实现相同的目标?比如conf设置之类的?
我在Google DataProc上有一个群集(图像1.4),并且我想用Spark从Google云存储读取avro文件。我遵循此指南:Spark阅读avro。
我运行的命令是:
gcloud dataproc jobs submit pyspark test.py \
--cluster $CLUSTER_NAME \
--region $REGION \
--properties spark.jars.packages='org.apache.spark:spark-avro_2.12:2.4.1'
Run Code Online (Sandbox Code Playgroud)
test.py非常简单,只是
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
spark = SparkSession.builder.appName('test').getOrCreate()
df = spark.read.format("avro").load("gs://mybucket/abc.avro")
df.show()
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
Py4JJavaError: An error occurred while calling o196.load.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232)
at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
at …Run Code Online (Sandbox Code Playgroud)