我正在使用结构化流来从Kafka读取数据并创建各种聚合度量.我使用了Graphite sink metrics.properties.我见过较旧的Spark版本中的应用程序具有与流相关的指标.我没有看到结构化流媒体的流媒体相关指标.我究竟做错了什么?
例如 - 无法找到未处理的批次或运行批次或上次完成的批次总延迟.
我通过设置启用了流量指标:
SparkSession.builder().config("spark.sql.streaming.metricsEnabled",true)
Run Code Online (Sandbox Code Playgroud)
即便如此,我只得到3个指标:
这些指标在它们之间存在差距.它也会在应用程序启动后很晚才开始显示.如何获得与grafana广泛的流媒体相关指标?
我检查StreamingQueryProgress.我们只能使用此方法以编程方式创建自定义指标.有没有办法可以消耗Spark流已经发送到我提到的接收器的指标?
graphite apache-spark apache-spark-sql spark-structured-streaming
如何从我的spark流工作发送指标以打开tsdb数据库?我试图在Grafana中使用open tsdb作为数据源.你可以帮我一些我可以开始的参考资料.
我确实看到开放的tsdb记者在这里做类似的工作.如何整合Spark流媒体作业的指标来使用它?有没有简单的选择呢.
我有一个结构化的流应用程序,并且在微批获得的数据中发生窗口聚合(基于事件时间).假设我有一个10分钟的窗口,每5分钟刷新一次.我有窗口9-9.10AM,9.05-9.15AM的聚合.但是有一个延迟,申请现在是9.31 AM.我可以确保它开始计算最近的窗口而不试图赶上旧的聚合吗?(比如从早上9点20分到9点30分开始,赶上最近的窗口聚合)
data = data
.withWatermark("dateField", "1 minute")
.groupBy(col(column_x), functions.window(col("dateField"), "30 minutes", "10 minutes"), col(column_y))
.count();
query = data
.writeStream()
.format("parquet")
.option("path", xxxx)
.option("truncate", "false")
.outputMode("append")
.option("checkpointLocation", xxxx)
.start();
query.awaitTermination();
Run Code Online (Sandbox Code Playgroud) 我试图理解“编程面试要素”中绘制布尔矩阵问题的 BFS 解决方案的空间复杂度。它类似于Leetcode中的Flood fill问题(问题733)。
解决方案是这样的。我可以将需要更改的当前元素添加到队列中。任何相邻(顶部/底部/向上/向下)节点也需要更改。所以我将它们添加到队列中(如果它们满足添加到队列的条件。每当处理队列中的元素时,都会添加其相邻元素。我们将进行处理,直到队列不为空。
我认为空间复杂度(最坏情况)将为 O(MN),因为所有元素也可能在队列中。但书中提到,最坏情况的空间复杂度是 O(M+N),因为距节点给定距离最多有 O(M+N) 个条目。我知道元素也会不断地从队列中删除。即使如此,我也很难想象他们是如何达到这种空间复杂性的。有人可以帮我理解吗?