标签: rdd

如何在Spark的RDD中获取元素位置?

我是Apache Spark的新手,我知道核心数据结构是RDD.现在我正在编写一些需要元素位置信息的应用程序.例如,在将ArrayList转换为(Java)RDD之后,对于RDD中的每个整数,我需要知道它的(全局)数组下标.有可能吗?

据我所知,RDD 有一个take(int)函数,所以我相信位置信息仍然保留在RDD中.

position apache-spark rdd

12
推荐指数
2
解决办法
2万
查看次数

如何将RDD保存到HDFS中以后再读回?

我有一个RDD,其元素是类型(长,字符串).出于某种原因,我想将整个RDD保存到HDFS中,稍后还会在Spark程序中读取该RDD.有可能吗?如果是这样,怎么样?

scala bigdata hdfs apache-spark rdd

12
推荐指数
2
解决办法
2万
查看次数

了解Spark中的shuffle管理器

让我帮助澄清一下深度洗牌以及Spark如何使用随机播放管理器.我报告了一些非常有用的资源:

https://trongkhoanguyenblog.wordpress.com/

https://0x0fff.com/spark-architecture-shuffle/

https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md

读它们,我知道有不同的洗牌经理.我想关注其中两个:hash managersort manager(这是默认的管理器).

为了揭露我的问题,我想从一个非常普遍的转变开始:

val rdd = reduceByKey(_ + _)
Run Code Online (Sandbox Code Playgroud)

此转换导致映射端聚合,然后随机播放以将所有相同的密钥带入同一分区.

我的问题是:

  • Map-Side聚合是使用内部mapPartition转换实现的,因此使用组合器函数聚合所有相同的键,还是使用AppendOnlyMapExternalAppendOnlyMap

  • 如果AppendOnlyMap或者ExternalAppendOnlyMap地图用于聚合,它们是否也用于减少发生在 ResultTask

  • 这两种地图(AppendOnlyMapExternalAppendOnlyMap)的目的是什么?

  • AppendOnlyMapExternalAppendOnlyMap所有洗牌经理,或只是从sortManager使用?

  • 我读到之后AppendOnlyMapExternalAppendOnlyMap已经满了,都被泄漏到文件中,这个步骤到底是怎么回事?

  • 使用Sort shuffle管理器,我们使用appendOnlyMap来聚合和组合分区记录,对吧?然后当执行内存填满时,我们开始排序映射,将其溢出到磁盘然后清理映射,我的问题是:溢出到磁盘和shuffle写入有什么区别?它们主要包括在本地文件系统上创建文件,但它们的处理方式不同,Shuffle写入记录,不会放入appendOnlyMap.

  • 你能否深入解释当reduceByKey被执行时会发生什么,向我解释完成该任务所涉及的所有步骤?例如,地图边聚合,改组等所有步骤.

partitioning shuffle apache-spark rdd

12
推荐指数
1
解决办法
1817
查看次数

Spark错误:没有足够的空间来缓存内存中的分区rdd_8_2!可用内存为58905314字节

当我使用它的示例代码BinaryClassification.scala用我自己的数据运行星火作业时,它总是显示类似错误"内存中没有足够的空间来缓存分区rdd_8_2!免费的内存是58905314个字节."

我通过CONF =新SparkConf()设置内存到4G.setAppName(S "BinaryClassification与$ PARAMS")集( "spark.executor.memory", "4G"),这是行不通的.有没有人有任何想法?谢谢:)

我在带有16GB内存的Macbook Pro上本地运行它.

bin/spark-submit --class BinaryClassification ~/dev/scalaworkspace/BinaryClassification/BinaryClassification_fat.jar ~/data/trajectory.libsvm --algorithm LR

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/11/22 17:07:24 INFO SecurityManager: Changing view acls to: wangchao,
14/11/22 17:07:24 INFO SecurityManager: Changing modify acls to: wangchao,
14/11/22 17:07:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: …
Run Code Online (Sandbox Code Playgroud)

scala out-of-memory apache-spark rdd

11
推荐指数
1
解决办法
1万
查看次数

Apache Spark:使用RDD.aggregateByKey()的RDD.groupByKey()的等效实现是什么?

Apache Spark pyspark.RDDAPI文档提到groupByKey()效率低下.相反,它是推荐使用reduceByKey(),aggregateByKey(),combineByKey(),或foldByKey()代替.这将导致在shuffle之前在worker中进行一些聚合,从而减少跨工作人员的数据混乱.

给定以下数据集和groupByKey()表达式,什么是等效且有效的实现(减少的跨工作者数据混洗),它不使用groupByKey(),但提供相同的结果?

dataset = [("a", 7), ("b", 3), ("a", 8)]
rdd = (sc.parallelize(dataset)
       .groupByKey())
print sorted(rdd.mapValues(list).collect())
Run Code Online (Sandbox Code Playgroud)

输出:

[('a', [7, 8]), ('b', [3])]
Run Code Online (Sandbox Code Playgroud)

apache-spark rdd pyspark

11
推荐指数
1
解决办法
8486
查看次数

Spark:reduce和reduceByKey之间的语义差异

在Spark的文档中,它说RDDs方法reduce需要一个关联的AND可交换二进制函数.

但是,该方法reduceByKey仅需要关联二进制函数.

sc.textFile("file4kB", 4)
Run Code Online (Sandbox Code Playgroud)

我做了一些测试,显然这是我得到的行为.为何如此区别?为什么reduceByKey确保二进制函数总是以某种顺序应用(以适应缺乏可交换性)何时reduce不?

例如,如果加载一些(小)文本有4个分区(最小):

val r = sc.textFile("file4k", 4)
Run Code Online (Sandbox Code Playgroud)

然后:

r.reduce(_ + _)
Run Code Online (Sandbox Code Playgroud)

返回一个字符串,其中的部分并不总是以相同的顺序,而:

r.map(x => (1,x)).reduceByKey(_ + _).first
Run Code Online (Sandbox Code Playgroud)

始终返回相同的字符串(其中所有内容的顺序与原始文件中的顺序相同).

(我检查过r.glom,文件内容确实分布在4个分区,没有空分区).

reduce scala apache-spark rdd

11
推荐指数
1
解决办法
5940
查看次数

pyspark使用partitionby分区数据

我知道该partitionBy功能会分区我的数据.如果我使用rdd.partitionBy(100)它将按键将我的数据分成100个部分.即,与类似键相关联的数据将被组合在一起

  1. 我的理解是否正确?
  2. 是否建议将分区数等于可用内核数?这会使处理更有效吗?
  3. 如果我的数据不是键值格式怎么办?我还能使用这个功能吗?
  4. 假设我的数据是serial_number_of_student,student_name.在这种情况下,我可以通过student_name而不是serial_number对我的数据进行分区吗?

python partitioning apache-spark rdd pyspark

11
推荐指数
1
解决办法
2万
查看次数

Spark Caching:RDD只缓存了8%

对于我的代码段如下:

val levelsFile = sc.textFile(levelsFilePath)
val levelsSplitedFile = levelsFile.map(line => line.split(fileDelimiter, -1))
val levelPairRddtemp = levelsSplitedFile
                              .filter(linearr => ( linearr(pogIndex).length!=0))
                              .map(linearr => (linearr(pogIndex).toLong, levelsIndexes.map(x => linearr(x))
                              .filter(value => (!value.equalsIgnoreCase("") && !value.equalsIgnoreCase(" ") && !value.equalsIgnoreCase("null")))))
                              .mapValues(value => value.mkString(","))
                              .partitionBy(new HashPartitioner(24))
                              .persist(StorageLevel.MEMORY_ONLY_SER)

levelPairRddtemp.count // just to trigger rdd creation
Run Code Online (Sandbox Code Playgroud)

信息

  1. 文件的大小是~4G
  2. 我使用2 executors(每个5G)和12个核心.
  3. Spark 版本:1.5.2

问题

当我看到它SparkUIStorage tab,我看到的是:

在此输入图像描述

在里面RDD看来,24个partitions中只有2个被缓存.

在此输入图像描述

对此行为的任何解释,以及如何解决此问题.

编辑1:我刚尝试使用60个分区HashPartitioner作为:

..
.partitionBy(new HashPartitioner(60))
..
Run Code Online (Sandbox Code Playgroud)

工作了.现在我得到了整个RDD缓存.有什么猜测这里可能发生了什么?数据偏差是否会导致此行为? …

memory-management scala distributed-computing apache-spark rdd

11
推荐指数
1
解决办法
375
查看次数

清除Spark RDD中保存的数据中的无效字符

我有一个从JSON文件导入的PySpark RDD.数据元素包含许多具有不可取字符的值.为了参数,只有那些string.printable的字符应该在那些JSON文件中.

鉴于存在大量包含文本信息的元素,我一直在尝试找到一种方法,将传入的RDD映射到一个函数来清理数据并返回一个清理的RDD作为输出.我可以找到从RDD打印清理元素的方法,但不是整个元素集合,然后返回RDD.

示例文档可能如下所示,不良字符可能会蔓延到userAgent,marketingReference和pageTags元素或任何文本元素中.

{
    "documentId": "abcdef12-1234-5678-fedc-cba9876543210",
    "documentType": "contentSummary",
    "dateTimeCreated": "2017-01-01T03:00:22.478Z"
    "body": {
        "requestUrl": "http://www.our-web-site.com/en-gb/line-of-business/product-category/irritating-guid/",
        "requestMethod": "GET",
        "responseCode": "200",
        "userAgent": "Mozilla/5.0 etc",
        "requestHeaders": {
            "connection": "close",
            "host": "www.our-web-site.com",
            "accept-language": "en-gb",
            "via": "1.1 www.our-web-site.com",
            "user-agent": "Mozilla/5.0 etc",
            "x-forwarded-proto": "https",
            "clientIp": "99.99.99.99",
            "referer": "http://www.our-web-site.com/en-gb/line-of-business/product-category/irritating-guid/",
            "accept-encoding": "gzip, deflate",
            "incap-client-ip": "99.99.99.99"
        },
        "body": {
            "pageId": "/content/our-web-site/en-gb/holidays/interstitial",
            "pageVersion": "1.0",

            "pageClassification": "product-page",
            "pageTags": "spark, python, rdd, other words",
            "MarketingReference": "BUYMEPLEASE",
            "referrer": "http://www.our-web-site.com/en-gb/line-of-business/product-category/irritating-guid/",
            "webSessionId": "abcdef12-1234-5678-fedc-cba9876543210"
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

python-3.x apache-spark rdd pyspark

11
推荐指数
1
解决办法
438
查看次数

如何根据Spark中的日期时间值过滤数据集

我正在尝试根据日期时间字段过滤数据。我的数据样本:

303,0.00001747,4351040,75.9054,"2019-03-08 19:29:18"
Run Code Online (Sandbox Code Playgroud)

这是我初始化spark的方式:

    SparkConf conf = new SparkConf().setAppName("app name").setMaster("spark://192.168.1.124:7077");
    JavaSparkContext sc = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(conf));
Run Code Online (Sandbox Code Playgroud)

首先,我将上面的数据读入我的自定义对象中,如下所示:

    // Read data from file into custom object
    JavaRDD<CurrencyPair> rdd = sc.textFile(System.getProperty("user.dir") + "/data/data.csv", 2).map(
        new Function<String, CurrencyPair>() {
            public CurrencyPair call(String line) throws Exception {
                String[] fields = line.split(","); // Split line from commas

                // read each data into custom object
                CurrencyPair cp = new CurrencyPair();
                cp.setId(Integer.parseInt(fields[0].trim()));
                cp.setValue(Double.parseDouble(fields[1].trim()));
                cp.setBaseVolume(Double.parseDouble(fields[2].trim()));
                cp.setQuoteVolume(Double.parseDouble(fields[3].trim()));
                cp.setTimeStamp(new Date(fields[4].trim()));

                System.out.println("Date:" + fields[4].trim()); // To see if it will print or not …
Run Code Online (Sandbox Code Playgroud)

java hdfs apache-spark rdd

11
推荐指数
1
解决办法
275
查看次数