我有一个用Java编写并使用Spark 2.1的Spark流媒体应用程序.我正在使用KafkaUtils.createDirectStream
来自Kafka的消息.我正在使用kryo编码器/解码器用于kafka消息.我在Kafka properties-> key.deserializer,value.deserializer,key.serializer,value.deserializer中指定了这一点.
当Spark以微批处理方式提取消息时,使用kryo解码器成功解码消息.但是我注意到Spark执行器创建了一个kryo解码器的新实例,用于解码从kafka读取的每条消息.我通过将日志放入解码器构造函数中
来检查这个.这对我来说似乎很奇怪.不应该为每个消息和每个批次使用相同的解码器实例吗?
我在卡夫卡读书的代码:
JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams));
JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> {
return new Tuple2<String, Class1>(consRecord.key(), consRecord.value());
});
Run Code Online (Sandbox Code Playgroud) 我有一个大约 1 GB 的镶木地板文件。每个数据记录都是来自 IOT 设备的读数,它捕获设备在过去一分钟内消耗的能量。架构:houseId、deviceId、energy 镶木地板文件根据houseId 和deviceId 进行分区。一个文件只包含过去 24 小时的数据。
我想使用 Spark SQL 对驻留在此镶木地板文件中的数据执行一些查询示例查询找出给定房屋在过去 24 小时内每台设备消耗的平均能源。
Dataset<Row> df4 = ss.read().parquet("/readings.parquet");
df4.as(encoder).registerTempTable("deviceReadings");
ss.sql("Select avg(energy) from deviceReadings where houseId=3123).show();
Run Code Online (Sandbox Code Playgroud)
上面的代码运行良好。我想了解 spark 如何执行此查询。
我想实时处理在Web服务器上报告的消息。Web服务器上报告的消息属于不同的会话,我想进行一些会话级别的聚合。为此,我计划使用Kafka的Spark Streaming前端。甚至在我开始之前,我就列出了该体系结构将面临的一些挑战。熟悉这个生态系统的人可以帮助我解决以下问题:
我使用的是在独立模式下配置的 Spark 2.2.0 集群。Cluster有2个八核机器。该集群专用于 Spark 作业,没有其他进程使用它们。我有大约 8 个 Spark Streaming 应用程序在这个集群上运行。
我将 SPARK_WORKER_CORES(在 spark-env.sh 中)明确设置为 8,并使用 total-executor-cores 设置为每个应用程序分配一个内核。此配置降低了在多个任务上并行工作的能力。如果一个 stage 在一个有 200 个分区的分区 RDD 上工作,则一次只执行一个任务。我想让 Spark 做的是为每个作业启动单独的线程并并行处理。但是我找不到单独的 Spark 设置来控制线程数。
因此,我决定尝试将每台机器上的内核数量(即 spark-env.sh 中的 SPARK_WORKER_CORES)增加到 1000。然后我为每个 Spark 应用程序提供了 100 个内核。我发现 spark 这次开始并行处理 100 个分区,表明正在使用 100 个线程。
我不确定这是否是影响 Spark 作业使用的线程数的正确方法。
streaming distributed-computing apache-spark apache-spark-standalone
apache-spark ×4
apache-kafka ×2
bigdata ×1
java ×1
kryo ×1
parquet ×1
partitioning ×1
streaming ×1