小编sco*_*pio的帖子

为什么Kafka Direct Stream会为每条消息创建一个新的解码器?

我有一个用Java编写并使用Spark 2.1的Spark流媒体应用程序.我正在使用KafkaUtils.createDirectStream来自Kafka的消息.我正在使用kryo编码器/解码器用于kafka消息.我在Kafka properties-> key.deserializer,value.deserializer,key.serializer,value.deserializer中指定了这一点.

当Spark以微批处理方式提取消息时,使用kryo解码器成功解码消息.但是我注意到Spark执行器创建了一个kryo解码器的新实例,用于解码从kafka读取的每条消息.我通过将日志放入解码器构造函数中

来检查这个.这对我来说似乎很奇怪.不应该为每个消息和每个批次使用相同的解码器实例吗?

我在卡夫卡读书的代码:

JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams));

JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> {
    return new Tuple2<String, Class1>(consRecord.key(), consRecord.value());
});
Run Code Online (Sandbox Code Playgroud)

java kryo apache-kafka apache-spark spark-streaming

6
推荐指数
1
解决办法
366
查看次数

Spark SQL 如何读取 Parquet 分区文件

我有一个大约 1 GB 的镶木地板文件。每个数据记录都是来自 IOT 设备的读数,它捕获设备在过去一分钟内消耗的能量。架构:houseId、deviceId、energy 镶木地板文件根据houseId 和deviceId 进行分区。一个文件只包含过去 24 小时的数据。

我想使用 Spark SQL 对驻留在此镶木地板文件中的数据执行一些查询示例查询找出给定房屋在过去 24 小时内每台设备消耗的平均能源。

Dataset<Row> df4 = ss.read().parquet("/readings.parquet");
df4.as(encoder).registerTempTable("deviceReadings");
ss.sql("Select avg(energy) from deviceReadings where houseId=3123).show();
Run Code Online (Sandbox Code Playgroud)

上面的代码运行良好。我想了解 spark 如何执行此查询。

  1. Spark 是否在不查看查询的情况下从 HDFS 读取内存中的整个 Parquet 文件?(我不相信这是真的)
  2. Spark 是否仅根据查询从 HDFS 加载所需的分区?
  3. 如果有多个查询需要执行怎么办?Spark 在准备执行计划时会查看多个查询吗?一个查询可能只处理一个分区,而第二个查询可能需要所有分区,因此合并计划应将整个文件从磁盘加载到内存中(如果内存限制允许)。
  4. 如果我在上面缓存 df4 数据帧,执行时间会有所不同吗?

partitioning apache-spark parquet apache-spark-sql

4
推荐指数
1
解决办法
1383
查看次数

使用Spark Streaming处理Kafka消息时遇到的挑战

我想实时处理在Web服务器上报告的消息。Web服务器上报告的消息属于不同的会话,我想进行一些会话级别的聚合。为此,我计划使用Kafka的Spark Streaming前端。甚至在我开始之前,我就列出了该体系结构将面临的一些挑战。熟悉这个生态系统的人可以帮助我解决以下问题:

  1. 如果每个Kafka消息都属于一个特定的会话,那么如何管理会话亲缘关系,以便同一Spark执行程序可以查看链接到会话的所有消息?
  2. 如何确保属于会话的消息由Spark执行程序按在Kafka上报告的顺序进行处理?我们能以某种方式实现这一目标而又不限制线程数和增加处理开销(如按消息时间戳排序)吗?
  3. 什么时候检查点会话状态?如果执行程序节点崩溃,如何从最后一个检查点恢复状态?如果驱动程序节点崩溃,如何从最后一个检查点恢复状态?
  4. 如果节点(执行程序/驱动程序)在检查点状态之前崩溃,如何恢复状态?如果Spark通过重播消息来重新创建RDD状态,那么它从哪里开始从以下位置开始重播Kafka消息:病房的最后一个检查点,或者它处理重新创建分区所需的所有消息?Spark Streaming是否可以在多个Spark Streaming批次之间或仅在当前批次中恢复状态,即,如果在上一个批次中未执行检查点,则可以恢复状态吗?

bigdata apache-kafka apache-spark spark-streaming

1
推荐指数
1
解决办法
1261
查看次数

SPARK_WORKER_CORES 设置如何影响 Spark Standalone 中的并发性

我使用的是在独立模式下配置的 Spark 2.2.0 集群。Cluster有2个八核机器。该集群专用于 Spark 作业,没有其他进程使用它们。我有大约 8 个 Spark Streaming 应用程序在这个集群上运行。
我将 SPARK_WORKER_CORES(在 spark-env.sh 中)明确设置为 8,并使用 total-executor-cores 设置为每个应用程序分配一个内核。此配置降低了在多个任务上并行工作的能力。如果一个 stage 在一个有 200 个分区的分区 RDD 上工作,则一次只执行一个任务。我想让 Spark 做的是为每个作业启动单独的线程并并行处理。但是我找不到单独的 Spark 设置来控制线程数。
因此,我决定尝试将每台机器上的内核数量(即 spark-env.sh 中的 SPARK_WORKER_CORES)增加到 1000。然后我为每个 Spark 应用程序提供了 100 个内核。我发现 spark 这次开始并行处理 100 个分区,表明正在使用 100 个线程。
我不确定这是否是影响 Spark 作业使用的线程数的正确方法。

streaming distributed-computing apache-spark apache-spark-standalone

1
推荐指数
1
解决办法
1469
查看次数