I am building a Kafka ingestion module on EMR 5.11.1 with Spark 2.2.1. My intention is to use Structured Streaming to consume a Kafka topic, do some processing, and store the result in Parquet format on EMRFS/S3.
The console sink works as expected; the file sink does not.
In spark-shell:
val event = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", <server list>)
.option("subscribe", <topic>)
.load()
val eventdf = event.select($"value" cast "string" as "json")
.select(from_json($"json", readSchema) as "data")
.select("data.*")
val outputdf = <some processing on eventdf>
This works:
val console_query = outputdf.writeStream.format("console")
.trigger(Trigger.ProcessingTime(10.seconds))
.outputMode(OutputMode.Append)
.start
This does not:
val filesink_query = outputdf.writeStream
.partitionBy(<some column>)
.format("parquet")
.option("path", <some path in EMRFS>)
.option("checkpointLocation", "/tmp/ingestcheckpoint")
.trigger(Trigger.ProcessingTime(10.seconds))
.outputMode(OutputMode.Append)
.start //fails
Things I have tried that did not work:
Some digging through the source code brought me here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala , which suggests that a missing .compact file should trigger the defaults.
So I tried: spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay", 60000) to make sure old batch metadata is not deleted before the new batch creates the combined metadata file.
What makes this error annoying is that it is not always reproducible. Without changing a single character of the code, the Parquet write sometimes works and sometimes does not. I have tried cleaning up the checkpoint location, the spark/hdfs logs, etc., in case some internal Spark "state" was causing the problem.
This is the error stack trace:
query: …
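For completeness, note that "/tmp/ingestcheckpoint" carries no filesystem scheme, so it resolves against whatever fs.defaultFS is on the cluster. Below is a minimal sketch of the same query with both locations scheme-qualified; the partition column, bucket and paths are hypothetical placeholders (not the ones I actually use), and I have not confirmed that this changes the behaviour.
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
// Sketch only: same file sink, but with scheme-qualified locations so there
// is no ambiguity about which filesystem each path resolves to.
// "event_date", the bucket and both paths below are hypothetical placeholders.
val filesink_query_explicit = outputdf.writeStream
  .partitionBy("event_date")
  .format("parquet")
  .option("path", "s3://my-bucket/ingest/output")
  .option("checkpointLocation", "hdfs:///tmp/ingestcheckpoint")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .outputMode(OutputMode.Append)
  .start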
Runtime: Spark 2.3.0, Scala 2.11 (Databricks 4.1 ML beta)
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
//kafka settings and df definition goes here
val query = df.writeStream.format("parquet")
.option("path", ...)
.option("checkpointLocation",...)
.trigger(continuous(30000))
.outputMode(OutputMode.Append)
.start
Throws error: not found: value continuous
Other attempts that did not work:
.trigger(continuous = "30 seconds") //as per Databricks blog
// throws same error as above
.trigger(Trigger.Continuous("1 second")) //as per Spark docs
// throws java.lang.IllegalStateException: Unknown type of trigger: ContinuousTrigger(1000)
References:
(Databricks blog) https://databricks.com/blog/2018/03/20/low-latency-continuous-processing-mode-in-structured-streaming-in-apache-spark-2-3-0.html
(Spark guide) http://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html#continuous-processing
(Scaladoc) https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.streaming.package
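For reference, the continuous-processing section of the linked 2.3.0 guide lists only the Kafka, memory and console sinks as supported, so the minimal sketch I could put together from the docs uses the console sink instead of parquet (same df as above; the value name is arbitrary):
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
// Sketch: continuous trigger against the console sink, one of the sinks the
// 2.3.0 guide documents for continuous processing.
val continuous_console = df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("30 seconds"))
  .outputMode(OutputMode.Append)
  .start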
This example is contrived, but it represents a real-life situation:
I have a Python script that accepts command-line arguments.
main() parses the arguments and passes them to an intermediate function (caller_func in the code example).
The intermediate function then calls a function decorated with lru_cache from functools (fib() in the example), and the maxsize of the cache is the argument accepted from the command line and passed through the intermediate function.
How do I do this?
import argparse
from functools import lru_cache
def main():
    # boilerplate for parsing command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--cache_size", default="10")
    parser.add_argument("--fibo_num", default="20")
    args = parser.parse_args()
    cache_size = int(args.cache_size)
    fibo_num = int(args.fibo_num)
    caller_func(cache_size, fibo_num)

# Intermediate function that is supposed to call the decorated function
def caller_func(cache_size, fib_num):
    print(fib(fib_num))

# function decorated by LRU cache
@lru_cache(maxsize=cache_size)
def fib(n):
    if n < 2:
        return n
    return fib(n-1) + fib(n-2) …

This works:
scala> 0 to 24 by 5
res16: scala.collection.immutable.Range = Range(0, 5, 10, 15, 20)
A negative integer also works:
scala> 0 to 24 by -1
res17: scala.collection.immutable.Range = Range()
But it requires an integer:
scala> 0 to 24 by 0.5
<console>:40: error: type mismatch;
found : Double(0.5)
required: Int
0 to 24 by 0.5
^
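A minimal sketch of possible workarounds for a fractional step (the value names below are arbitrary):
// Keep an Int range and scale it afterwards.
val halves: IndexedSeq[Double] = (0 to 48).map(_ * 0.5)   // 0.0, 0.5, ..., 24.0
// Or use BigDecimal, whose numeric ranges accept a fractional step exactly.
val exact = BigDecimal(0) to BigDecimal(24) by BigDecimal("0.5")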