小编Sai*_*Sai的帖子

pyspark spark-submit中的Java堆空间OutOfMemoryError?

我的数据集大小为10GB(例如Test.txt).

我写了我的pyspark脚本,如下所示(Test.py):

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
spark = SparkSession.builder.appName("FilterProduct").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
lines = spark.read.text("C:/Users/test/Desktop/Test.txt").rdd
lines.collect()
Run Code Online (Sandbox Code Playgroud)

然后我使用以下命令执行上面的脚本:

spark-submit Test.py --executor-memory  12G 
Run Code Online (Sandbox Code Playgroud)

然后我收到如下错误:

17/12/29 13:27:18 INFO FileScanRDD: Reading File path: file:///C:/Users/test/Desktop/Test.txt, range: 402653184-536870912, partition values: [empty row]
17/12/29 13:27:18 INFO CodeGenerator: Code generated in 22.743725 ms
17/12/29 13:27:44 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3230)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

10
推荐指数
1
解决办法
8063
查看次数

如何使用oozie检查文件是否存在于HDFS位置?

如何使用 Oozie 检查 HDFS 位置中的文件是否存在?

在我的 HDFS 位置,我test_08_01_2016.csv每天晚上 11 点都会收到这样的文件。

我想在晚上 11.15 之后检查此文件是否存在。我可以使用 Oozie 协调器作业来安排批处理。

但是如何验证文件是否存在于 HDFS 中?

hadoop oozie oozie-coordinator hadoop2 cloudera-cdh

6
推荐指数
1
解决办法
4832
查看次数

如何在不使用Pyspark中的collect()方法的情况下将pyspark.rdd.PipelinedRDD转换为数据框?

我有pyspark.rdd.PipelinedRDD (Rdd1)。当我在做的时候Rdd1.collect(),它给出如下结果。

 [(10, {3: 3.616726727464709, 4: 2.9996439803387602, 5: 1.6767412921625855}),
 (1, {3: 2.016527311459324, 4: -1.5271512313750577, 5: 1.9665475696370045}),
 (2, {3: 6.230272144805092, 4: 4.033642544526678, 5: 3.1517805604906313}),
 (3, {3: -0.3924680103722977, 4: 2.9757316477407443, 5: -1.5689126834176417})]
Run Code Online (Sandbox Code Playgroud)

现在我想不使用collect()方法将pyspark.rdd.PipelinedRDD转换为数据框

我的最终数据框架应如下所示。df.show()应如下所示:

+----------+-------+-------------------+
|CId       |IID    |Score              |
+----------+-------+-------------------+
|10        |4      |2.9996439803387602 |
|10        |5      |1.6767412921625855 |
|10        |3      |3.616726727464709  |
|1         |4      |-1.5271512313750577|
|1         |5      |1.9665475696370045 |
|1         |3      |2.016527311459324  |
|2         |4      |4.033642544526678  |
|2         |5      |3.1517805604906313 |
|2         |3      |6.230272144805092  |
|3         |4      |2.9757316477407443 | …
Run Code Online (Sandbox Code Playgroud)

python-3.x apache-spark apache-spark-sql pyspark spark-dataframe

5
推荐指数
1
解决办法
1万
查看次数

如何在 Spark Scala 中将 org.apache.spark.sql.ColumnName 转换为字符串、十进制类型?

我有一个像下面这样的 JSON

{"name":"method1","parameter1":"P1name","parameter2": 1.0}
Run Code Online (Sandbox Code Playgroud)

我正在加载我的 JSON 文件

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("C:/Users/test/Desktop/te.txt") 
scala> df.show()
Run Code Online (Sandbox Code Playgroud)
{"name":"method1","parameter1":"P1name","parameter2": 1.0}
Run Code Online (Sandbox Code Playgroud)

我有一个像下面这样的功能:

def method1(P1:String, P2:Double)={
     |  print(P1)
         print(P2)
     | }
Run Code Online (Sandbox Code Playgroud)

在执行下面的代码后,我根据列名调用我的方法1,它应该执行方法1。

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
df.withColumn("methodCalling", when($"name" === "method1", method1($"parameter1",$"parameter2")).otherwise(when($"name" === "method2", method2($"parameter1",$"parameter2")))).show(false)
Run Code Online (Sandbox Code Playgroud)

但我收到波纹管错误。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("C:/Users/test/Desktop/te.txt") 
scala> df.show()
Run Code Online (Sandbox Code Playgroud)

请让我知道如何将 org.apache.spark.sql.ColumnName 数据类型转换为 String

scala user-defined-functions apache-spark apache-spark-sql

4
推荐指数
1
解决办法
9545
查看次数

错误执行器:阶段 6.0 Spark scala 中的任务 0.0 出现异常?

我有一个如下所示的 json 文件。

{"name":"method2","name1":"test","parameter1":"C:/Users/test/Desktop/Online.csv","parameter2": 1.0}
Run Code Online (Sandbox Code Playgroud)

我正在加载我的 json 文件。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("C:/Users/test/Desktop/data.json")
val df1=df.select($"name",$"parameter1",$"parameter2").toDF()
df1.show()
Run Code Online (Sandbox Code Playgroud)

我有 3 个如下功能:

def method1(P1:String, P2:Double) {
val data = spark.read.option("header", true).csv(P1).toDF()
val rs= data.select("CID", "Sc").dropDuplicates("CID", "Sc").withColumn("Rat", lit(P2))
val outPutPath="C:/Users/test/Desktop/output"
rs.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(outPutPath)
}
def method2(P1:String, P2:Double){
val data = spark.read.option("header", true).csv(P1).toDF()
val rs= data.select("CID", "Sc").withColumn("r", lit(P2))
val rs1= rs.filter($"CID" =!= "").groupBy("CID","Sc").agg(sum(rs("r")).alias("R"))
val outPutPath="C:/Users/test/Desktop/output"
rs1.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(outPutPath)
}
def methodn(P1:String, P2:Double) {
println("method 2 printhing")
println(P2)
}
Run Code Online (Sandbox Code Playgroud)

我正在尝试使用下面的代码调用上面的函数

df1.map( row => (row.getString(0), …
Run Code Online (Sandbox Code Playgroud)

scala user-defined-functions apache-spark apache-spark-sql

4
推荐指数
1
解决办法
2万
查看次数

映射和删除Spark Scala中的重复项?

我有一个数据集test.txt。它包含如下数据

1::1::3
1::1::2
1::2::2
2::1::5
2::1::4
2::2::2
3::1::1
3::2::2
Run Code Online (Sandbox Code Playgroud)

我已经使用以下代码创建了数据框。

case class Rating(userId: Int, movieId: Int, rating: Float)
def parseRating(str: String): Rating = {
val fields = str.split("::")
assert(fields.size == 3)
Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
}

val ratings = spark.read.textFile("C:/Users/cravi/Desktop/test.txt").map(parseRating).toDF()
Run Code Online (Sandbox Code Playgroud)

但是当我尝试打印输出时,我的输出低于输出

[1,1,3.0]
[1,1,2.0]
[1,2,2.0]
[2,1,2.0]
[2,1,4.0]
[2,2,2.0]
[3,1,1.0]
[3,2,2.0]
Run Code Online (Sandbox Code Playgroud)

但是我想打印输出如下,即删除重复的组合,而不是field(2) value 1.0

[1,1,1.0]
[1,2,1.0]
[2,1,1.0]
[2,2,1.0]
[3,1,1.0]
[3,2,1.0]
Run Code Online (Sandbox Code Playgroud)

scala scala-collections apache-spark apache-spark-sql

3
推荐指数
1
解决办法
3841
查看次数

how to get first value and last value from dataframe column in pyspark?

I Have Dataframe,I want get first value and last value from DataFrame column.

+----+-----+--------------------+
|test|count|             support|
+----+-----+--------------------+
|   A|    5| 0.23809523809523808|
|   B|    5| 0.23809523809523808|
|   C|    4| 0.19047619047619047|
|   G|    2| 0.09523809523809523|
|   K|    2| 0.09523809523809523|
|   D|    1|0.047619047619047616|
+----+-----+--------------------+
Run Code Online (Sandbox Code Playgroud)

expecting output is from support column first,last value i.e x=[0.23809523809523808,0.047619047619047616.]

apache-spark apache-spark-sql pyspark pyspark-sql

3
推荐指数
1
解决办法
1万
查看次数

在scala中的现有文件中添加一行新文本

我是Scala的新手.我想在现有文件中添加一行新文本.

我已经尝试了下面的代码,但它覆盖了现有的文本:

println("plese enter the text")
val text = Console.readLine()
val write = new PrintWriter(new File("Test.txt"))
write.write(text) 
write.close()
Run Code Online (Sandbox Code Playgroud)

请帮我解决一下这个.

scala

0
推荐指数
1
解决办法
5136
查看次数