小编Big*_*igD的帖子

如何检查 HIVE 中是否存在任何特定分区

如何检查 HIVE 中是否存在任何特定分区:

我的配置单元表中有如下分区:

国家=印度/州=MH国家=美国/州=纽约

我想在 HIVE 或使用 shell 脚本中检查 country = "something and state="something" 是否存在。请帮忙

shell hadoop hive

7
推荐指数
1
解决办法
9988
查看次数

如何将Spark结构化流式DataFrame插入到Hive外部表/位置?

关于与HIVE表的spark结构化流式集成的一个查询.

我试图做一些火花结构流的例子.

这是我的榜样

 val spark =SparkSession.builder().appName("StatsAnalyzer")
     .enableHiveSupport()
     .config("hive.exec.dynamic.partition", "true")
     .config("hive.exec.dynamic.partition.mode", "nonstrict")
     .config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db")
     .getOrCreate()

 // Register the dataframe as a Hive table

 val userSchema = new StructType().add("name", "string").add("age", "integer")
 val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///home/su/testdelta") 
 csvDF.createOrReplaceTempView("updates")
 val query= spark.sql("insert into table_abcd select * from updates")

 query.writeStream.start()
Run Code Online (Sandbox Code Playgroud)

正如您在将数据帧写入hdfs位置时的最后一步所看到的那样,数据未插入到令人兴奋的目录中(我的现有目录中有一些旧数据被"age"分区).

我正进入(状态

spark.sql.AnalysisException:必须使用writeStream start()执行带有流源的查询

你能帮我解释为什么我无法将数据插入到hdfs位置的现有目录中吗?或者有没有其他方法可以在蜂巢表上"插入"操作?

寻找解决方案

hive apache-spark spark-structured-streaming

7
推荐指数
1
解决办法
2299
查看次数

spark结构化流异常:无水印不支持追加输出模式

我在一年中按操作执行了一个简单的分组,并进行了一些聚合,如下所示。我尝试将结果附加到 hdfs 路径,如下所示。我收到错误说,

   org.apache.spark.sql.AnalysisException: Append output mode not supported 
   when there are streaming aggregations on streaming DataFrames/DataSets 
   without watermark;;
   Aggregate [year#88], [year#88, sum(rating#89) AS rating#173, 
   sum(cast(duration#90 as bigint)) AS duration#175L]
   +- EventTimeWatermark event_time#96: timestamp, interval 10 seconds
Run Code Online (Sandbox Code Playgroud)

下面是我的代码。有人可以帮忙吗

    val spark =SparkSession.builder().appName("mddd").
    enableHiveSupport().config("hive.exec.dynamic.partition", "true").
    config("hive.exec.dynamic.partition.mode", "nonstrict").
    config("spark.sql.streaming.checkpointLocation", "/user/sa/sparkCheckpoint").
    config("spark.debug.maxToStringFields",100).
    getOrCreate()

    val mySchema = StructType(Array(
     StructField("id", IntegerType),
     StructField("name", StringType),
     StructField("year", IntegerType),
     StructField("rating", DoubleType),
     StructField("duration", IntegerType)
    ))

    val xmlData = spark.readStream.option("sep", ",").schema(mySchema).csv("file:///home/sa/kafdata/") 
    import java.util.Calendar
    val df_agg_without_time= xmlData.withColumn("event_time", to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))

    val df_agg_with_time = df_agg_without_time.withWatermark("event_time", …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-structured-streaming

5
推荐指数
1
解决办法
4923
查看次数

如何对 Spark Structured Streaming 执行单元测试?

我想了解 Spark Structured Streaming 的单元测试方面。我的场景是,我从 Kafka 获取数据,我使用 Spark Structured Streaming 使用它并在数据之上应用一些转换。

我不确定如何使用 Scala 和 Spark 进行测试。有人可以告诉我如何使用 Scala 在结构化流中进行单元测试。我是流媒体的新手。

apache-spark spark-structured-streaming spark-streaming-kafka

2
推荐指数
1
解决办法
2474
查看次数

Spark结构化流中ForeachWriter的目的是什么?

有人可以解释一下Spark结构化流上foreach writer的需求吗?

当我们以dataFrame的形式获取所有源数据时,我没有使用foreachwriter。

apache-spark spark-structured-streaming

0
推荐指数
1
解决办法
444
查看次数