我想从一个简单的CSV文件创建一个Spark数据集.以下是CSV文件的内容:
name,state,number_of_people,coolness_index
trenton,nj,"10","4.5"
bedford,ny,"20","3.3"
patterson,nj,"30","2.2"
camden,nj,"40","8.8"
Run Code Online (Sandbox Code Playgroud)
以下是制作数据集的代码:
var location = "s3a://path_to_csv"
case class City(name: String, state: String, number_of_people: Long)
val cities = spark.read
.option("header", "true")
.option("charset", "UTF8")
.option("delimiter",",")
.csv(location)
.as[City]
Run Code Online (Sandbox Code Playgroud)
以下是错误消息:"无法number_of_people向字符串转换为bigint,因为它可能会截断"
Databricks讨论了如何在此博客文章中创建数据集和此特定错误消息.
编码器急切地检查您的数据是否与预期的架构匹配,在您尝试错误处理TB数据之前提供有用的错误消息.例如,如果我们尝试使用太小的数据类型,那么转换为对象将导致截断(即numStudents大于一个字节,其最大值为255),Analyzer将发出AnalysisException.
我正在使用该Long类型,所以我没想到会看到此错误消息.
在spark Dataset.filter中获取此null错误
输入CSV:
name,age,stat
abc,22,m
xyz,,s
Run Code Online (Sandbox Code Playgroud)
工作代码:
case class Person(name: String, age: Long, stat: String)
val peopleDS = spark.read.option("inferSchema","true")
.option("header", "true").option("delimiter", ",")
.csv("./people.csv").as[Person]
peopleDS.show()
peopleDS.createOrReplaceTempView("people")
spark.sql("select * from people where age > 30").show()
Run Code Online (Sandbox Code Playgroud)
失败的代码(添加以下行返回错误):
val filteredDS = peopleDS.filter(_.age > 30)
filteredDS.show()
Run Code Online (Sandbox Code Playgroud)
返回null错误
java.lang.RuntimeException: Null value appeared in non-nullable field:
- field (class: "scala.Long", name: "age")
- root class: "com.gcp.model.Person"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] …Run Code Online (Sandbox Code Playgroud) 目前Spark有两个Row实现:
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
Run Code Online (Sandbox Code Playgroud)
什么需要两个?它们是代表相同的编码实体,但一个在内部使用(内部API),另一个与外部API一起使用?
我有以下案例类:
case class User(userId: String)
Run Code Online (Sandbox Code Playgroud)
以及以下架构:
+--------------------+------------------+
| col_name| data_type|
+--------------------+------------------+
| user_id| string|
+--------------------+------------------+
Run Code Online (Sandbox Code Playgroud)
当我尝试将 a 转换DataFrame为 typed Dataset[User]with 时spark.read.table("MyTable").as[User],出现字段名称不匹配的错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException:
cannot resolve ''`user_id`' given input columns: [userId];;
Run Code Online (Sandbox Code Playgroud)
有没有什么简单的方法可以在不破坏 scala 习语和命名我的字段的情况下解决这个问题user_id?当然,我的真实表有更多的字段,而且我有更多的案例类/表,所以Encoder为每个案例类手动定义一个是不可行的(而且我不太了解宏,所以这是毫无疑问;尽管如果存在的话,我很乐意使用它!)。
我觉得我错过了一个非常明显的“将snake_case 转换为camelCase=true”选项,因为我使用过的几乎所有ORM 中都存在这个选项。
我一直认为数据集/数据帧API是相同的......唯一的区别是数据集API将为您提供编译时安全性.对 ?
那么......我的案子非常简单:
case class Player (playerID: String, birthYear: Int)
val playersDs: Dataset[Player] = session.read
.option("header", "true")
.option("delimiter", ",")
.option("inferSchema", "true")
.csv(PeopleCsv)
.as[Player]
// Let's try to find players born in 1999.
// This will work, you have compile time safety... but it will not use predicate pushdown!!!
playersDs.filter(_.birthYear == 1999).explain()
// This will work as expected and use predicate pushdown!!!
// But you can't have compile time safety with this :(
playersDs.filter('birthYear === 1999).explain()
Run Code Online (Sandbox Code Playgroud)
从第一个示例解释将显示它不执行谓词下推(注意空PushedFilters):
== Physical Plan == …Run Code Online (Sandbox Code Playgroud) dataframe apache-spark apache-spark-sql apache-spark-dataset
我有格式化为以下示例的大数据记录:
// +---+------+------+
// |cid|itemId|bought|
// +---+------+------+
// |abc| 123| true|
// |abc| 345| true|
// |abc| 567| true|
// |def| 123| true|
// |def| 345| true|
// |def| 567| true|
// |def| 789| false|
// +---+------+------+
Run Code Online (Sandbox Code Playgroud)
cid并且itemId是字符串。
有965,964,223条记录。
我正在尝试cid使用StringIndexer以下方法将其转换为整数:
dataset.repartition(50)
val cidIndexer = new StringIndexer().setInputCol("cid").setOutputCol("cidIndex")
val cidIndexedMatrix = cidIndexer.fit(dataset).transform(dataset)
Run Code Online (Sandbox Code Playgroud)
但是这些代码行非常慢(大约需要30分钟)。问题在于它是如此之大,以至于我之后再也无能为力了。
我正在使用具有2个节点(61 GB内存)的R4 2XLarge集群的Amazon EMR集群。
我可以进一步改善性能吗?任何帮助都感激不尽。
我们如何在 Spark 中并行化循环,以便处理不是顺序的而是并行的。举个例子 - 我的 csv 文件(称为“bill_item.csv”)中包含以下数据,该文件包含以下数据:
|-----------+------------|
| bill_id | item_id |
|-----------+------------|
| ABC | 1 |
| ABC | 2 |
| DEF | 1 |
| DEF | 2 |
| DEF | 3 |
| GHI | 1 |
|-----------+------------|
Run Code Online (Sandbox Code Playgroud)
我必须得到如下输出:
|-----------+-----------+--------------|
| item_1 | item_2 | Num_of_bills |
|-----------+-----------+--------------|
| 1 | 2 | 2 |
| 2 | 3 | 1 |
| 1 | 3 | 1 |
|-----------+-----------+--------------|
Run Code Online (Sandbox Code Playgroud)
我们看到项目 1 和 …
python bigdata apache-spark apache-spark-dataset apache-spark-2.0
在执行某些函数后,为什么nullable = true?df中仍然没有纳米值.
val myDf = Seq((2,"A"),(2,"B"),(1,"C"))
.toDF("foo","bar")
.withColumn("foo", 'foo.cast("Int"))
myDf.withColumn("foo_2", when($"foo" === 2 , 1).otherwise(0)).select("foo", "foo_2").show
Run Code Online (Sandbox Code Playgroud)
当nullable = true被调用时,nullable对于两列都是false.
val foo: (Int => String) = (t: Int) => {
fooMap.get(t) match {
case Some(tt) => tt
case None => "notFound"
}
}
val fooMap = Map(
1 -> "small",
2 -> "big"
)
val fooUDF = udf(foo)
myDf
.withColumn("foo", fooUDF(col("foo")))
.withColumn("foo_2", when($"foo" === 2 , 1).otherwise(0)).select("foo", "foo_2")
.select("foo", "foo_2")
.printSchema
Run Code Online (Sandbox Code Playgroud)
但是现在,对于至少一个之前为假的列,可以为空.怎么解释这个?
我试图根据制造商列内容将数据集拆分为不同的数据集.它非常慢
请建议一种改进代码的方法,以便它可以更快地执行并减少Java代码的使用.
List<Row> lsts= countsByAge.collectAsList();
for(Row lst:lsts){
String man=lst.toString();
man = man.replaceAll("[\\p{Ps}\\p{Pe}]", "");
Dataset<Row> DF = src.filter("Manufacturer='"+man+"'");
DF.show();
}
Run Code Online (Sandbox Code Playgroud)
代码,输入和输出数据集如下所示.
package org.sparkexample;
import org.apache.parquet.filter2.predicate.Operators.Column;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.RelationalGroupedDataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
public class GroupBy {
public static void main(String[] args) {
System.setProperty("hadoop.home.dir", "C:\\winutils");
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
SQLContext sqlContext = new SQLContext(sc);
SparkSession spark = SparkSession.builder().appName("split datasets").getOrCreate();
sc.setLogLevel("ERROR");
Dataset<Row> src= sqlContext.read()
.format("com.databricks.spark.csv")
.option("header", …Run Code Online (Sandbox Code Playgroud) apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0
我一直在寻找任何链接、文档或文章来帮助我了解我们什么时候应该选择数据集而不是数据框,反之亦然?
我在互联网上找到的都是标题,when to use a Dataset但是当打开时,它们只是指定了 Dataframe 和 Dataset 之间的差异。有很多链接只是列出了场景名称的差异。
stackoverflow 上只有一个问题具有正确的标题,但即使在该答案中,databricks 文档链接也不起作用。
我正在寻找一些信息,可以帮助我从根本上理解我们何时选择数据集,或者在什么情况下数据集优于数据帧,反之亦然。如果没有答案,即使是可以帮助我理解的链接或文档也是值得赞赏的。