标签: pyspark

Spark RDD groupByKey + join 与 join 性能对比

我正在与其他用户共享的集群上使用 Spark。所以仅仅根据运行时间来判断我的哪一个代码运行效率更高是不可靠的。因为当我运行更高效的代码时,其他人可能会运行大量数据,并使我的代码执行更长时间。

那么我可以在这里问两个问题吗:

  1. 我正在使用joinfunction 来 join 2RDDs并且我尝试groupByKey()在 using 之前使用join,如下所示:

    rdd1.groupByKey().join(rdd2)
    
    Run Code Online (Sandbox Code Playgroud)

    似乎花了更长的时间,但是我记得当我使用 Hadoop Hive 时,group by 让我的查询运行得更快。由于 Spark 使用惰性求值,我想知道groupByKeybefore是否join会让事情变得更快

  2. 我注意到Spark有一个SQL模块,到目前为止我真的没有时间尝试它,但是我可以问一下SQL模块和RDD SQL类似功能之间有什么区别吗?

apache-spark rdd apache-spark-sql pyspark

0
推荐指数
1
解决办法
4321
查看次数

Spark RDD 上的非持久化操作的成本有多大?

我想知道,rdd.unpersist()spark RDD 上的操作成本有多大?存储级别设置是否会影响此操作的性能?任何基准(结果/技术)都会非常有帮助。

apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
1169
查看次数

将 Hive 行对象转换为整数 Spark

我正在尝试将配置单元列的输出转换为键值对。

sqlContext = HiveContext(sc)
id1 = sqlContext.sql("select instance_id from temp_table")
pairs1 = id1.map(lambda s: (int(s), 'Configuration'))
Run Code Online (Sandbox Code Playgroud)

我收到以下错误

TypeError: int() argument must be a string or a number, not 'Row'
Run Code Online (Sandbox Code Playgroud)

我不确定如何将 Hive Row 对象类型转换为整数,以便我可以对其应用映射函数

例如, id1 是一个数据框,当我对其应用collect() 时,它会返回

[Row(_c0=12616821)]
Run Code Online (Sandbox Code Playgroud)

我需要从行对象中提取值。请告诉我是否有与此问题相关的解决方案

python hive apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
2919
查看次数

将数据帧写入 HDFS 时出现 NumberFormatException 错误

我正在写信dataframeHDFS,使用以下代码

final_df.write.format("com.databricks.spark.csv").option("header", "true").save("path_to_hdfs")
Run Code Online (Sandbox Code Playgroud)

它给了我以下错误:

Caused by: java.lang.NumberFormatException: For input string: "124085346080"
Run Code Online (Sandbox Code Playgroud)

完整堆栈如下:

at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: For input string: "124085346080"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:583)
    at java.lang.Integer.parseInt(Integer.java:615)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
    at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:241)
    at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:116)
    at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:85)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:128)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:127)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252) …
Run Code Online (Sandbox Code Playgroud)

hadoop scala hdfs apache-spark pyspark

0
推荐指数
1
解决办法
1640
查看次数

在pyspark中将列的内容拆分为行

我有一个数据框 df:

+------+----------+--------------------+
|SiteID| LastRecID|        Col_to_split|
+------+----------+--------------------+
|     2|1056962584|[214, 207, 206, 205]|
|     2|1056967423|          [213, 208]|
|     2|1056870114|     [213, 202, 199]|
|     2|1056876861|[203, 213, 212, 1...|
Run Code Online (Sandbox Code Playgroud)

我想将列分成这样的行:

+----------+-------------+-------------+
|     RecID|        index|        Value|
+----------+-------------+-------------+
|1056962584|            0|          214|
|1056962584|            1|          207|
|1056962584|            2|          206|
|1056962584|            3|          205|
|1056967423|            0|          213|
|1056967423|            1|          208|
|1056870114|            0|          213|
|1056870114|            1|          202|
|1056870114|            2|          199|
|1056876861|            0|          203|
|1056876861|            1|          213|
|1056876861|            2|          212|
|1056876861|            3|          1..|
|1056876861|       etc...|       etc...|
Run Code Online (Sandbox Code Playgroud)

值包含列表中的值。Index 包含列表中值的索引。

我怎样才能使用 PySpark 做到这一点?

pyspark

0
推荐指数
1
解决办法
1951
查看次数

尝试使用 to_csv 将镶木地板文件保存到 CSV 时出错

我正在尝试读取其中包含一些实验室数据的镶木地板文件,然后将其加载到临时表中,对该表执行查询,然后将结果保存到 CSV 文件中,并以列和逗号分隔。这是我的代码:

lines = sqlContext.read.parquet("hdfs:////data/lab_01/")
lines.registerTempTable("test_data")
resultsDF = sqlContext.sql("select * from results")

header = ["lab_key", "tray_id", "time", "gene_id", "source"]
pandas.resultsDF.to_csv("/data/results.csv", sep=",", columns = header)
Run Code Online (Sandbox Code Playgroud)

我得到的错误是这样的,它位于代码的最后一行:

AttributeError:模块“pandas”没有属性“resultsDF”

我正在寻找带有标题的 CSV 文件,如下所示:

lab_key  tray_id   time   gene_id  Source
10       26905972   1     8315     2        
30       26984972   1     8669     2        
30       26949059   1     1023     2        
30      26905972    1     1062     1    
Run Code Online (Sandbox Code Playgroud)

我的数据框 resultsDF 如下所示:

[Row(lab_key=1130, tray_id=26984905972, time=1, gene_id=833715, source=2),
 Row(lab_key=1130, tray_id=26984905972, time=1, gene_id=866950, source=2),
 Row(lab_key=1130, tray_id=26984905972, time=1, gene_id=1022843, source=2),
Run Code Online (Sandbox Code Playgroud)

python csv pandas pyspark

0
推荐指数
1
解决办法
3319
查看次数

将 Spark Rdd 列转换为 Pyspark 中的行

我有一个 Spark Rdd,其形式为 Row(id,Words),其中单词包含单词列表。我想将此列表转换为单列。输入

ID  Words
1   [w1,w2,w3]
2   [w3,w4]
Run Code Online (Sandbox Code Playgroud)

我想将其转换为输出格式

ID  Word
1   w1
1   w2
1   w3
2   w3
2   w4
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

0
推荐指数
1
解决办法
3280
查看次数

如何理解Spark中的reduceByKey?

我正在尝试学习 Spark,到目前为止一切进展顺利,除了我需要在值为列表的一对 RDD 上使用像reduceByKey或combineByKey这样的函数的问题。

我一直在尝试找到这些函数的详细文档,这些文档可以解释参数的实际含义,这样我就可以自己解决它,而无需去 Stack Overflow,但我就是找不到任何好的 Spark 文档。我读过《Learning Spark》的第3章和第4章,但说实话,对于最复杂的函数的解释非常糟糕。

我现在正在处理的问题如下:我有一对 RDD,其中键是字符串,值是两个元素的列表,这两个元素都是整数。像这样的东西:(国家,[小时,计数])。对于每个键,我希望仅保留计数最高的值,无论时间如何。一旦我有了上述格式的 RDD,我就会尝试通过调用 Spark 中的以下函数来查找最大值:

reduceByKey(lambda x, y: max(x[1], y[1]))
Run Code Online (Sandbox Code Playgroud)

但这会引发以下错误:

TypeError: 'int' object is not subscriptable
Run Code Online (Sandbox Code Playgroud)

这对我来说没有任何意义。我将参数 x 和 y 解释为两个键的值,例如 x=[13, 445] 和 y=[14, 109],但错误没有任何意义。我究竟做错了什么?

python documentation functional-programming apache-spark pyspark

0
推荐指数
1
解决办法
3366
查看次数

选择所有列并加入 pyspark 数据帧的更好方法

我有两个数据框pyspark。他们的架构如下

df1 
DataFrame[customer_id: int, email: string, city: string, state: string, postal_code: string, serial_number: string]

df2 
DataFrame[serial_number: string, model_name: string, mac_address: string]
Run Code Online (Sandbox Code Playgroud)

现在我想full outer join通过coalesce使用data frames.

我已经做了如下。我得到了预期的结果。

full_df = df1.join(df2, df1.serial_number == df2.serial_number, 'full_outer').select(df1.customer_id, df1.email, df1.city, df1.state, df1.postal_code,  f.coalesce(df1.serial_number, df2.serial_number).alias('serial_number'), df2.model_name, df2.mac_address)
Run Code Online (Sandbox Code Playgroud)

现在我想以不同的方式做上述事情。我不想在 join 语句中编写 select 附近的所有列名称,而是想做一些类似*data frame. 基本上我想要像下面这样的东西。

full_df = df1.join(df2, df1.serial_number == df2.serial_number, 'full_outer').select('df1.*', f.coalesce(df1.serial_number, df2.serial_number).alias('serial_number1'), df2.model_name, df2.mac_address).drop('serial_number')
Run Code Online (Sandbox Code Playgroud)

我得到了我想要的。有没有更好的方法来进行这种操作pyspark

编辑

/sf/ask/2529262571/?rq=1这与I am using a in …

apache-spark pyspark

0
推荐指数
1
解决办法
5950
查看次数

Pyspark:如何添加带有行号的列?

我有一个pyspark数据框。我想添加一个包含行号的列。

这就是我正在做的

stop_df = stop_df.withColumn("stop_id", monotonically_increasing_id())
Run Code Online (Sandbox Code Playgroud)

如果我检查 的最大值stop_id,我得到

stop_df.agg(max("stop_id")).show()
+--------------+
|  max(stop_id)|
+--------------+
|32478542692458|
+--------------+
Run Code Online (Sandbox Code Playgroud)

而行数是

stop_df.count()
Out[4]: 8134605
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

0
推荐指数
1
解决办法
1553
查看次数