我想查看如何获取有关每个分区的信息,例如总数.当使用部署模式作为纱线群集提交Spark作业以便在控制台上记录或打印时,驱动程序端的每个分区中的记录数.
我正在使用DirectAPI在纱线上运行火花流(1.6.1)来读取具有50个分区并在HDFS上写入的Kafka主题的事件.我的批处理间隔为60秒.我收到了大约500K的消息,这些消息在60秒内被处理.
突然火花开始接收15-20万条消息,大约需要5-6分钟处理,批处理间隔为60秒.我已经配置好了"spark.streaming.concurrentJobs=4".
因此,当批处理需要很长时间来处理时,spark会启动并发4个活动任务来处理积压批处理但仍然会在一段时间内批量积压增加,因为批处理间隔对于这样的数据量来说太小了.
我对此几乎没有疑问.
当我开始接收15-20万条消息和处理这些消息的时间大约是5-6分钟,批处理间隔为60秒.当我检查我的HDFS目录时,我看到为每个60秒创建的文件包含50个部分文件,我很困惑,这里我的批处理在5-6分钟内得到处理,然后如何每隔1分钟在HDFS上写文件&'saveAsTextFile'动作是每批只调用一次.所有文件的总记录50个零件文件大约有330万个.
为了处理1500万到2000万条消息的处理,我将批处理间隔配置为8-10分钟,现在火花开始消耗来自Kafka的大约35-40万条消息,并且其处理时间再次开始超过批处理间隔.
我已配置'spark.streaming.kafka.maxRatePerPartition=50'&'spark.streaming.backpressure.enabled=true'.
hadoop apache-kafka apache-spark spark-streaming kafka-consumer-api
为了在我的项目中使用结构化流,我正在 hortonworks 2.6.3 环境上测试 Spark 2.2.0 和 Kafka 0.10.1 与 Kerberos 的集成,我正在运行下面的示例代码来检查集成。我能够在 Spark 本地模式下的 IntelliJ 上运行以下程序,没有任何问题,但是当在 Hadoop 集群上移动到纱线集群/客户端模式时,相同的程序会抛出以下异常。
我知道我可以为 group-id 配置 kafka acl,但是 Spark 结构化流为每个查询生成新的 group-id,因此我无法在 kafka acl 中配置 group-id 以摆脱授权异常。我现在有点卡住了。
14:19:59 org.apache.spark.sql.streaming.StreamingQueryException: Not authorized to access group: spark-kafka-source-632450e3-a111-4d09-8704-85320c572aeb--1213729126-driver-2
例外:
18/01/31 14:46:34 INFO AbstractLogin: Successfully logged in.
18/01/31 14:46:34 INFO KerberosLogin: TGT refresh thread started.
18/01/31 14:46:34 INFO KerberosLogin: TGT valid starting at: Wed Jan 31 13:51:11 UTC 2018
18/01/31 14:46:34 INFO KerberosLogin: TGT expires: Wed Jan 31 23:51:14 UTC …Run Code Online (Sandbox Code Playgroud) hadoop kerberos apache-kafka apache-spark kafka-consumer-api
我正在尝试对 Spark 结构化流数据帧进行非常简单的排序操作,但最终出现“线程“主”org.apache.spark.sql.AnalysisException 中的异常:流数据帧/数据集不支持排序,除非它在聚合数据帧/上”完整输出模式下的数据集”,但有以下例外。你能帮我解决这个问题吗?
代码:
val df: DataFrame = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokerList)
.option("kafka.security.protocol", security)
.option("startingOffsets", "latest")
.option("subscribe", srcTopic)
.option("group.id", groupID)
.option("failOnDataLoss", false)
.load
val uDF = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
.select($"value")
.select(from_json($"value", uSchema).as("events"))
.select($"events.*")
val uDF2 = uDF
.select($"COL1", $"COL2", $"COL3", $"COL4", $"COL5", $"COL6", $"COL7", $"COL8")
.sort($"COL5",$"COL3",$"COL8")
val kDF = uDF2
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("kafka.security.protocol", "PLAINTEXT")
.option("topic", "r_topic")
.option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
.start()
kDF.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
例外:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Sorting is not supported on …Run Code Online (Sandbox Code Playgroud) 我想了解火花流的基本内容.我有50个Kafka主题分区和5个执行程序,我使用DirectAPI所以没有.RDD分区将为50.如何在5个执行程序上处理此分区?将在每个执行程序上一次激活进程1分区,或者如果执行程序有足够的内存和核心,它将在每个执行程序上并行处理多个分区.
我有一个用例,从卡夫卡读取消息后,我需要从Spark Streaming调用RESTAPI进行一些计算,然后将结果保存回HDFS和第三方应用程序。
我对此毫不怀疑:
我想要了解当消息被写入具有多个分区的主题时Kafka如何维护消息序列的任何信息/解释.例如,我有多个消息生成器,每个消息生成器按顺序生成消息,并在具有多个分区的Kafka主题上写入.在这种情况下,消费者组将如何使用消息.
在我目前的用例中,我使用Spark核心从MS SQL Server读取数据并对数据进行一些处理并每隔1分钟将其发送到Kafka,我使用Spark和Phoenix来维护HBase表中的CDC信息.
但是这种设计存在一些问题,例如,如果MS SQL记录激增,Spark处理比批处理间隔花费更多时间,并且spark最终会向Kafka发送重复记录.
作为替代方案,我正在考虑使用Kafka Connect从MS SQL读取消息并将记录发送到Kafka主题并在Kafka中维护MS SQL CDC.Spark Streaming将从Kafka主题中读取记录,并将记录和存储处理到HBase并发送到其他Kafka主题.
我有几个问题要实现这个架构:
我可以使用开源Kafka连接器和Apache Kafka 0.9版本来实现这种架构.
如果是的话,请你推荐一个GitHub项目,它可以为我提供这样的连接器,我可以使用SQL查询CDC MS SQL表SELECT * FROM SOMETHING WHERE COLUMN > ${lastExtractUnixTime}),并将记录存储到Kafka主题中.
Kafka connect是否支持Kerberos Kafka设置.
apache-spark ×7
apache-kafka ×5
hadoop ×4
scala ×3
kerberos ×1
partitioning ×1
rest ×1
sql-server ×1