小编mav*_*rik的帖子

结构化流不会将DF写入文件接收器,引用/_spark_metadata/9.compact不存在

我正在EMR 5.11.1,Spark 2.2.1中构建一个Kafka摄取模块.我的目的是使用结构化流来消费Kafka主题,进行一些处理,并以镶木地板格式存储到EMRFS/S3.

控制台接收器按预期工作,文件接收器不起作用.

spark-shell:

val event = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", <server list>)
.option("subscribe", <topic>)
.load()

val eventdf = event.select($"value" cast "string" as "json")
.select(from_json($"json", readSchema) as "data")
.select("data.*")

val outputdf = <some processing on eventdf>
Run Code Online (Sandbox Code Playgroud)

这有效:

val console_query = outputdf.writeStream.format("console")
.trigger(Trigger.ProcessingTime(10.seconds))
.outputMode(OutputMode.Append)
.start 
Run Code Online (Sandbox Code Playgroud)

这不是:

val filesink_query = outputdf.writeStream
.partitionBy(<some column>)
.format("parquet")
.option("path", <some path in EMRFS>)
.option("checkpointLocation", "/tmp/ingestcheckpoint")
.trigger(Trigger.ProcessingTime(10.seconds))
.outputMode(OutputMode.Append)
.start //fails
Run Code Online (Sandbox Code Playgroud)

我试过的东西不起作用:

  1. sc.hadoopConfiguration.set("parquet.enable.summary-metadata","false")
  2. 将格式更改为CSV而不是镶木地板
  3. 将输出模式更改为完成(仅支持追加)
  4. 不同的触发间隔
  5. readStream上的.option("failOnDataLoss",false)

一些挖掘源代码的人把我带到了这里:https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog. scala ,它表示缺少.compact文件应触发默认值.

因此尝试:spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay",60000)以确保在新批处理创建组合元数据文件之前未删除旧批处理元数据

使这个错误烦人的原因是它并不总是可重现的.在不更改代码中的单个字符的情况下,写入镶木地板有时会起作用,或者不起作用.我已经尝试清理检查点位置,spark/hdfs日志等,以防火花内部的"状态"导致此问题.

这是错误堆栈跟踪:

query: …
Run Code Online (Sandbox Code Playgroud)

amazon-s3 amazon-emr apache-spark spark-structured-streaming

11
推荐指数
1
解决办法
2670
查看次数

在结构化流中找不到连续触发器

运行时:Spark 2.3.0、Scala 2.11(Databricks 4.1 ML beta)

import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

//kafka settings and df definition goes here

val query = df.writeStream.format("parquet")
.option("path", ...)
.option("checkpointLocation",...)
.trigger(continuous(30000))
.outputMode(OutputMode.Append)
.start
Run Code Online (Sandbox Code Playgroud)

未找到引发错误:值连续

其他无效的尝试:

.trigger(continuous = "30 seconds") //as per Databricks blog
// throws same error as above

.trigger(Trigger.Continuous("1 second")) //as per Spark docs
// throws java.lang.IllegalStateException: Unknown type of trigger: ContinuousTrigger(1000)
Run Code Online (Sandbox Code Playgroud)

参考:

(Databricks 博客) https://databricks.com/blog/2018/03/20/low-latency-continuous-processing-mode-in-structured-streaming-in-apache-spark-2-3-0.html

(火花指南) http://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html#continuous-processing

(Scaladoc) https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.streaming.package

apache-spark spark-structured-streaming

5
推荐指数
1
解决办法
2236
查看次数

将命令行参数传递给调用带有装饰器参数的装饰函数的函数

这个例子是人为的,但代表了现实生活中的情况:

  1. 我有一个接受命令行参数的 python 脚本。

  2. main()将解析参数,并将它们传递给中间函数(caller_func在代码示例中)

  3. 然后,中间函数将调用一个用fromfib()修饰的修饰函数(在示例中),并且缓存的 是从命令行接受并通过中间函数传递的参数。lru_cachefunctoolsmaxsize

我该怎么做呢?

import argparse
from functools import lru_cache

def main():
    # boilerplate for parsing command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--cache_size", default="10")
    parser.add_argument("--fibo_num", default="20")
    args = parser.parse_args()
    cache_size = int(args.cache_size)
    fibo_num = int(args.fibo_num)

    caller_func(cache_size, fibo_num)

#Intermediate function that is supposed to call decorated function
def caller_func(cache_size, fib_num): 
    print(fib(fib_num))

#function decorated by LRU cache
@lru_cache(maxsize=cache_size)
def fib(n): 
    if n < 2:
        return n
    return fib(n-1) + fib(n-2) …
Run Code Online (Sandbox Code Playgroud)

python decorator argparse python-3.x python-decorators

5
推荐指数
1
解决办法
1008
查看次数

是否存在具有小数步长的Scala"范围"方法(类似于Python)?

这有效:

scala> 0 to 24 by 5
res16: scala.collection.immutable.Range = Range(0, 5, 10, 15, 20)
Run Code Online (Sandbox Code Playgroud)

负整数也有效:

scala> 0 to 24 by -1
res17: scala.collection.immutable.Range = Range()
Run Code Online (Sandbox Code Playgroud)

需要一个整数:

scala> 0 to 24 by 0.5

<console>:40: error: type mismatch;
 found   : Double(0.5)
 required: Int
              0 to 24 by 0.5
                         ^
Run Code Online (Sandbox Code Playgroud)

scala

0
推荐指数
1
解决办法
101
查看次数