如何检查 HIVE 中是否存在任何特定分区:
我的配置单元表中有如下分区:
国家=印度/州=MH国家=美国/州=纽约
我想在 HIVE 或使用 shell 脚本中检查 country = "something and state="something" 是否存在。请帮忙
关于与HIVE表的spark结构化流式集成的一个查询.
我试图做一些火花结构流的例子.
这是我的榜样
val spark =SparkSession.builder().appName("StatsAnalyzer")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db")
.getOrCreate()
// Register the dataframe as a Hive table
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///home/su/testdelta")
csvDF.createOrReplaceTempView("updates")
val query= spark.sql("insert into table_abcd select * from updates")
query.writeStream.start()
Run Code Online (Sandbox Code Playgroud)
正如您在将数据帧写入hdfs位置时的最后一步所看到的那样,数据未插入到令人兴奋的目录中(我的现有目录中有一些旧数据被"age"分区).
我正进入(状态
spark.sql.AnalysisException:必须使用writeStream start()执行带有流源的查询
你能帮我解释为什么我无法将数据插入到hdfs位置的现有目录中吗?或者有没有其他方法可以在蜂巢表上"插入"操作?
寻找解决方案
我在一年中按操作执行了一个简单的分组,并进行了一些聚合,如下所示。我尝试将结果附加到 hdfs 路径,如下所示。我收到错误说,
org.apache.spark.sql.AnalysisException: Append output mode not supported
when there are streaming aggregations on streaming DataFrames/DataSets
without watermark;;
Aggregate [year#88], [year#88, sum(rating#89) AS rating#173,
sum(cast(duration#90 as bigint)) AS duration#175L]
+- EventTimeWatermark event_time#96: timestamp, interval 10 seconds
Run Code Online (Sandbox Code Playgroud)
下面是我的代码。有人可以帮忙吗
val spark =SparkSession.builder().appName("mddd").
enableHiveSupport().config("hive.exec.dynamic.partition", "true").
config("hive.exec.dynamic.partition.mode", "nonstrict").
config("spark.sql.streaming.checkpointLocation", "/user/sa/sparkCheckpoint").
config("spark.debug.maxToStringFields",100).
getOrCreate()
val mySchema = StructType(Array(
StructField("id", IntegerType),
StructField("name", StringType),
StructField("year", IntegerType),
StructField("rating", DoubleType),
StructField("duration", IntegerType)
))
val xmlData = spark.readStream.option("sep", ",").schema(mySchema).csv("file:///home/sa/kafdata/")
import java.util.Calendar
val df_agg_without_time= xmlData.withColumn("event_time", to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))
val df_agg_with_time = df_agg_without_time.withWatermark("event_time", …Run Code Online (Sandbox Code Playgroud) 我想了解 Spark Structured Streaming 的单元测试方面。我的场景是,我从 Kafka 获取数据,我使用 Spark Structured Streaming 使用它并在数据之上应用一些转换。
我不确定如何使用 Scala 和 Spark 进行测试。有人可以告诉我如何使用 Scala 在结构化流中进行单元测试。我是流媒体的新手。
apache-spark spark-structured-streaming spark-streaming-kafka
有人可以解释一下Spark结构化流上foreach writer的需求吗?
当我们以dataFrame的形式获取所有源数据时,我没有使用foreachwriter。