我正在尝试对集合进行一些分析.我有一个示例数据集,如下所示:
orders.json
{"items":[1,2,3,4,5]}
{"items":[1,2,5]}
{"items":[1,3,5]}
{"items":[3,4,5]}
Run Code Online (Sandbox Code Playgroud)
它只是一个字段,它是一个代表ID的数字列表.
这是我试图运行的Spark脚本:
val sparkConf = new SparkConf()
.setMaster("local[*]")
.setAppName("Dataframe Test")
val sc = new SparkContext(sparkConf)
val sql = new SQLContext(sc)
val dataframe = sql.read.json("orders.json")
val expanded = dataframe
.explode[::[Long], Long]("items", "item1")(row => row)
.explode[::[Long], Long]("items", "item2")(row => row)
val grouped = expanded
.where(expanded("item1") !== expanded("item2"))
.groupBy("item1", "item2")
.count()
val recs = grouped
.groupBy("item1")
Run Code Online (Sandbox Code Playgroud)
创建expanded并且grouped很好,简而言之,expanded是两个ID在同一原始集中的所有可能的两个ID的列表.grouped过滤掉与自身匹配的ID,然后将所有唯一ID组合在一起并为每个ID生成计数.架构和数据样本grouped是:
root
|-- item1: long (nullable = true)
|-- item2: long …Run Code Online (Sandbox Code Playgroud) 如何在任务或作业完成后立即在控制台(Spark Shell或Spark提交作业)上收集这些指标.
我们使用Spark将数据从Mysql加载到Cassandra并且它非常庞大(例如:~200 GB和600M行).当任务完成后,我们想验证火花过程究竟完成了多少行?我们可以从Spark UI获取数字,但是如何从spark shell或spark-submit作业中检索该数字("Output Records Written").
示例命令从Mysql加载到Cassandra.
val pt = sqlcontext.read.format("jdbc").option("url", "jdbc:mysql://...:3306/...").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "payment_types").option("user", "hadoop").option("password", "...").load()
pt.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "payment_types", "keyspace" -> "test"))
Run Code Online (Sandbox Code Playgroud)
我想在上面的任务中检索所有Spark UI指标,主要是输出大小和记录写入.
请帮忙.
谢谢你的时间!
apache-spark codahale-metrics apache-spark-sql spark-cassandra-connector spark-dataframe
我创建了一些数据帧数据样本df与
rdd = df.limit(10000).rdd
Run Code Online (Sandbox Code Playgroud)
这个操作需要相当长的时间(实际上为什么?在10000行之后它不能短路?),所以我假设我现在有了一个新的RDD.
但是,当我现在工作时rdd,每次访问它时都会有不同的行.好像它重新重新采样一样.缓存RDD有点帮助,但肯定不是保存?
它背后的原因是什么?
更新:这是Spark 1.5.2的再现
from operator import add
from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],100)
rdd1=rdd.toDF().limit(1000).rdd
for _ in range(3):
print(rdd1.map(lambda row:row.i).reduce(add))
Run Code Online (Sandbox Code Playgroud)
输出是
499500
19955500
49651500
Run Code Online (Sandbox Code Playgroud)
我很惊讶.rdd没有修复数据.
编辑:为了表明它比重新执行问题更棘手,这里是一个单一的操作,在Spark 2.0.0.2.5.0上产生不正确的结果
from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],200)
rdd1=rdd.toDF().limit(12345).rdd
rdd2=rdd1.map(lambda x:(x,x))
rdd2.join(rdd2).count()
# result is 10240 despite doing a self-join
Run Code Online (Sandbox Code Playgroud)
基本上,每当您使用limit结果时可能会出错.我并不是指"只是众多样本中的一个",而是非常不正确(因为在这种情况下结果应始终为12345).
val tvalues: Array[Double] = Array(1.866393526974307, 2.864048126935307, 4.032486069215076, 7.876169953355888, 4.875333799256043, 14.316322626848278)
val pvalues: Array[Double] = Array(0.064020056478447, 0.004808399479386827, 8.914865448939047E-5, 7.489564524121306E-13, 2.8363794106756046E-6, 0.0)
Run Code Online (Sandbox Code Playgroud)
我有两个如上所述的数组,我需要从这个数组构建一个DataFrame,如下所示,
Tvalues Pvalues
1.866393526974307 0.064020056478447
2.864048126935307 0.004808399479386827
...... .....
Run Code Online (Sandbox Code Playgroud)
截至目前我StringBuilder在Scala 尝试.没有按预期进行.请帮帮我.
使用SparkSession代码Spark.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
val conf = SparkSession.builder
.master("local")
.appName("testing")
.enableHiveSupport() // <- enable Hive support.
.getOrCreate()
Run Code Online (Sandbox Code Playgroud)
代码pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cms.spark</groupId>
<artifactId>cms-spark</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>cms-spark</name>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.jsoup</groupId>
<artifactId>jsoup</artifactId>
<version>1.8.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.5.3</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this …Run Code Online (Sandbox Code Playgroud) 我有一个以这种方式构建的CSV文件:
Header
Blank Row
"Col1","Col2"
"1,200","1,456"
"2,000","3,450"
Run Code Online (Sandbox Code Playgroud)
我在阅读此文件时遇到两个问题.
这是我尝试过的:
df = sc.textFile("myFile.csv")\
.map(lambda line: line.split(","))\ #Split By comma
.filter(lambda line: len(line) == 2).collect() #This helped me ignore the first two rows
Run Code Online (Sandbox Code Playgroud)
但是,这不起作用,因为值中的逗号被读作分隔符而len(line)返回4而不是2.
我尝试了另一种方法:
data = sc.textFile("myFile.csv")
headers = data.take(2) #First two rows to be skipped
Run Code Online (Sandbox Code Playgroud)
我的想法是使用过滤器而不是读取标题.但是,当我尝试打印标题时,我得到了编码值.
[\x00A\x00Y\x00 \x00J\x00u\x00l\x00y\x00 \x002\x000\x001\x006\x00]
Run Code Online (Sandbox Code Playgroud)
读取CSV文件并跳过前两行的正确方法是什么?
我知道有一千个问题涉及如何通过盐键等来最好地划分你的DataFrames或者RDDs,但我认为这种情况不同,足以保证自己的问题.
我正在PySpark中构建协同过滤推荐引擎,这意味着需要比较每个用户(行)的唯一项目评级.因此,对于一DataFrame维的M (rows) x N (columns),这意味着该数据集变得M x (K choose 2)其中K << N为非空为用户(即,评估)元素的数量.
我的算法对于用户评估了大约一定数量项目的数据集非常有效.但是,对于一部分用户评估了大量项目(数量级比同一分区中的其他用户大)的情况,我的数据变得非常偏斜,最后几个分区开始花费大量时间.举一个简单的例子,考虑以下内容DataFrame:
cols = ['id', 'Toy Story', 'UP', 'Die Hard', 'MIB', 'The Shining']
ratings = [
(1, 4.5, 3.5, None, 1.0, None), # user 1
(2, 2.0, None, 5.0, 4.0, 3.0), # user 2
(3, 3.5, 5.0, 1.0, None, 1.0), # user 3
(4, None, None, 4.5, 3.5, 4.0), # user 4
(5, …Run Code Online (Sandbox Code Playgroud) 当我添加--conf spark.driver.maxResultSize=2050到我的spark-submit命令时,我收到以下错误.
17/12/27 18:33:19 ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /XXX.XX.XXX.XX:36245 is closed
17/12/27 18:33:19 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:726)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:755)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:755)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:755)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection from /XXX.XX.XXX.XX:36245 closed
at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
Run Code Online (Sandbox Code Playgroud)
添加此配置的原因是错误:
py4j.protocol.Py4JJavaError: An …Run Code Online (Sandbox Code Playgroud) 我试图找到一种可靠的方法来以编程方式计算Spark数据帧的大小(以字节为单位).
原因是我希望有一种方法来计算"最佳"分区数量("最佳"可能意味着不同的东西:它可能意味着 具有最佳分区大小,或者在写入Parquet时产生最佳文件大小表 - 但两者都可以假设为数据帧大小的某些线性函数.换句话说,我想调用coalesce(n)或repartition(n)在数据帧上,其中n不是固定数字,而是数据帧大小的函数.
关于SO的其他主题建议使用SizeEstimator.estimatefrom org.apache.spark.util来获取数据帧的字节大小,但我得到的结果是不一致的.
首先,我将数据帧保存到内存中:
df.cache().count
Run Code Online (Sandbox Code Playgroud)
Spark UI在"存储"选项卡中显示大小为4.8GB.然后,我运行以下命令来获取大小SizeEstimator:
import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df)
Run Code Online (Sandbox Code Playgroud)
这给出了115'715'808字节= ~116MB的结果.但是,应用于SizeEstimator不同的对象会导致非常不同的结果.例如,我尝试分别为数据帧中的每一行计算大小并将它们相加:
df.map(row => SizeEstimator.estimate(row.asInstanceOf[ AnyRef ])).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)
这导致12'084'698'256字节= ~12GB的大小.或者,我可以尝试应用于SizeEstimator每个分区:
df.mapPartitions(
iterator => Seq(SizeEstimator.estimate(
iterator.toList.map(row => row.asInstanceOf[ AnyRef ]))).toIterator
).reduce(_+_)
Run Code Online (Sandbox Code Playgroud)
这又导致10'792'965'376字节的不同大小=〜10.8GB.
我知道存在内存优化/内存开销,但在执行这些测试之后,我没有看到如何SizeEstimator使用它来获得足够好的数据帧大小估计(以及因此分区大小或结果Parquet文件大小).
SizeEstimator为了获得对数据帧大小或其分区的良好估计,应用适当的方法(如果有的话)是什么?如果没有,这里建议的方法是什么?
考虑以下DataFrame:
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[john, sam, jane] |
|pet |[whiskers, rover, fido]|
+------+-----------------------+
Run Code Online (Sandbox Code Playgroud)
可以使用以下代码创建:
import pyspark.sql.functions as f
data = [
('person', ['john', 'sam', 'jane']),
('pet', ['whiskers', 'rover', 'fido'])
]
df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)
Run Code Online (Sandbox Code Playgroud)
有没有一种方法可以通过对每个元素应用函数而不使用?来直接修改ArrayType()列?"names"udf
例如,假设我想将该函数foo应用于"names"列。(我将使用其中的例子foo是str.upper只用于说明目的,但我的问题是关于可以应用到一个可迭代的元素任何有效的功能。)
foo = lambda x: x.upper() # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()
Run Code Online (Sandbox Code Playgroud)
TypeError:列不可迭代
我可以使用udf:
foo_udf = f.udf(lambda row: [foo(x) …Run Code Online (Sandbox Code Playgroud) spark-dataframe ×10
apache-spark ×8
pyspark ×5
pyspark-sql ×2
python ×2
scala ×2
arrays ×1
dataframe ×1
hadoop ×1
performance ×1