小编Aji*_*nan的帖子

检查Spark流作业是否挂起的最佳方法

我有一个Spark流媒体应用程序,该应用程序基本上从Kafka处获取触发消息,该消息会启动批处理,这可能需要2个小时。

有时候,某些作业会无限期地挂起,并且在正常时间内无法完成,因此,如果不手动检查Spark UI,就无法确定作业的状态。我希望有一种方法可以解决当前正在运行的Spark作业是否挂起的问题。因此,基本上,如果挂起超过30分钟,我想通知用户以便他们采取措施。我有什么选择?

我看到我可以使用驱动程序和执行程序中的指标。如果我选择最重要的记录,那将是最后收到的批记录。当 StreamingMetrics.streaming.lastReceivedBatch_records == 0 这可能意味着星火流作业已停止或失败。

但是在我的情况下,我将仅收到1个流触发事件,然后它将启动处理过程,这可能需要2个小时,因此我将无法依赖收到的记录。

有没有更好的办法?TIA

bigdata apache-spark spark-streaming apache-spark-sql

8
推荐指数
1
解决办法
555
查看次数

外部Hive表刷新表与MSCK修复

我将外部配置单元表存储为Parquet,在列上进行了分区,as_of_dt并通过Spark Streaming插入了数据。现在每天都会添加新分区。我这样做是为了 msck repair table使配置单元metastore获取新添加的分区信息。这是唯一的方法还是有更好的方法?我担心下游用户查询表是否会msck repair导致数据不可用或陈旧数据出现任何问题?我正在浏览 HiveContextAPI并查看refreshTable选项。知道使用它是否有意义refreshTable

hive apache-spark hivecontext

6
推荐指数
2
解决办法
6644
查看次数

在 spark 2.3.0 中的结构化流中禁用 _spark_metadata

我的结构化流应用程序正在写入 parquet,我想摆脱它创建的 _spark_metadata 文件夹。我使用了下面的属性,看起来不错

--conf "spark.hadoop.parquet.enable.summary-metadata=false"

当应用程序启动时,不会_spark_metadata生成文件夹。但是一旦它移动到 RUNNING 状态并开始处理消息,它就会失败并显示以下错误,提示_spark_metadata文件夹不存在。似乎结构化流依赖于这个文件夹,没有它我们就无法运行。只是想知道在这种情况下禁用元数据属性是否有意义。这是流不是指 conf 的错误吗?

Caused by: java.io.FileNotFoundException: File /_spark_metadata does not exist.
        at org.apache.hadoop.fs.Hdfs.listStatus(Hdfs.java:261)
        at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1765)
        at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1761)
        at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
        at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1761)
        at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1726)
        at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1685)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.list(HDFSMetadataLog.scala:370)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:231)
        at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:99)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
Run Code Online (Sandbox Code Playgroud)

apache-spark parquet spark-streaming apache-spark-sql spark-structured-streaming

5
推荐指数
1
解决办法
2297
查看次数

嵌套 json 中的结构化流式传输不同模式

您好,我有一个场景,传入的消息是一个 Json,其标题为表名,数据部分包含表列数据。现在我想将其写入镶木地板到单独的文件夹,例如 /emp/dept。我可以通过根据表名聚合行来在常规流式传输中实现此目的。但在结构化流媒体中我无法分割它。我怎样才能在结构化流媒体中实现这一点。

{"tableName":"employee","data":{"empid":1","empname":"john","dept":"CS"} {"tableName":"employee","data": {"empid":2","empname":"james","dept":"CS"} {"tableName":"dept","data":{"dept":"1","deptname": "CS","desc":"计算机科学系"}

apache-spark spark-streaming apache-spark-sql spark-structured-streaming

2
推荐指数
1
解决办法
1214
查看次数

Scala 加载配置 Map of Map

我需要从一个配置文件中读取并将配置映射到一个案例类。如果我有一个如下表,它工作正常

配置

mapping {
   target {
     oracle  = {
         type = "oracle"
         schema    = "orcl"
         tableName = "my_table"
         query = "select key from my_table where dob='2020-01-01'
     }
}
Run Code Online (Sandbox Code Playgroud)

SCALA 代码片段

 val targetConfig:Map[String,QueryEngine] = config.getObject("mapping.target")
    .entrySet()
    .asScala
    .foldLeft(Map.empty[String , QueryEngine]) { case ( acc , entry ) =>
      val target = entry.getKey
      val targetConfig = entry.getValue match {
        case validElement if validElement.valueType() == ConfigValueType.OBJECT  => validElement.asInstanceOf[ConfigObject].toConfig
        case invalidElement => sys.error("illegal syntax at $invalidElement")
      }

      targetConfig.getString("type")    match {

        case "oracle" => acc …
Run Code Online (Sandbox Code Playgroud)

scala scala-collections typesafe-config

2
推荐指数
1
解决办法
165
查看次数