I'm running into some errors while validating JSON. I can't make sense of them; can anyone help explain what they mean?
{
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Modified JSON Schema draft v4 that includes the optional '$ref' and 'format'",
"definitions": {
"schemaArray": {
"type": "array",
"minItems": 1,
"items": { "$ref": "#" }
},
"positiveInteger": {
"type": "integer",
"minimum": 0
},
"positiveIntegerDefault0": {
"allOf": [ { "$ref": "#/definitions/positiveInteger" }, { "default": 0 } ]
},
"simpleTypes": {
"enum": [ "array", "boolean", "integer", "null", "number", "object", "string" ]
},
"stringArray": {
"type": "array",
"items": { "type": "string" },
"minItems": 1,
"uniqueItems": true
…

I have this code snippet. It gives me "syntax error: unexpected end of file". If I copy it into a .sh file and run it in the terminal, it works.
before_script:
- sbt sbtVersion
- for file in ./pending/*.sql; do
file=$(basename "$file")
export str_opt="$(cat ./pending/"$file"|tr '\n' ' ')"
mv ./pending/"$file" ./done/
done
Where am I going wrong?
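For context, my reading (not stated in the question itself): YAML folds the continuation lines of a plain scalar into a single space-separated line, so the newlines that bash needs between the loop-body statements disappear and `done` is never seen as a keyword. With explicit `;` separators the same loop parses correctly even on one line. A runnable sketch, where the `pending`/`done` directories and `a.sql` are throwaway fixtures invented for illustration:

```shell
#!/bin/sh
# Throwaway fixtures standing in for the real pipeline's files.
mkdir -p pending done
printf 'SELECT 1;\nSELECT 2;\n' > pending/a.sql

# The loop from the question rewritten with ';' separators, so it still
# parses after being folded onto a single line (as YAML does).
for file in ./pending/*.sql; do file=$(basename "$file"); str_opt="$(cat ./pending/"$file" | tr '\n' ' ')"; mv ./pending/"$file" ./done/; done

echo "$str_opt"
```

Alternatively, a `|` literal block scalar in `.gitlab-ci.yml` preserves the newlines, so the multi-line form can be kept as-is under it.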
I have this code:
val counter = event_stream
.withWatermark("timestamp", "5 minutes")
.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"value")
.agg(count("value") as "kafka.count", collect_set("topic") as "kafka.topic")
.drop("window")
.withColumnRenamed("value","join_id")
counter.printSchema
val counter1 = event_stream
.groupBy("value")
.count()
// .agg(count("value") as "kafka.count",collect_set("topic") as "kafka.topic")
.withColumnRenamed("value","join_id")
counter1.printSchema()
val result_stream = event_stream.join(counter,$"value" === $"join_id")
.drop("key")
.drop("value")
.drop("partition")
.drop("timestamp")
.drop("join_id")
.drop("timestampType")
.drop("offset")
// .withColumnRenamed("count(value)", "kafka.count")
.withColumnRenamed("topic","kafka.topic")
result_stream.printSchema()
KafkaSink.write(counter, topic_produce)
// KafkaSink.writeToConsole(result_stream, topic_produce)
If I send it to the console using OutputMode.Complete, it works fine. But when I use OutputMode.Append, sending the different streaming queries above gives different errors.
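For what it's worth, my reading of Spark's semantics (an assumption, not something stated in the question): OutputMode.Append may only emit rows that will never change, so a streaming aggregation is allowed under Append only when a watermark bounds it; `counter1` above aggregates with no watermark, which works under Complete but not Append. A non-runnable sketch of a watermarked form, reusing `event_stream` and the column names from the snippet above:

```scala
import org.apache.spark.sql.functions.{window, count}

// Sketch only: event_stream is the streaming DataFrame from the question,
// and the $"..." column syntax assumes the usual `import spark.implicits._`.
// With a watermark on the event-time column, Spark can finalize each
// window and emit it under OutputMode.Append.
val counter1 = event_stream
  .withWatermark("timestamp", "5 minutes")
  .groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"value")
  .count()
  .withColumnRenamed("value", "join_id")
```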
Here is my write function:
private def writeStream(df:DataFrame, topic:String): StreamingQuery = {
df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", KafkaUtils.kafkaServers)
.option("topic", topic) …

I have some code like this, with nested iterators. If find returns a Some(value), I want to stop, break out of the foreach/map, and return the value; if it returns None, I want to continue. What is the correct way to do that here?
Edit: here is the full function.
Line Schema: line_id, name
Stop Schema: stop_id, x, y
Time Schema: line_id, stop_id, time
Given a time and x, y, I want to find the line_id and name. For now I'm stuck on getting valueWhichImTryingToGet.
def findVehicle(time: String, x: String, y: String) = {
val stopIter = Source.fromFile("stops.csv").getLines().drop(1)
val stopCols = stopIter.map(_.split(",").map(_.trim))
val stopIds = stopCols.filter(arr => arr(1) == x && arr(2) == y)
val valueWhichImTryingToGet = stopIds.map { arr =>
val iter = Source.fromFile("times.csv").getLines().drop(1)
val cols = iter.map(_.split(",").map(_.trim))
cols.find(col => col(1) …
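As an aside on the stop-at-first-Some behaviour being asked about: `collectFirst` on a lazy iterator skips `None` results and stops at the first `Some`, with no manual break or return needed. A self-contained sketch under my own assumptions — the in-memory rows and `lookupLine` below are invented stand-ins for the stops.csv rows and the inner times.csv lookup:

```scala
// Hypothetical stand-ins for the parsed CSV rows (stop_id, x, y).
val stopRows = Iterator(
  Array("s1", "10", "20"),
  Array("s2", "10", "20")
)

// Pretend times.csv lookup: only stop "s2" has a matching line.
def lookupLine(stopId: String): Option[String] =
  if (stopId == "s2") Some("line42") else None

// collectFirst walks the lazy iterator, continues past None results,
// and stops as soon as a Some is produced.
val found: Option[String] =
  stopRows
    .map(arr => lookupLine(arr(0)))
    .collectFirst { case Some(lineId) => lineId }
// found == Some("line42")
```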
Tags: apache-kafka, apache-spark, bash, gitlab-ci, iterator, json, jsonschema, scala, schema, shell, validation