我正在阅读CSV作为Spark DataFrame并在其上执行机器学习操作.我一直在获取Python序列化EOFError - 任何想法为什么?我认为这可能是一个内存问题 - 即文件超出可用RAM - 但是大幅减小DataFrame的大小并没有阻止EOF错误.
玩具代码和错误如下.
#set spark context
conf = SparkConf().setMaster("local").setAppName("MyApp")
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)
#read in 500mb csv as DataFrame
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
inferschema='true').load('myfile.csv')
#get dataframe into machine learning format
r_formula = RFormula(formula = "outcome ~ .")
mldf = r_formula.fit(df).transform(df)
#fit random forest model
rf = RandomForestClassifier(numTrees = 3, maxDepth = 2)
model = rf.fit(mldf)
result = model.transform(mldf).head()
Run Code Online (Sandbox Code Playgroud)
spark-submit
在单个节点上运行上述代码会重复抛出以下错误,即使在拟合模型之前减小了DataFrame的大小(例如tinydf = df.sample(False, 0.00001)
:
Traceback (most recent call last):
File …
Run Code Online (Sandbox Code Playgroud) 这是与Spark 1.6的CDH .
我正在尝试将此假设CSV导入到Apache SparkFrame的apache中:
$ hadoop fs -cat test.csv
a,b,c,2016-09-09,a,2016-11-11 09:09:09.0,a
a,b,c,2016-09-10,a,2016-11-11 09:09:10.0,a
Run Code Online (Sandbox Code Playgroud)
我使用databricks-csv jar.
val textData = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "false")
.option("delimiter", ",")
.option("dateFormat", "yyyy-MM-dd HH:mm:ss")
.option("inferSchema", "true")
.option("nullValue", "null")
.load("test.csv")
Run Code Online (Sandbox Code Playgroud)
我使用inferSchema为生成的DataFrame制作模式.printSchema()函数为上面的代码提供了以下输出:
scala> textData.printSchema()
root
|-- C0: string (nullable = true)
|-- C1: string (nullable = true)
|-- C2: string (nullable = true)
|-- C3: string (nullable = true)
|-- C4: string (nullable = true)
|-- C5: timestamp (nullable = true)
|-- C6: string (nullable = …
Run Code Online (Sandbox Code Playgroud) 我使用Spark 1.6.1.
我们正在尝试使用HiveContext和DataFrameWriter将ORC文件写入HDFS.虽然我们可以使用
df.write().orc(<path>)
Run Code Online (Sandbox Code Playgroud)
我们宁愿做类似的事情
df.write().options(Map("format" -> "orc", "path" -> "/some_path")
Run Code Online (Sandbox Code Playgroud)
这样我们就可以根据使用此帮助程序库的应用程序灵活地更改格式或根路径.我们在哪里可以找到可以传递到DataFrameWriter的选项的引用?我在这里的文档中找不到任何内容
在Spark 1.6.0/Scala中,是否有机会获得collect_list("colC")
或collect_set("colC").over(Window.partitionBy("colA").orderBy("colB")
?
我正在使用Spark 1.6.1:
目前我正在使用CrossValidator来训练我的ML管道,其中包含各种参数.在训练过程之后,我可以使用CrossValidatorModel的bestModel属性来获取在交叉验证期间表现最佳的模型.是否会自动丢弃交叉验证的其他模型,还是可以选择性能比bestModel差的模型?
我问,因为我使用F1分数指标进行交叉验证,但我也对所有模型的weighedRecall感兴趣,而不仅仅是在交叉验证期间表现最佳的模型.
val folds = 6
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new MulticlassClassificationEvaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(folds)
val avgF1Scores = cvModel.avgMetrics
val predictedDf = cvModel.bestModel.transform(testDf)
// Here I would like to predict as well with the other models of the cross validation
Run Code Online (Sandbox Code Playgroud) cross-validation apache-spark apache-spark-mllib apache-spark-1.6
我使用spark 1.6.1。
我的spark应用程序读取s3中存储的10000个以上镶木地板文件。
val df = sqlContext.read.option("mergeSchema", "true").parquet(myPaths: _*)
Run Code Online (Sandbox Code Playgroud)
myPaths
是Array[String]
包含10000个实木复合地板文件的路径的。每条路径都是这样s3n://bucketname/blahblah.parquet
Spark警告消息如下。
WARN TaskSetManager:阶段4包含一个非常大的任务(108KB)。建议的最大任务大小为100KB。
无论如何,Spark设法运行并完成了这项工作,但我想这可能会减慢火花处理工作的速度。
有人对此问题有很好的建议吗?
我已经按照此处提到的方式提交了我的 Spark 作业,bin/spark-submit --class DataSet BasicSparkJob-assembly-1.0.jar
但没有提及--master
参数或spark.master
参数。而不是将该作业提交到我的 3 节点 Spark 集群。但我想知道它在哪里提交了作业,因为它没有显示任何信息Running Applications
我正在使用 Amazon Elastic Map Reduce 4.7.1、Hadoop 2.7.2、Hive 1.0.0 和 Spark 1.6.1。
用例:我有一个用于处理数据的 Spark 集群。该数据作为 Parquet 文件存储在 S3 中。我希望工具能够使用在 Hive Metastore 中注册的名称来查询数据(例如,查找foo
表而不是parquet.`s3://bucket/key/prefix/foo/parquet`
做事的风格)。我还希望这些数据在 Hive Metastore(一个单独的 RDS 实例)的生命周期内持续存在,即使我拆除 EMR 集群并启动一个连接到同一个 Metastore 的新集群也是如此。
问题:如果我这样做sqlContext.saveAsTable("foo")
,默认情况下,会在 Hive Metastore 中创建一个托管表(请参阅https://spark.apache.org/docs/latest/sql-programming-guide.html)。这些托管表将数据从 S3 复制到 EMR 集群上的 HDFS,这意味着在拆除 EMR 集群后元数据将无用。
我有一个Spark Streaming作业与其他作业(Spark核心作业)一起在我们的集群上运行。我想对包括Spark Streaming在内的这些作业使用动态资源分配。根据下面的JIRA问题,不支持动态分配Spark Streaming(在1.6.1版本中)。但已在2.0.0中修复
根据本期的PDF,它说应该有一个名为“ spark.streaming.dynamicAllocation.enabled=true
但是我在文档中没有看到此配置” 的配置字段
。
任何人都可以确认,
spark.streaming.dynamicAllocation.enabled=true
或spark.dynamicAllocation.enabled=true
)dynamic-allocation apache-spark spark-streaming apache-spark-1.6 apache-spark-2.0
我正在工作spark v1.6.我有以下两个数据帧,我想在我的左外连接结果集中将null转换为0.有什么建议?
val x:Array [Int] = Array(1,2,3)val df_sample_x = sc.parallelize(x).toDF("x")
val y:Array [Int] = Array(3,4,5)val df_sample_y = sc.parallelize(y).toDF("y")
val df_sample_join = df_sample_x.join(df_sample_y,df_sample_x("x")=== df_sample_y("y"),"left_outer")
scala> df_sample_join.show
1 | 空值
2 | 空值
3 | 3
scala> df_sample_join.show
1 | 0
2 | 0
3 | 3