在执行某些函数后,为什么nullable = true?df中仍然没有纳米值.
val myDf = Seq((2,"A"),(2,"B"),(1,"C"))
.toDF("foo","bar")
.withColumn("foo", 'foo.cast("Int"))
myDf.withColumn("foo_2", when($"foo" === 2 , 1).otherwise(0)).select("foo", "foo_2").show
Run Code Online (Sandbox Code Playgroud)
当nullable = true被调用时,nullable对于两列都是false.
val foo: (Int => String) = (t: Int) => {
fooMap.get(t) match {
case Some(tt) => tt
case None => "notFound"
}
}
val fooMap = Map(
1 -> "small",
2 -> "big"
)
val fooUDF = udf(foo)
myDf
.withColumn("foo", fooUDF(col("foo")))
.withColumn("foo_2", when($"foo" === 2 , 1).otherwise(0)).select("foo", "foo_2")
.select("foo", "foo_2")
.printSchema
Run Code Online (Sandbox Code Playgroud)
但是现在,对于至少一个之前为假的列,可以为空.怎么解释这个?
Spark 2.1和Scala 2.11在这里.我有一个大的Map[String,Date],它有10K键/值对.我还有10K JSON文件存在于Spark可访问的文件系统中:
mnt/
some/
path/
data00001.json
data00002.json
data00003.json
...
data10000.json
Run Code Online (Sandbox Code Playgroud)
映射中的每个KV对对应于其各自的JSON文件(因此第一个映射KV对对应于data00001.json,等等)
我想将所有这些JSON文件读入1个大型Spark中Dataset,当我在它时,向该数据集添加两个新列(JSON文件中不存在).每个映射键都是第一个新列的值,每个键的值将是第二个新列的值:
val objectSummaries = getScalaList()
val dataFiles = objectSummaries.filter { _.getKey.endsWith("data.json") }
val dataDirectories = dataFiles.map(dataFile => {
val keyComponents = dataFile.getKey.split("/")
val parent = if (keyComponents.length > 1) keyComponents(keyComponents.length - 2) else "/"
(parent, dataFile.getLastModified)
})
// TODO: How to take each KV pair from dataDirectories above and store them as the values for the
// two new columns?
val …Run Code Online (Sandbox Code Playgroud) json scala apache-spark apache-spark-sql apache-spark-dataset
我一直在寻找任何链接、文档或文章来帮助我了解我们什么时候应该选择数据集而不是数据框,反之亦然?
我在互联网上找到的都是标题,when to use a Dataset但是当打开时,它们只是指定了 Dataframe 和 Dataset 之间的差异。有很多链接只是列出了场景名称的差异。
stackoverflow 上只有一个问题具有正确的标题,但即使在该答案中,databricks 文档链接也不起作用。
我正在寻找一些信息,可以帮助我从根本上理解我们何时选择数据集,或者在什么情况下数据集优于数据帧,反之亦然。如果没有答案,即使是可以帮助我理解的链接或文档也是值得赞赏的。
Spark SQL DataFrame/Dataset执行引擎具有几个非常有效的时间和空间优化(例如InternalRow和表达式代码).根据许多文档,对于大多数分布式算法来说,它似乎比RDD更好.
但是,我做了一些源代码研究,但仍然不相信.我毫不怀疑InternalRow更紧凑,可以节省大量内存.但是执行算法可能不会更快地保存预定义表达式.也就是说,在源代码中表明 org.apache.spark.sql.catalyst.expressions.ScalaUDF,每个用户定义的函数都做3件事:
显然,这比直接在RDD上应用函数而不进行任何转换要慢.任何人都可以通过一些实例分析和代码分析来确认或否认我的推测吗?
非常感谢您的任何建议或见解.
apache-spark apache-spark-sql spark-dataframe apache-spark-dataset
我想调整我的Java Spark应用程序(实际上使用RDD进行某些计算)来Datasets代替RDDs.我是数据集的新手,不知道如何将哪个事务映射到相应的数据集操作.
目前我将它们映射为:
JavaSparkContext.textFile(...) -> SQLContext.read().textFile(...)
JavaRDD.filter(Function) -> Dataset.filter(FilterFunction)
JavaRDD.map(Function) -> Dataset.map(MapFunction)
JavaRDD.mapToPair(PairFunction) -> Dataset.groupByKey(MapFunction) ???
JavaPairRDD.aggregateByKey(U, Function2, Function2) -> KeyValueGroupedDataset.???
Run Code Online (Sandbox Code Playgroud)
相应的问题是:
JavaRDD.mapToPair该Dataset.groupByKey方法?JavaPairRDD映射到KeyValueGroupedDataset?JavaPairRDD.aggregateByKey方法?但是,我想将以下RDD代码移植到数据集中:
JavaRDD<Article> goodRdd = ...
JavaPairRDD<String, Article> ArticlePairRdd = goodRdd.mapToPair(new PairFunction<Article, String, Article>() { // Build PairRDD<<Date|Store|Transaction><Article>>
public Tuple2<String, Article> call(Article article) throws Exception {
String key = article.getKeyDate() + "|" + article.getKeyStore() + "|" + article.getKeyTransaction() + "|" + article.getCounter(); …Run Code Online (Sandbox Code Playgroud) 新的火花世界和尝试用我在网上找到的scala编写的数据集示例
在通过SBT运行时,我继续收到以下错误
org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class
知道我在俯瞰什么
也可以随意指出编写相同数据集示例的更好方法
谢谢
> sbt> runMain DatasetExample
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/10/25 01:06:39 INFO Remoting: Starting remoting
16/10/25 01:06:46 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.150.130:50555]
[error] (run-main-6) org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `DatasetExample$Student` without access to the scope that this class was defined in. Try moving this class out of its parent class.;
org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner …Run Code Online (Sandbox Code Playgroud) scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders
我一直在尝试不同的方法来过滤类型化的数据集.事实证明,性能可能完全不同.
数据集是基于具有33列和4226047行的1.6 GB数据行创建的.通过加载csv数据并映射到案例类来创建DataSet.
val df = spark.read.csv(csvFile).as[FireIncident]
Run Code Online (Sandbox Code Playgroud)
UnitId ='B02'上的过滤器应返回47980行.我测试了三种方法如下:1)使用类型列(本地主机上约500毫秒)
df.where($"UnitID" === "B02").count()
Run Code Online (Sandbox Code Playgroud)
2)使用临时表和SQL查询(〜与选项1相同)
df.createOrReplaceTempView("FireIncidentsSF")
spark.sql("SELECT * FROM FireIncidentsSF WHERE UnitID='B02'").count()
Run Code Online (Sandbox Code Playgroud)
3)使用强类型字段(14,987ms,即慢30倍)
df.filter(_.UnitID.orNull == "B02").count()
Run Code Online (Sandbox Code Playgroud)
我使用python API再次测试它,对于相同的数据集,时间为17,046 ms,与scala API选项3的性能相当.
df.filter(df['UnitID'] == 'B02').count()
Run Code Online (Sandbox Code Playgroud)
有人可以说明3)和python API的执行方式与前两个选项有何不同?
apache-spark apache-spark-sql spark-dataframe apache-spark-dataset
我编写了使用SparkSQL访问Hive表的代码.这是代码:
SparkSession spark = SparkSession
.builder()
.appName("Java Spark Hive Example")
.master("local[*]")
.config("hive.metastore.uris", "thrift://localhost:9083")
.enableHiveSupport()
.getOrCreate();
Dataset<Row> df = spark.sql("select survey_response_value from health").toDF();
df.show();
Run Code Online (Sandbox Code Playgroud)
我想知道如何将完整输出转换为String或String数组?因为我正在尝试使用另一个模块,只有我可以传递String或String类型的Array值.
我已经尝试过其他类似的方法.toString或类型转换为String值.但是对我没有用.
请告诉我如何将DataSet值转换为String?
java string apache-spark apache-spark-sql apache-spark-dataset
我正在研究一个代表事件流的数据集(比如从网站上发布的跟踪事件).所有活动都有时间戳.我们经常遇到的一个用例是尝试找到给定字段的第一个非空值.因此,举例来说,最让我们感受到的是:
val eventsDf = spark.read.json(jsonEventsPath)
case class ProjectedFields(visitId: String, userId: Int, timestamp: Long ... )
val projectedEventsDs = eventsDf.select(
eventsDf("message.visit.id").alias("visitId"),
eventsDf("message.property.user_id").alias("userId"),
eventsDf("message.property.timestamp"),
...
).as[ProjectedFields]
projectedEventsDs.groupBy($"visitId").agg(first($"userId", true))
Run Code Online (Sandbox Code Playgroud)
上述代码的问题first在于无法保证馈送到该聚合函数的数据的顺序.我希望它被排序timestamp以确保它是时间戳的第一个非null userId而不是任何随机的非null userId.
有没有办法在分组中定义排序?
使用Spark 2.10
BTW,在SPARK DataFrame中为Spark 2.10建议的方式:选择每个组的第一行是在分组之前进行排序 - 这不起作用.例如,以下代码:
case class OrderedKeyValue(key: String, value: String, ordering: Int)
val ds = Seq(
OrderedKeyValue("a", null, 1),
OrderedKeyValue("a", null, 2),
OrderedKeyValue("a", "x", 3),
OrderedKeyValue("a", "y", 4),
OrderedKeyValue("a", null, 5)
).toDS()
ds.orderBy("ordering").groupBy("key").agg(first("value", true)).collect()
Run Code Online (Sandbox Code Playgroud)
有时会返回Array([a,y]),有时Array([a,x])
使用Dataset API时是否仍需要Kryo序列化?
因为数据集使用编码器进行序列化和反序列化: