标签: spark-streaming

当无水印的流式 DataFrame/DataSet 上存在流式聚合时,不支持追加输出模式

我有一个 kafka 流,正在加载到 Spark。来自 Kafka 主题的消息具有以下属性:bl_ibanblacklistedtimestamp。因此,有 IBANS、关于该 IBAN 是否被列入黑名单 (Y/N) 的标志,并且还有该记录的时间戳。问题是一个 IBAN 可以有多个记录,因为超时的 IBAN 可能会被列入黑名单或“删除”。我想要实现的目标是了解每个 IBANS 的当前状态。然而,我从更简单的目标开始,那就是列出每个最新的 IBAN timestamp(之后我也想添加blacklisted状态),所以我生成了以下代码(其中黑名单代表我从 Kafka 加载的数据集):

blackList = blackList.groupBy("bl_iban")
                .agg(col("bl_iban"), max("timestamp"));
Run Code Online (Sandbox Code Playgroud)

之后我尝试使用以下代码将其打印到控制台:

StreamingQuery query = blackList.writeStream()
    .format("console")
    .outputMode(OutputMode.Append())
    .start();
Run Code Online (Sandbox Code Playgroud)

我已经运行我的代码并收到以下错误: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark

所以我将水印添加到我的数据集中,如下所示:

blackList = blackList.withWatermark("timestamp", "2 seconds")
                .groupBy("bl_iban")
                .agg(col("bl_iban"), max("timestamp"));
Run Code Online (Sandbox Code Playgroud)

之后又出现同样的错误。我有什么想法可以解决这个问题吗?


更新:在迈克的帮助下,我成功地摆脱了这个错误。但问题是我仍然无法让我的黑名单发挥作用。我可以看到数据是如何从 Kafka 加载的,但之后从我的组操作中我得到了两个空批次,仅此而已。从Kafka打印的数据:

+-----------------------+-----------+-----------------------+
|bl_iban                |blacklisted|timestamp              |
+-----------------------+-----------+-----------------------+
|SK047047595122709025789|N …
Run Code Online (Sandbox Code Playgroud)

java apache-spark spark-streaming spark-structured-streaming

1
推荐指数
1
解决办法
1681
查看次数

如何在控制台中 writeStream 数据帧?(Scala Spark 流)

我想调试我的笔记本,因此我需要在笔记本控制台模式下打印流数据。我有两个问题: 1-是否可以这样做:

df.writeStream.format("console").start().awaitTermination()
Run Code Online (Sandbox Code Playgroud)

2-如果是,我在哪里可以看到输出?

在此输入图像描述 这是 10 分钟后的结果...没有错误或结果

谢谢!

scala amazon-emr apache-spark spark-streaming

1
推荐指数
1
解决办法
1万
查看次数

有没有办法在 Kafka Streaming 的“foreachBatch”函数中传递附加/额外参数?

我正在尝试将流数据帧与配置单元表连接起来,并将生成的数据帧插入到另一个 Kafka 主题中。下面是我实现的代码,它按照要求工作。

def write_stream_batches(kafka_df: DataFrame,table_config):
    table_config = state_config
    kafka_df.writeStream \
    .format('kafka') \
    .foreachBatch(join_kafka_streams_denorm) \
    .option('checkpointLocation', table_config['checkpoint_location']) \
    .start() \
    .awaitTermination()

def join_kafka_streams_denorm(kafka_df, batch_id):
    try:
        table_config = state_config
        kafka_config = kafkaconfig

        filters = ata_filter(kafka_df=kafka_df)
        main_df = spark.sql(f'select * from db.table where {filters}')

        joined_df = join_remove_duplicate_col(kafka_df=kafka_df, denorm=main_df, table_config=table_config)
        push_to_kafka(joined_df, kafka_config, table_config, 'state')
    except Exception as error:
        print(f'Join failed with the exception: {error}')
        traceback.print_exc()
        print('Stopping the application')
        sys.exit(1)
Run Code Online (Sandbox Code Playgroud)

该方法write_stream_batches正在从 kafka 接收流数据帧。我正在将此主题数据合并到配置单元表中,并且我的表配置在从 config.py 文件导入的字典中,下面是该行。

table_config = state_config
Run Code Online (Sandbox Code Playgroud)

这里的问题是给出检查点配置,我在 write_stream_batches 中导入 state_config …

apache-kafka apache-spark spark-streaming

1
推荐指数
1
解决办法
1229
查看次数

使用Checkpoint进行Spark流式传输

我是一个引发流媒体的初学者.因此对检查点有一个基本的疑问.我的用例是按天计算唯一用户的数量.我正在使用按键和窗口缩小.我的窗口持续时间为24小时,滑动持续时间为5分钟.我正在将处理过的记录更新为mongodb.目前我每次都会更换现有记录.但我看到记忆力随着时间的推移逐渐增加,并在1小时半后杀死这个过程(在小实例中).重新启动后DB写入清除所有旧数据.所以我理解检查点就是解决方案.但我怀疑的是

  • 我的检查点持续时间应该是多少?根据文档,它说5-10倍的幻灯片持续时间.但我需要一整天的数据.所以可以保持24小时.
  • 理想情况下,检查站应该在哪里?最初当我收到流时或在窗口操作之前或数据缩减发生之后.

  • 感谢您的帮助.
    谢谢

    apache-spark spark-streaming

    0
    推荐指数
    1
    解决办法
    2374
    查看次数

    scala.MatchError:在Dataframes中

    我有一个Spark (version 1.3.1)申请.其中,我试图将一个转换Java bean RDD JavaRDD<Message>为Dataframe,它有许多具有不同数据类型的字段(整数,字符串,列表,映射,双精度).

    但是,当我执行我的代码时.

    messages.foreachRDD(new Function2<JavaRDD<Message>,Time,Void>(){
                @Override
                public Void call(JavaRDD<Message> arg0, Time arg1) throws Exception {
                    SQLContext sqlContext = SparkConnection.getSqlContext();
                    DataFrame df = sqlContext.createDataFrame(arg0, Message.class);
                    df.registerTempTable("messages");
    
    Run Code Online (Sandbox Code Playgroud)

    我收到了这个错误

    /06/12 17:27:40 INFO JobScheduler: Starting job streaming job 1434110260000 ms.0 from job set of time 1434110260000 ms
    15/06/12 17:27:40 ERROR JobScheduler: Error running job streaming job 1434110260000 ms.1
    scala.MatchError: interface java.util.List (of class java.lang.Class)
        at org.apache.spark.sql.SQLContext$$anonfun$getSchema$1.apply(SQLContext.scala:1193)
        at org.apache.spark.sql.SQLContext$$anonfun$getSchema$1.apply(SQLContext.scala:1192)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) …
    Run Code Online (Sandbox Code Playgroud)

    java scala apache-spark spark-streaming apache-spark-sql

    0
    推荐指数
    1
    解决办法
    3673
    查看次数

    Spark:没有注册输出操作,因此无需执行任何操作

    下面的问题是类似的:使用Twitter的Spark Streaming - 没有注册输出流,所以没有什么可以执行,但我认为在线51使用wordCounts.print()我实际上输出了一些结果.

    基本代码:

    ssc.start()
          ssc.awaitTermination()
          val lines = messages.map(_._2)
          val words = lines.flatMap(_.split(" "))
          val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
          wordCounts.print()
    
    Run Code Online (Sandbox Code Playgroud)

    或者我在这里误解了什么?要跟进:https: //github.com/dataplayground/playground/blob/master/app/actors/DirectStreamingActor.scala

    apache-spark spark-streaming

    0
    推荐指数
    1
    解决办法
    2285
    查看次数

    如何将查找(广播)RDD(或数据集)访问到其他RDD映射函数中

    我是spark和scala的新手,刚开始学习......我在CDH 5.1.3上使用spark 1.0.0

    我得到了一个名为dbTableKeyValueMap的广播rdd:RDD [(String,String)],我想使用dbTableKeyValueMap来处理我的文件RDD(每行有300多列).这是代码:

    val get = fileRDD.map({x =>
      val tmp = dbTableKeyValueMap.lookup(x)
      tmp
    })
    
    Run Code Online (Sandbox Code Playgroud)

    在本地运行此挂起和/或在一段时间后出错:

    scala.MatchError: null
    at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)
    
    Run Code Online (Sandbox Code Playgroud)

    我可以理解访问一个RDD里面其他会有问题,如果集合的地点和大小进入图片..对于我采取笛卡尔产品不是选项,因为文件RDD中的记录是巨大的(每行有300+列)...就像我使用分布式缓存在setup方法中加载这个dbTableKeyValueMap并在hadoop java mapreduce代码的MAP中使用,我想在spark map中使用类似的方式...我找不到简单的例子来引用类似的用例...一个我想迭代文件RDD行并在"每一列"上进行一些转换,祝福,查找等以进行进一步处理......或者还有其他任何方式我可以使用dbTableKeyValueMap作为scala集合而不是spark RDD

    请帮忙

    scala broadcasting apache-spark spark-streaming rdd

    0
    推荐指数
    1
    解决办法
    7872
    查看次数

    在Spark Streaming中,有没有办法检测批处理何时完成?

    我使用Spark 1.6.0和Cloudera 5.8.3.
    我有一个DStream对象,并在其上定义了大量的转换,

    val stream = KafkaUtils.createDirectStream[...](...)
    val mappedStream = stream.transform { ... }.map { ... }
    mappedStream.foreachRDD { ... }
    mappedStream.foreachRDD { ... }
    mappedStream.map { ... }.foreachRDD { ... }
    
    Run Code Online (Sandbox Code Playgroud)

    有没有办法注册foreachRDD保证最后执行的最后一个并且只有在上面的foreachRDDs完成执行时?
    换句话说,当Spark UI显示作业已完成时 - 就在我想要执行轻量级函数时.

    API中是否有允许我实现的内容?

    谢谢

    scala cloudera apache-spark spark-streaming

    0
    推荐指数
    1
    解决办法
    1938
    查看次数

    带Kafka连接器的Spark流式传输停止

    我开始使用Spark流媒体.我想从Kafka获取一个流,其中包含我在Spark文档中找到的示例代码:https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html

    这是我的代码:

    object SparkStreaming {
    
      def main(args: Array[String]) {
    
        val conf = new SparkConf().setAppName("Test_kafka_spark").setMaster("local[*]") // local parallelism 1
        val ssc = new StreamingContext(conf, Seconds(1))
    
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "localhost:9093",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "test",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
    
        val topics = Array("spark")
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )
    
        stream.map(record => (record.key, record.value))
    
      }
    }
    
    Run Code Online (Sandbox Code Playgroud)

    所有人似乎都开始很好,但工作立即停止,记录如下:

    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties …
    Run Code Online (Sandbox Code Playgroud)

    scala apache-kafka spark-streaming spark-streaming-kafka

    0
    推荐指数
    1
    解决办法
    479
    查看次数

    Spark SQL删除空格

    我有一个简单的Spark程序,它读取JSON文件并发出CSV文件.在JSON数据中,值包含前导和尾随空格,当我发出CSV时,前导和尾随空格都消失了.有没有办法可以保留空间.我尝试了很多选项,如ignoreTrailingWhiteSpace,ignoreLeadingWhiteSpace,但没有运气

    input.json

    {"key" : "k1", "value1": "Good String", "value2": "Good String"}
    {"key" : "k1", "value1": "With Spaces      ", "value2": "With Spaces      "}
    {"key" : "k1", "value1": "with tab\t", "value2": "with tab\t"}
    
    Run Code Online (Sandbox Code Playgroud)

    output.csv

    _corrupt_record,key,value1,value2
    ,k1,Good String,Good String
    ,k1,With Spaces,With Spaces
    ,k1,with tab,with tab
    
    Run Code Online (Sandbox Code Playgroud)

    expected.csv

    _corrupt_record,key,value1,value2
    ,k1,Good String,Good String
    ,k1,With Spaces      ,With Spaces      
    ,k1,with tab\t,with tab\t
    
    Run Code Online (Sandbox Code Playgroud)

    我的代码:

    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession
                .builder()
                .appName(TestSpark.class.getName())
                .master("local[1]").getOrCreate();
    
        SparkContext context = sparkSession.sparkContext();
        context.setLogLevel("ERROR");
        SQLContext sqlCtx = sparkSession.sqlContext();
        System.out.println("Spark context established"); …
    Run Code Online (Sandbox Code Playgroud)

    apache-spark spark-streaming apache-spark-sql spark-dataframe apache-spark-mllib

    0
    推荐指数
    1
    解决办法
    5046
    查看次数