我是Apache Spark的新手,我知道核心数据结构是RDD.现在我正在编写一些需要元素位置信息的应用程序.例如,在将ArrayList转换为(Java)RDD之后,对于RDD中的每个整数,我需要知道它的(全局)数组下标.有可能吗?
据我所知,RDD 有一个take(int)函数,所以我相信位置信息仍然保留在RDD中.
我有一个RDD,其元素是类型(长,字符串).出于某种原因,我想将整个RDD保存到HDFS中,稍后还会在Spark程序中读取该RDD.有可能吗?如果是这样,怎么样?
让我帮助澄清一下深度洗牌以及Spark如何使用随机播放管理器.我报告了一些非常有用的资源:
https://trongkhoanguyenblog.wordpress.com/
https://0x0fff.com/spark-architecture-shuffle/
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/4-shuffleDetails.md
读它们,我知道有不同的洗牌经理.我想关注其中两个:hash manager和sort manager(这是默认的管理器).
为了揭露我的问题,我想从一个非常普遍的转变开始:
val rdd = reduceByKey(_ + _)
Run Code Online (Sandbox Code Playgroud)
此转换导致映射端聚合,然后随机播放以将所有相同的密钥带入同一分区.
我的问题是:
Map-Side聚合是使用内部mapPartition转换实现的,因此使用组合器函数聚合所有相同的键,还是使用AppendOnlyMap或ExternalAppendOnlyMap?
如果AppendOnlyMap或者ExternalAppendOnlyMap地图用于聚合,它们是否也用于减少发生在 ResultTask?
这两种地图(AppendOnlyMap或ExternalAppendOnlyMap)的目的是什么?
是AppendOnlyMap或ExternalAppendOnlyMap所有洗牌经理,或只是从sortManager使用?
我读到之后AppendOnlyMap或ExternalAppendOnlyMap已经满了,都被泄漏到文件中,这个步骤到底是怎么回事?
使用Sort shuffle管理器,我们使用appendOnlyMap来聚合和组合分区记录,对吧?然后当执行内存填满时,我们开始排序映射,将其溢出到磁盘然后清理映射,我的问题是:溢出到磁盘和shuffle写入有什么区别?它们主要包括在本地文件系统上创建文件,但它们的处理方式不同,Shuffle写入记录,不会放入appendOnlyMap.
你能否深入解释当reduceByKey被执行时会发生什么,向我解释完成该任务所涉及的所有步骤?例如,地图边聚合,改组等所有步骤.
当我使用它的示例代码BinaryClassification.scala用我自己的数据运行星火作业时,它总是显示类似错误"内存中没有足够的空间来缓存分区rdd_8_2!免费的内存是58905314个字节."
我通过CONF =新SparkConf()设置内存到4G.setAppName(S "BinaryClassification与$ PARAMS")集( "spark.executor.memory", "4G"),这是行不通的.有没有人有任何想法?谢谢:)
我在带有16GB内存的Macbook Pro上本地运行它.
bin/spark-submit --class BinaryClassification ~/dev/scalaworkspace/BinaryClassification/BinaryClassification_fat.jar ~/data/trajectory.libsvm --algorithm LR
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/11/22 17:07:24 INFO SecurityManager: Changing view acls to: wangchao,
14/11/22 17:07:24 INFO SecurityManager: Changing modify acls to: wangchao,
14/11/22 17:07:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: …Run Code Online (Sandbox Code Playgroud) Apache Spark pyspark.RDDAPI文档提到groupByKey()效率低下.相反,它是推荐使用reduceByKey(),aggregateByKey(),combineByKey(),或foldByKey()代替.这将导致在shuffle之前在worker中进行一些聚合,从而减少跨工作人员的数据混乱.
给定以下数据集和groupByKey()表达式,什么是等效且有效的实现(减少的跨工作者数据混洗),它不使用groupByKey(),但提供相同的结果?
dataset = [("a", 7), ("b", 3), ("a", 8)]
rdd = (sc.parallelize(dataset)
.groupByKey())
print sorted(rdd.mapValues(list).collect())
Run Code Online (Sandbox Code Playgroud)
输出:
[('a', [7, 8]), ('b', [3])]
Run Code Online (Sandbox Code Playgroud) 在Spark的文档中,它说RDDs方法reduce需要一个关联的AND可交换二进制函数.
但是,该方法reduceByKey仅需要关联二进制函数.
sc.textFile("file4kB", 4)
Run Code Online (Sandbox Code Playgroud)
我做了一些测试,显然这是我得到的行为.为何如此区别?为什么reduceByKey确保二进制函数总是以某种顺序应用(以适应缺乏可交换性)何时reduce不?
例如,如果加载一些(小)文本有4个分区(最小):
val r = sc.textFile("file4k", 4)
Run Code Online (Sandbox Code Playgroud)
然后:
r.reduce(_ + _)
Run Code Online (Sandbox Code Playgroud)
返回一个字符串,其中的部分并不总是以相同的顺序,而:
r.map(x => (1,x)).reduceByKey(_ + _).first
Run Code Online (Sandbox Code Playgroud)
始终返回相同的字符串(其中所有内容的顺序与原始文件中的顺序相同).
(我检查过r.glom,文件内容确实分布在4个分区,没有空分区).
我知道该partitionBy功能会分区我的数据.如果我使用rdd.partitionBy(100)它将按键将我的数据分成100个部分.即,与类似键相关联的数据将被组合在一起
对于我的代码段如下:
val levelsFile = sc.textFile(levelsFilePath)
val levelsSplitedFile = levelsFile.map(line => line.split(fileDelimiter, -1))
val levelPairRddtemp = levelsSplitedFile
.filter(linearr => ( linearr(pogIndex).length!=0))
.map(linearr => (linearr(pogIndex).toLong, levelsIndexes.map(x => linearr(x))
.filter(value => (!value.equalsIgnoreCase("") && !value.equalsIgnoreCase(" ") && !value.equalsIgnoreCase("null")))))
.mapValues(value => value.mkString(","))
.partitionBy(new HashPartitioner(24))
.persist(StorageLevel.MEMORY_ONLY_SER)
levelPairRddtemp.count // just to trigger rdd creation
Run Code Online (Sandbox Code Playgroud)
信息
executors(每个5G)和12个核心.Spark 版本:1.5.2问题
当我看到它SparkUI时Storage tab,我看到的是:
在里面RDD看来,24个partitions中只有2个被缓存.
对此行为的任何解释,以及如何解决此问题.
编辑1:我刚尝试使用60个分区HashPartitioner作为:
..
.partitionBy(new HashPartitioner(60))
..
Run Code Online (Sandbox Code Playgroud)
它工作了.现在我得到了整个RDD缓存.有什么猜测这里可能发生了什么?数据偏差是否会导致此行为? …
memory-management scala distributed-computing apache-spark rdd
我有一个从JSON文件导入的PySpark RDD.数据元素包含许多具有不可取字符的值.为了参数,只有那些string.printable的字符应该在那些JSON文件中.
鉴于存在大量包含文本信息的元素,我一直在尝试找到一种方法,将传入的RDD映射到一个函数来清理数据并返回一个清理的RDD作为输出.我可以找到从RDD打印清理元素的方法,但不是整个元素集合,然后返回RDD.
示例文档可能如下所示,不良字符可能会蔓延到userAgent,marketingReference和pageTags元素或任何文本元素中.
{
"documentId": "abcdef12-1234-5678-fedc-cba9876543210",
"documentType": "contentSummary",
"dateTimeCreated": "2017-01-01T03:00:22.478Z"
"body": {
"requestUrl": "http://www.our-web-site.com/en-gb/line-of-business/product-category/irritating-guid/",
"requestMethod": "GET",
"responseCode": "200",
"userAgent": "Mozilla/5.0 etc",
"requestHeaders": {
"connection": "close",
"host": "www.our-web-site.com",
"accept-language": "en-gb",
"via": "1.1 www.our-web-site.com",
"user-agent": "Mozilla/5.0 etc",
"x-forwarded-proto": "https",
"clientIp": "99.99.99.99",
"referer": "http://www.our-web-site.com/en-gb/line-of-business/product-category/irritating-guid/",
"accept-encoding": "gzip, deflate",
"incap-client-ip": "99.99.99.99"
},
"body": {
"pageId": "/content/our-web-site/en-gb/holidays/interstitial",
"pageVersion": "1.0",
"pageClassification": "product-page",
"pageTags": "spark, python, rdd, other words",
"MarketingReference": "BUYMEPLEASE",
"referrer": "http://www.our-web-site.com/en-gb/line-of-business/product-category/irritating-guid/",
"webSessionId": "abcdef12-1234-5678-fedc-cba9876543210"
}
}
}
Run Code Online (Sandbox Code Playgroud) 我正在尝试根据日期时间字段过滤数据。我的数据样本:
303,0.00001747,4351040,75.9054,"2019-03-08 19:29:18"
Run Code Online (Sandbox Code Playgroud)
这是我初始化spark的方式:
SparkConf conf = new SparkConf().setAppName("app name").setMaster("spark://192.168.1.124:7077");
JavaSparkContext sc = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(conf));
Run Code Online (Sandbox Code Playgroud)
首先,我将上面的数据读入我的自定义对象中,如下所示:
// Read data from file into custom object
JavaRDD<CurrencyPair> rdd = sc.textFile(System.getProperty("user.dir") + "/data/data.csv", 2).map(
new Function<String, CurrencyPair>() {
public CurrencyPair call(String line) throws Exception {
String[] fields = line.split(","); // Split line from commas
// read each data into custom object
CurrencyPair cp = new CurrencyPair();
cp.setId(Integer.parseInt(fields[0].trim()));
cp.setValue(Double.parseDouble(fields[1].trim()));
cp.setBaseVolume(Double.parseDouble(fields[2].trim()));
cp.setQuoteVolume(Double.parseDouble(fields[3].trim()));
cp.setTimeStamp(new Date(fields[4].trim()));
System.out.println("Date:" + fields[4].trim()); // To see if it will print or not …Run Code Online (Sandbox Code Playgroud) apache-spark ×10
rdd ×10
scala ×4
pyspark ×3
hdfs ×2
partitioning ×2
bigdata ×1
java ×1
position ×1
python ×1
python-3.x ×1
reduce ×1
shuffle ×1