我有一个Spark流媒体应用程序,该应用程序基本上从Kafka处获取触发消息,该消息会启动批处理,这可能需要2个小时。
有时候,某些作业会无限期地挂起,并且在正常时间内无法完成,因此,如果不手动检查Spark UI,就无法确定作业的状态。我希望有一种方法可以解决当前正在运行的Spark作业是否挂起的问题。因此,基本上,如果挂起超过30分钟,我想通知用户以便他们采取措施。我有什么选择?
我看到我可以使用驱动程序和执行程序中的指标。如果我选择最重要的记录,那将是最后收到的批记录。当 StreamingMetrics.streaming.lastReceivedBatch_records == 0
这可能意味着星火流作业已停止或失败。
但是在我的情况下,我将仅收到1个流触发事件,然后它将启动处理过程,这可能需要2个小时,因此我将无法依赖收到的记录。
有没有更好的办法?TIA
我将外部配置单元表存储为Parquet,在列上进行了分区,as_of_dt
并通过Spark Streaming插入了数据。现在每天都会添加新分区。我这样做是为了 msck repair table
使配置单元metastore获取新添加的分区信息。这是唯一的方法还是有更好的方法?我担心下游用户查询表是否会msck repair
导致数据不可用或陈旧数据出现任何问题?我正在浏览 HiveContext
API并查看refreshTable
选项。知道使用它是否有意义refreshTable
?
我的结构化流应用程序正在写入 parquet,我想摆脱它创建的 _spark_metadata 文件夹。我使用了下面的属性,看起来不错
--conf "spark.hadoop.parquet.enable.summary-metadata=false"
当应用程序启动时,不会_spark_metadata
生成文件夹。但是一旦它移动到 RUNNING 状态并开始处理消息,它就会失败并显示以下错误,提示_spark_metadata
文件夹不存在。似乎结构化流依赖于这个文件夹,没有它我们就无法运行。只是想知道在这种情况下禁用元数据属性是否有意义。这是流不是指 conf 的错误吗?
Caused by: java.io.FileNotFoundException: File /_spark_metadata does not exist.
at org.apache.hadoop.fs.Hdfs.listStatus(Hdfs.java:261)
at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1765)
at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1761)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1761)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1726)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1685)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.list(HDFSMetadataLog.scala:370)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:231)
at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:99)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
Run Code Online (Sandbox Code Playgroud) apache-spark parquet spark-streaming apache-spark-sql spark-structured-streaming
您好,我有一个场景,传入的消息是一个 Json,其标题为表名,数据部分包含表列数据。现在我想将其写入镶木地板到单独的文件夹,例如 /emp
和 /dept
。我可以通过根据表名聚合行来在常规流式传输中实现此目的。但在结构化流媒体中我无法分割它。我怎样才能在结构化流媒体中实现这一点。
{"tableName":"employee","data":{"empid":1","empname":"john","dept":"CS"} {"tableName":"employee","data": {"empid":2","empname":"james","dept":"CS"} {"tableName":"dept","data":{"dept":"1","deptname": "CS","desc":"计算机科学系"}
apache-spark spark-streaming apache-spark-sql spark-structured-streaming
我需要从一个配置文件中读取并将配置映射到一个案例类。如果我有一个如下表,它工作正常
配置
mapping {
target {
oracle = {
type = "oracle"
schema = "orcl"
tableName = "my_table"
query = "select key from my_table where dob='2020-01-01'
}
}
Run Code Online (Sandbox Code Playgroud)
SCALA 代码片段
val targetConfig:Map[String,QueryEngine] = config.getObject("mapping.target")
.entrySet()
.asScala
.foldLeft(Map.empty[String , QueryEngine]) { case ( acc , entry ) =>
val target = entry.getKey
val targetConfig = entry.getValue match {
case validElement if validElement.valueType() == ConfigValueType.OBJECT => validElement.asInstanceOf[ConfigObject].toConfig
case invalidElement => sys.error("illegal syntax at $invalidElement")
}
targetConfig.getString("type") match {
case "oracle" => acc …
Run Code Online (Sandbox Code Playgroud)