我想从Spark RDD中的特定分区访问数据.我可以得到一个分区的地址如下:
myRDD.partitions(0)
Run Code Online (Sandbox Code Playgroud)
但我想从myRDD.partitions(0)分区获取数据.我试过官方的org.apache.spark文档但找不到.
提前致谢.
我在根队列下有4个队列,配置如下。
|-------------|-----------------|---------------------|-------------------|
| Queue Name | Capacity (in %) | Max Capacity (in %) | User Limit Factor |
|-------------|-----------------|---------------------|-------------------|
| default | 10 | 30 | 10 |
|-------------|-----------------|---------------------|-------------------|
| thriftsvr | 5 | 30 | 10 |
|-------------|-----------------|---------------------|-------------------|
| stream | 70 | 70 | 10 |
|-------------|-----------------|---------------------|-------------------|
| batch | 15 | 30 | 10 |
|-------------|-----------------|---------------------|-------------------|
Run Code Online (Sandbox Code Playgroud)
我通过yarn.scheduler.capacity.root..capacity设置了容量,并通过yarn.scheduler.capacity.root..maximum-capacity属性设置了最大容量。
我的理解是,以上两个属性设置了绝对容量和绝对最大容量,这意味着队列流的100%等于集群总容量的70%,并且可以填充队列总容量的100%,也就是集群总容量的70%容量。
现在,问题是当队列“流”中充满了66.4%(即,已用容量:66.4%和绝对已用容量:46.5%)时,新作业进入挂起状态,该状态是在队列“流”中通过说“等待AM容器被分配,启动并在RM中注册”。
当我在yarn UI上检查队列配置时,它显示已配置的最大容量:70.0%和绝对已配置的最大容量:70.0%,但是根据配置,可以填充队列“流”直到已用容量:100%和绝对已用容量:70%

知道为什么为什么新作业直到100%都无法利用队列流的容量?
我正在探索 Google Cloud 上的日志记录、监控和警报选项。我发现 Cloud Logging、Monitoring 和 Stackdriver 作为几个选项。
从理论上讲,这些服务看起来很相似。谁能解释一下这些服务之间的实际区别是什么?
谢谢你。
logging monitoring google-cloud-platform google-cloud-monitoring
我正面临与kafka火花流相关的问题,我的用例如下:
现在,问题是在流式传输期间我们正在丢失一些消息,即所有传入的消息都不会写入协调或非协调的主题.例如,如果应用程序在一个批处理中收到30条消息,那么有时它会将所有消息写入输出主题(这是预期的行为),但有时它只写入27条消息(3条消息丢失,此数字可能会改变).
版本如下:
Kafka主题配置如下:
以下是我们用于kafka的属性:
val props = new Properties()
props.put("metadata.broker.list", properties.getProperty("metadataBrokerList"))
props.put("auto.offset.reset", properties.getProperty("autoOffsetReset"))
props.put("group.id", properties.getProperty("group.id"))
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized"))
props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized"))
props.put("acks", "all");
props.put("retries", "5");
props.put("request.required.acks", "-1")
Run Code Online (Sandbox Code Playgroud)
以下是我们将处理过的消息写入kafka的代码片段:val schemaRdd2 = finalHarmonizedDF.toJSON
schemaRdd2.foreachPartition { partition =>
val producerConfig = new ProducerConfig(props)
val producer = new Producer[String, String](producerConfig)
partition.foreach { row =>
if (debug) println(row.mkString)
val keyedMessage = new KeyedMessage[String, String](props.getProperty("outTopicHarmonized"),
null, row.toString())
producer.send(keyedMessage)
}
//hack, should be done with the flush
Thread.sleep(1000) …Run Code Online (Sandbox Code Playgroud) 我使用 saveAsTextFile 方法以文本格式将 RDD 写入本地文件系统。在输出目录中,即使是 _SUCCESS 文件,每个零件文件也有一个 .crc 文件。
我只是在寻找避免生成这些元文件(尤其是 .crc)的 Hadoop/Spark 的任何内置功能或属性
我发现以下属性可以避免为 parquet 文件生成 _SUCCESS 文件和 .crc 文件,但为文本文件寻找类似的属性。
sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
Run Code Online (Sandbox Code Playgroud)
提前致谢。
apache-spark ×3
apache-kafka ×1
crc ×1
hadoop ×1
hadoop-yarn ×1
logging ×1
monitoring ×1
rdd ×1