My dataset is 10 GB in size (for example, Test.txt).
I wrote my PySpark script (Test.py) as shown below:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
spark = SparkSession.builder.appName("FilterProduct").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
lines = spark.read.text("C:/Users/test/Desktop/Test.txt").rdd
lines.collect()
I then execute the above script using the following command:
spark-submit Test.py --executor-memory 12G
I then get an error like the following:
17/12/29 13:27:18 INFO FileScanRDD: Reading File path: file:///C:/Users/test/Desktop/Test.txt, range: 402653184-536870912, partition values: [empty row]
17/12/29 13:27:18 INFO CodeGenerator: Code generated in 22.743725 ms
17/12/29 13:27:44 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3230)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at …
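For context, collect() materializes the entire 10 GB file on the driver, so the driver heap overflows no matter how much executor memory is granted (note also that spark-submit options such as --executor-memory must come before the script name, otherwise they are passed to Test.py as application arguments). A minimal sketch of keeping the work distributed instead and bringing only a small sample to the driver; the filter condition is a hypothetical placeholder:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("FilterProduct").getOrCreate()

lines = spark.read.text("C:/Users/test/Desktop/Test.txt")

# Transformations run on the executors; nothing is pulled to the driver here.
filtered = lines.filter(lines.value.contains("product"))  # hypothetical condition

# take() returns only a handful of rows to the driver instead of the whole file.
print(filtered.take(10))

# Or persist the full result without ever collecting it.
filtered.write.text("C:/Users/test/Desktop/Test_filtered")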
How can I check whether a file exists at an HDFS location using Oozie?

At my HDFS location, I receive a file such as test_08_01_2016.csv every day at 11 PM.
I want to check after 11:15 PM whether this file has arrived. I can schedule the batch job with an Oozie coordinator,
but how do I verify that the file actually exists in HDFS?
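For reference, Oozie itself can gate on arrival (a coordinator dataset with a done-flag, or a workflow decision node using the fs:exists() EL function). If the check has to happen from application code instead, the Hadoop FileSystem API can do it; a minimal Scala sketch, with the path hardcoded for illustration:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CheckHdfsFile {
  def main(args: Array[String]): Unit = {
    // Assumes the Hadoop configuration on the classpath points fs.defaultFS at the cluster.
    val fs = FileSystem.get(new Configuration())

    // Hypothetical path; in practice the date suffix would be derived from the current date.
    val path = new Path("/user/test/incoming/test_08_01_2016.csv")

    if (fs.exists(path)) println(s"$path exists")
    else println(s"$path not found")
  }
}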
I have a pyspark.rdd.PipelinedRDD (Rdd1). When I run Rdd1.collect(), it gives the result below:
[(10, {3: 3.616726727464709, 4: 2.9996439803387602, 5: 1.6767412921625855}),
(1, {3: 2.016527311459324, 4: -1.5271512313750577, 5: 1.9665475696370045}),
(2, {3: 6.230272144805092, 4: 4.033642544526678, 5: 3.1517805604906313}),
(3, {3: -0.3924680103722977, 4: 2.9757316477407443, 5: -1.5689126834176417})]
Now I want to convert the pyspark.rdd.PipelinedRDD to a DataFrame without using the collect() method.
My final DataFrame should look like the following, i.e. df.show() should give:
+----------+-------+-------------------+
|CId |IID |Score |
+----------+-------+-------------------+
|10 |4 |2.9996439803387602 |
|10 |5 |1.6767412921625855 |
|10 |3 |3.616726727464709 |
|1 |4 |-1.5271512313750577|
|1 |5 |1.9665475696370045 |
|1 |3 |2.016527311459324 |
|2 |4 |4.033642544526678 |
|2 |5 |3.1517805604906313 |
|2 |3 |6.230272144805092 |
|3         |4      |2.9757316477407443 | …
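For reference, one way to get there without collect() is to flatten each (key, dict) pair into one record per inner entry while it is still an RDD, and only then convert to a DataFrame; a minimal PySpark sketch, assuming Rdd1 has exactly the structure shown above:

# Flatten (CId, {IID: Score, ...}) into one (CId, IID, Score) tuple per inner entry.
flat = Rdd1.flatMap(lambda kv: [(kv[0], iid, score) for iid, score in kv[1].items()])

# toDF builds the DataFrame distributively; nothing is collected to the driver.
df = flat.toDF(["CId", "IID", "Score"])
df.show()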
I have a JSON like the one below:
{"name":"method1","parameter1":"P1name","parameter2": 1.0}
I am loading my JSON file:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("C:/Users/test/Desktop/te.txt")
scala> df.show()
I have a function like the one below:
def method1(P1: String, P2: Double) = {
  print(P1)
  print(P2)
}
I then call my method1 based on the column value by executing the code below; when the name column equals "method1", it should execute method1:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
df.withColumn("methodCalling", when($"name" === "method1", method1($"parameter1",$"parameter2")).otherwise(when($"name" === "method2", method2($"parameter1",$"parameter2")))).show(false)
But I am getting an error.
Please let me know how to convert the org.apache.spark.sql.ColumnName data type to String.
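For context, a plain Scala method cannot accept Column arguments: $"parameter1" is a Column, not a String, which is why the call does not compile. A common pattern is to wrap the per-row logic in UDFs and dispatch with when; a minimal sketch, where the UDF bodies are hypothetical stand-ins for the real method1/method2 logic:

import org.apache.spark.sql.functions.{col, udf, when}

// UDFs receive plain String/Double values extracted from each row, not Column objects.
val method1Udf = udf((p1: String, p2: Double) => s"method1 called with $p1, $p2")
val method2Udf = udf((p1: String, p2: Double) => s"method2 called with $p1, $p2")

val result = df.withColumn(
  "methodCalling",
  when(col("name") === "method1", method1Udf(col("parameter1"), col("parameter2")))
    .when(col("name") === "method2", method2Udf(col("parameter1"), col("parameter2"))))

result.show(false)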
I have a JSON file as shown below:
{"name":"method2","name1":"test","parameter1":"C:/Users/test/Desktop/Online.csv","parameter2": 1.0}
I am loading my JSON file:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("C:/Users/test/Desktop/data.json")
val df1=df.select($"name",$"parameter1",$"parameter2").toDF()
df1.show()
I have 3 functions, shown below:
def method1(P1: String, P2: Double) {
  val data = spark.read.option("header", true).csv(P1).toDF()
  val rs = data.select("CID", "Sc").dropDuplicates("CID", "Sc").withColumn("Rat", lit(P2))
  val outPutPath = "C:/Users/test/Desktop/output"
  rs.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(outPutPath)
}

def method2(P1: String, P2: Double) {
  val data = spark.read.option("header", true).csv(P1).toDF()
  val rs = data.select("CID", "Sc").withColumn("r", lit(P2))
  val rs1 = rs.filter($"CID" =!= "").groupBy("CID", "Sc").agg(sum(rs("r")).alias("R"))
  val outPutPath = "C:/Users/test/Desktop/output"
  rs1.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(outPutPath)
}

def methodn(P1: String, P2: Double) {
  println("method 2 printhing")
  println(P2)
}
I am trying to call the above functions using the code below:
df1.map( row => (row.getString(0), …
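For reference, method1 and method2 themselves call spark.read and DataFrame.write, so they cannot run inside df1.map, which executes on the executors. A common workaround is to collect the small dispatch DataFrame to the driver and match on the name there; a minimal sketch, assuming df1 only holds a handful of dispatch rows:

// df1 is tiny (one dispatch record per JSON line), so collecting it to the driver is safe.
df1.collect().foreach { row =>
  val name = row.getAs[String]("name")
  val p1 = row.getAs[String]("parameter1")
  val p2 = row.getAs[Double]("parameter2")

  // Dispatch on the driver, where spark.read and DataFrame.write are allowed.
  name match {
    case "method1" => method1(p1, p2)
    case "method2" => method2(p1, p2)
    case _ => methodn(p1, p2)
  }
}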
I have a dataset test.txt. It contains data like the following:

1::1::3
1::1::2
1::2::2
2::1::5
2::1::4
2::2::2
3::1::1
3::2::2
I have created a DataFrame using the following code:
case class Rating(userId: Int, movieId: Int, rating: Float)

def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 3)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
}

val ratings = spark.read.textFile("C:/Users/cravi/Desktop/test.txt").map(parseRating).toDF()
But when I try to print it, I get the output below:
[1,1,3.0]
[1,1,2.0]
[1,2,2.0]
[2,1,2.0]
[2,1,4.0]
[2,2,2.0]
[3,1,1.0]
[3,2,2.0]
But I want the output printed as below, i.e. with duplicate (userId, movieId) combinations removed and field(2) replaced by the value 1.0:
[1,1,1.0]
[1,2,1.0]
[2,1,1.0]
[2,2,1.0]
[3,1,1.0]
[3,2,1.0]
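For reference, this can be expressed as a projection of the two key columns, a dropDuplicates over them, and a constant rating column; a minimal sketch, assuming the ratings DataFrame built above:

import org.apache.spark.sql.functions.lit

// Keep one row per (userId, movieId) pair and replace the rating with a constant 1.0.
val deduped = ratings
  .select("userId", "movieId")
  .dropDuplicates("userId", "movieId")
  .withColumn("rating", lit(1.0f))

deduped.show()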
I have a DataFrame, and I want to get the first and the last value from a DataFrame column.
+----+-----+--------------------+
|test|count| support|
+----+-----+--------------------+
| A| 5| 0.23809523809523808|
| B| 5| 0.23809523809523808|
| C| 4| 0.19047619047619047|
| G| 2| 0.09523809523809523|
| K| 2| 0.09523809523809523|
| D| 1|0.047619047619047616|
+----+-----+--------------------+
The expected output is the first and last value of the support column, i.e. x = [0.23809523809523808, 0.047619047619047616].
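For reference, assuming "first" and "last" mean the first and last rows in the DataFrame's current order, one simple approach is to collect the single column and take its two ends; a minimal sketch (for large data an explicit orderBy would be safer, since Spark does not generally guarantee row order):

// Pull the support column to the driver; the table above is small, so this is cheap.
val supportValues = df.select("support").collect().map(_.getDouble(0))
val x = Array(supportValues.head, supportValues.last)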
I am new to Scala. I want to append a new line of text to an existing file.
I have tried the code below, but it overwrites the existing text:
import java.io.{File, PrintWriter}

println("please enter the text")
val text = Console.readLine()
val write = new PrintWriter(new File("Test.txt"))
write.write(text)
write.close()
Please help me solve this.
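For reference, a PrintWriter built directly on a File always truncates the file; opening it through a FileWriter in append mode keeps the existing content. A minimal sketch:

import java.io.{FileWriter, PrintWriter}

println("please enter the text")
val text = scala.io.StdIn.readLine()

// FileWriter's second argument enables append mode, so existing lines are preserved.
val writer = new PrintWriter(new FileWriter("Test.txt", true))
writer.println(text) // println adds a newline so each entry lands on its own line
writer.close()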