Posts by Sam*_*Sam

JSON Schema validation errors

I'm getting some errors when validating JSON. I can't make sense of them; can anyone help explain what they mean?

{
"$schema": "http://json-schema.org/draft-04/schema#",
"description": "Modified JSON Schema draft v4 that includes the optional '$ref' and 'format'",
"definitions": {
    "schemaArray": {
        "type": "array",
        "minItems": 1,
        "items": { "$ref": "#" }
    },
    "positiveInteger": {
        "type": "integer",
        "minimum": 0
    },
    "positiveIntegerDefault0": {
        "allOf": [ { "$ref": "#/definitions/positiveInteger" }, { "default": 0 } ]
    },
    "simpleTypes": {
        "enum": [ "array", "boolean", "integer", "null", "number", "object", "string" ]
    },
    "stringArray": {
        "type": "array",
        "items": { "type": "string" },
        "minItems": 1,
        "uniqueItems": true …

validation schema json jsonschema json-schema-validator

6 votes · 1 answer · 40k views

For loop in gitlab-ci.yml

I have this code snippet. It gives me "syntax error: unexpected end of file". If I copy it into a .sh file and run it in a terminal, it works.

before_script:
    - sbt sbtVersion
    - for file in ./pending/*.sql; do
        file=$(basename "$file")
        export str_opt="$(cat ./pending/"$file"|tr '\n' ' ')"
        mv ./pending/"$file" ./done/
        done

Where am I going wrong?
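A likely cause: YAML folds the multi-line "- for ..." entry into a single line, replacing the newlines with spaces, so the shell receives the whole loop without command separators and reaches end of file before "done" can parse. A minimal sketch of a common fix, assuming the runner's shell is bash, is a literal block scalar (|) that preserves the newlines:

before_script:
    - sbt sbtVersion
    # The | block scalar keeps each line of the loop intact,
    # so the shell sees the same script that works in a .sh file.
    - |
      for file in ./pending/*.sql; do
        file=$(basename "$file")
        export str_opt="$(cat ./pending/"$file" | tr '\n' ' ')"
        mv ./pending/"$file" ./done/
      done

Alternatively, keeping the loop on one line with explicit semicolons (for f in ...; do ...; done) avoids the folding problem entirely.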

bash shell gitlab-ci

5 votes · 1 answer · 9,302 views

java.lang.AssertionError: assertion failed when joining streams in Spark

I have this code:

val counter = event_stream
    .withWatermark("timestamp", "5 minutes")
    .groupBy(
      window($"timestamp", "10 minutes", "5 minutes"),
      $"value")
    .agg(count("value") as "kafka.count",collect_set("topic") as "kafka.topic")
    .drop("window")
    .withColumnRenamed("value","join_id")

    counter.printSchema


  val counter1 = event_stream
    .groupBy("value")
    .count()
//    .agg(count("value") as "kafka.count",collect_set("topic") as "kafka.topic")
    .withColumnRenamed("value","join_id")

  counter1.printSchema()

  val result_stream = event_stream.join(counter,$"value" === $"join_id")
    .drop("key")
    .drop("value")
    .drop("partition")
    .drop("timestamp")
    .drop("join_id")
    .drop("timestampType")
    .drop("offset")
//    .withColumnRenamed("count(value)", "kafka.count")
    .withColumnRenamed("topic","kafka.topic")

  result_stream.printSchema()

  KafkaSink.write(counter, topic_produce)
//  KafkaSink.writeToConsole(result_stream, topic_produce)

If I write it to the console with OutputMode.Complete it works fine, but it fails when I use OutputMode.Append. Running the different streaming queries above produces different errors.

Here is my write function:

private def writeStream(df:DataFrame, topic:String): StreamingQuery = {
    df
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KafkaUtils.kafkaServers)
      .option("topic", topic) …
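One hedged reading of the Append-mode failure: in Append output, a streaming aggregation can only emit a row once the watermark passes its event-time window, so an aggregation like counter1 above (a plain groupBy with no watermark or window) cannot produce Append output at all. Depending on the Spark version, joining a stream with the result of a streaming aggregation may also be unsupported outright, which would match the assertion failure. A minimal sketch of a windowed, watermarked variant of counter1 (my rewrite, not the asker's code, assuming the same implicits and functions imports as above):

  // Sketch: anchor the aggregation to an event-time window with a watermark,
  // so that Append mode knows when a group is final and can be emitted.
  val counter1 = event_stream
    .withWatermark("timestamp", "5 minutes")
    .groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"value")
    .count()
    .withColumnRenamed("value", "join_id")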

apache-kafka apache-spark spark-structured-streaming

3 votes · 1 answer · 4,270 views

How do I break out of nested iterators once a value is found?

I have some code like this, with nested iterators. When find returns a Some(value) I want to stop, break out of the foreach/map, and return the value; when it returns None I want to keep going. What is the right way to do this here?

Edit: here is the full function.

Line Schema: line_id, name

Stop Schema: stop_id, x, y

Time Schema: line_id, stop_id, time

Given a time and x, y, I want to find the line_id and name. For now I'm stuck on getting valueWhichImTryingToGet.

  def findVehicle(time: String, x: String, y: String) = {
    val stopIter = Source.fromFile("stops.csv").getLines().drop(1)
    val stopCols = stopIter.map(_.split(",").map(_.trim))
    val stopIds = stopCols.filter(arr => arr(1) == x && arr(2) == y)

     val valueWhichImTryingToGet = stopIds.map { arr =>
      val iter = Source.fromFile("times.csv").getLines().drop(1)
      val cols = iter.map(_.split(",").map(_.trim))
      cols.find(col => col(1) …
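One idiomatic way to bail out of the nested iteration at the first hit, sketched against the schemas above (the match condition is my guess, since the snippet is cut off at col(1)): map each matching stop row to the Option that find returns, then use collectFirst, which stops consuming the lazy iterator at the first Some.

import scala.io.Source

def findVehicle(time: String, x: String, y: String): Option[String] = {
  // Stop schema: stop_id, x, y - keep only the stops at the given coordinates
  val stopRows = Source.fromFile("stops.csv").getLines().drop(1)
    .map(_.split(",").map(_.trim))
    .filter(arr => arr(1) == x && arr(2) == y)

  // Time schema: line_id, stop_id, time - for each stop, look for a matching
  // time row; collectFirst returns at the first Some and stops iterating.
  stopRows
    .map { stop =>
      Source.fromFile("times.csv").getLines().drop(1)
        .map(_.split(",").map(_.trim))
        .find(col => col(1) == stop(0) && col(2) == time)
    }
    .collectFirst { case Some(row) => row(0) } // line_id of the first match
}

Because iterators are lazy, no further stop or time rows are read after the first match; the name can then be looked up from the Line schema using the returned line_id.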

iterator scala

1 vote · 1 answer · 71 views