小编Vik*_*eek的帖子

如何从Spark RDD中的特定分区获取数据?

我想从Spark RDD中的特定分区访问数据.我可以得到一个分区的地址如下:

myRDD.partitions(0)
Run Code Online (Sandbox Code Playgroud)

但我想从myRDD.partitions(0)分区获取数据.我试过官方的org.apache.spark文档但找不到.

提前致谢.

apache-spark rdd

6
推荐指数
1
解决办法
4540
查看次数

纱线-yarn.scheduler.capacity.root.queue-name.maximum-capacity如何工作?

我在根队列下有4个队列,配置如下。

|-------------|-----------------|---------------------|-------------------|
| Queue Name  | Capacity (in %) | Max Capacity (in %) | User Limit Factor |
|-------------|-----------------|---------------------|-------------------|
| default     | 10              | 30                  | 10                |
|-------------|-----------------|---------------------|-------------------|
| thriftsvr   | 5               | 30                  | 10                |
|-------------|-----------------|---------------------|-------------------|
| stream      | 70              | 70                  | 10                |
|-------------|-----------------|---------------------|-------------------|
| batch       | 15              | 30                  | 10                |
|-------------|-----------------|---------------------|-------------------|
Run Code Online (Sandbox Code Playgroud)

我通过yarn.scheduler.capacity.root..capacity设置了容量,并通过yarn.scheduler.capacity.root..maximum-capacity属性设置了最大容量。

我的理解是,以上两个属性设置了绝对容量和绝对最大容量,这意味着队列流的100%等于集群总容量的70%,并且可以填充队列总容量的100%,也就是集群总容量的70%容量。

现在,问题是当队列“流”中充满了66.4%(即,已用容量:66.4%和绝对已用容量:46.5%)时,新作业进入挂起状态,该状态是在队列“流”中通过说“等待AM容器被分配,启动并在RM中注册”。

当我在yarn UI上检查队列配置时,它显示已配置的最大容量:70.0%和绝对已配置的最大容量:70.0%,但是根据配置,可以填充队列“流”直到已用容量:100%和绝对已用容量:70% 在此处输入图片说明

知道为什么为什么新作业直到100%都无法利用队列流的容量?

capacity-planning hadoop-yarn

6
推荐指数
1
解决办法
117
查看次数

Google Cloud - Cloud Logging、Cloud Monitoring 和 Stackdriver 之间有什么区别?

我正在探索 Google Cloud 上的日志记录、监控和警报选项。我发现 Cloud Logging、Monitoring 和 Stackdriver 作为几个选项。

从理论上讲,这些服务看起来很相似。谁能解释一下这些服务之间的实际区别是什么?

谢谢你。

logging monitoring google-cloud-platform google-cloud-monitoring

6
推荐指数
1
解决办法
2143
查看次数

消息在Kafka + Spark Streaming中丢失

我正面临与kafka火花流相关的问题,我的用例如下:

  • Spark流(DirectStream)应用程序从Kafka主题读取消息并对其进行处理.
  • 在处理过的消息的基础上,应用程序将处理后的消息写入不同的Kafka主题,例如,如果消息是协调的,则写入协调的主题其他未协调的主题.

现在,问题是在流式传输期间我们正在丢失一些消息,即所有传入的消息都不会写入协调或非协调的主题.例如,如果应用程序在一个批处理中收到30条消息,那么有时它会将所有消息写入输出主题(这是预期的行为),但有时它只写入27条消息(3条消息丢失,此数字可能会改变).

版本如下:

  • Spark 1.6.0
  • 卡夫卡0.9

Kafka主题配置如下:

  • 经纪人数:3
  • num复制因子:3
  • 分区数量:3

以下是我们用于kafka的属性:

      val props = new Properties() 
      props.put("metadata.broker.list", properties.getProperty("metadataBrokerList")) 
      props.put("auto.offset.reset", properties.getProperty("autoOffsetReset")) 
      props.put("group.id", properties.getProperty("group.id")) 
      props.put("serializer.class", "kafka.serializer.StringEncoder") 
      props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized")) 
      props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized")) 
      props.put("acks", "all"); 
      props.put("retries", "5"); 
      props.put("request.required.acks", "-1") 
Run Code Online (Sandbox Code Playgroud)

以下是我们将处理过的消息写入kafka的代码片段:val schemaRdd2 = finalHarmonizedDF.toJSON

      schemaRdd2.foreachPartition { partition => 
        val producerConfig = new ProducerConfig(props) 
        val producer = new Producer[String, String](producerConfig) 

        partition.foreach { row => 
          if (debug) println(row.mkString) 
          val keyedMessage = new KeyedMessage[String, String](props.getProperty("outTopicHarmonized"), 
            null, row.toString()) 
          producer.send(keyedMessage) 

        } 
        //hack, should be done with the flush 
        Thread.sleep(1000) …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-spark

5
推荐指数
1
解决办法
351
查看次数

如何避免在 Spark 中为文本文件生成元文件(.crc 文件)?

我使用 saveAsTextFile 方法以文本格式将 RDD 写入本地文件系统。在输出目录中,即使是 _SUCCESS 文件,每个零件文件也有一个 .crc 文件。

我只是在寻找避免生成这些元文件(尤其是 .crc)的 Hadoop/Spark 的任何内置功能或属性

我发现以下属性可以避免为 parquet 文件生成 _SUCCESS 文件和 .crc 文件,但为文本文件寻找类似的属性。

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
Run Code Online (Sandbox Code Playgroud)

提前致谢。

hadoop crc apache-spark

5
推荐指数
0
解决办法
3858
查看次数