假设我有一个这样的数据结构,其中ts是一些时间戳
case class Record(ts: Long, id: Int, value: Int)
Run Code Online (Sandbox Code Playgroud)
鉴于大量这些记录,我希望最终得到每个id具有最高时间戳的记录.使用RDD api我认为以下代码完成了工作:
def findLatest(records: RDD[Record])(implicit spark: SparkSession) = {
records.keyBy(_.id).reduceByKey{
(x, y) => if(x.ts > y.ts) x else y
}.values
}
Run Code Online (Sandbox Code Playgroud)
同样,这是我对数据集的尝试:
def findLatest(records: Dataset[Record])(implicit spark: SparkSession) = {
records.groupByKey(_.id).mapGroups{
case(id, records) => {
records.reduceLeft((x,y) => if (x.ts > y.ts) x else y)
}
}
}
Run Code Online (Sandbox Code Playgroud)
我正在尝试研究如何使用数据框来实现类似的东西,但无济于事 - 我意识到我可以使用以下方法进行分组:
records.groupBy($"id")
Run Code Online (Sandbox Code Playgroud)
但是这给了我一个RelationGroupedDataSet,我不清楚我需要编写什么聚合函数来实现我想要的东西 - 我看到的所有示例聚合似乎都只关注返回一个聚合而不是整行的列.
是否可以使用数据框来实现这一目标?
我有Dataset<Tuple2<String,DeviceData>>并希望将其转化为Iterator<DeviceData>.
下面是我使用collectAsList()方法然后获取的代码Iterator<DeviceData>.
Dataset<Tuple2<String,DeviceData>> ds = ...;
List<Tuple2<String, DeviceData>> listTuple = ds.collectAsList();
ArrayList<DeviceData> myDataList = new ArrayList<DeviceData>();
for(Tuple2<String, DeviceData> tuple : listTuple){
myDataList.add(tuple._2());
}
Iterator<DeviceData> myitr = myDataList.iterator();
Run Code Online (Sandbox Code Playgroud)
我不能使用,collectAsList()因为我的数据很大,会妨碍性能.我查看了数据集API但无法获得任何解决方案.我用谷歌搜索,但找不到任何答案.有人可以指导我吗?如果解决方案在java中会很棒.谢谢.
编辑:
DeviceDataclass是简单的javabean.这是ds的printSchema()输出.
root
|-- value: string (nullable = true)
|-- _2: struct (nullable = true)
| |-- deviceData: string (nullable = true)
| |-- deviceId: string (nullable = true)
| |-- sNo: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud) 我试图遍历数据集来进行一些字符串相似度计算,如Jaro winkler或Cosine Similarity.我将我的数据集转换为行列表,然后遍历for语句,这不是有效的火花方式.所以我期待在Spark中采用更好的方法.
public class sample {
public static void main(String[] args) {
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Example").setMaster("local[*]"));
SQLContext sqlContext = new SQLContext(sc);
SparkSession spark = SparkSession.builder().appName("JavaTokenizerExample").getOrCreate();
List<Row> data = Arrays.asList(RowFactory.create("Mysore","Mysuru"),
RowFactory.create("Name","FirstName"));
StructType schema = new StructType(
new StructField[] { new StructField("Word1", DataTypes.StringType, true, Metadata.empty()),
new StructField("Word2", DataTypes.StringType, true, Metadata.empty()) });
Dataset<Row> oldDF = spark.createDataFrame(data, schema);
oldDF.show();
List<Row> rowslist = oldDF.collectAsList();
}
}
Run Code Online (Sandbox Code Playgroud)
我发现了许多我不清楚的JavaRDD示例.数据集的示例将对我有所帮助.
java iterator apache-spark apache-spark-dataset apache-spark-2.0
我在spark中加入两个数据集有点问题,我有这个:
SparkConf conf = new SparkConf()
.setAppName("MyFunnyApp")
.setMaster("local[*]");
SparkSession spark = SparkSession
.builder()
.config(conf)
.config("spark.debug.maxToStringFields", 150)
.getOrCreate();
//...
//Do stuff
//...
Encoder<MyOwnObject1> encoderObject1 = Encoders.bean(MyOwnObject1.class);
Encoder<MyOwnObject2> encoderObject2 = Encoders.bean(MyOwnObject2.class);
Dataset<MyOwnObject1> object1DS = spark.read()
.option("header","true")
.option("delimiter",";")
.option("inferSchema","true")
.csv(pathToFile1)
.as(encoderObject1);
Dataset<MyOwnObject2> object2DS = spark.read()
.option("header","true")
.option("delimiter",";")
.option("inferSchema","true")
.csv(pathToFile2)
.as(encoderObject2);
Run Code Online (Sandbox Code Playgroud)
我可以打印架构并正确显示它.
//Here start the problem
Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS =
object1DS.join(object2DS, object1DS.col("column01")
.equalTo(object2DS.col("column01")))
.as(Encoders.tuple(MyOwnObject1,MyOwnObject2));
Run Code Online (Sandbox Code Playgroud)
最后一行无法连接并得到我这个错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<"LIST WITH ALL VARS FROM TWO OBJECT"> to Tuple2, but failed …Run Code Online (Sandbox Code Playgroud) java apache-spark apache-spark-dataset apache-spark-encoders
我创建了一个Spark Dataset[Long]:
scala> val ds = spark.range(100000000)
ds: org.apache.spark.sql.Dataset[Long] = [id: bigint]
Run Code Online (Sandbox Code Playgroud)
当我运行ds.count它给我结果0.2s(在4 Core 8GB机器上).此外,它创建的DAG如下:
但是,当我跑的ds.rdd.count时候给了我结果4s(同一台机器).但它创建的DAG如下:
所以,我的怀疑是:
ds.rdd.count只创造一个阶段而ds.count创造两个阶段?ds.rdd.count只有一个阶段时,为什么它比ds.count两个阶段慢?performance scala apache-spark apache-spark-sql apache-spark-dataset
我有一个给定的DataSet:
+-------------------+--------------------+
| date| products|
+-------------------+--------------------+
|2017-08-31 22:00:00|[361, 361, 361, 3...|
|2017-09-22 22:00:00|[361, 362, 362, 3...|
|2017-09-21 22:00:00|[361, 361, 361, 3...|
|2017-09-28 22:00:00|[360, 361, 361, 3...|
Run Code Online (Sandbox Code Playgroud)
产品列是包含可能重复项的字符串数组。
我想删除此重复项(在一排之内)
我所做的基本上是编写像这样的UDF函数
val removeDuplicates: WrappedArray[String] => WrappedArray[String] = _.distinct
val udfremoveDuplicates = udf(removeDuplicates)
Run Code Online (Sandbox Code Playgroud)
这个解决方案给了我适当的结果:
+-------------------+--------------------+--------------------+
| date| products| rm_duplicates|
+-------------------+--------------------+--------------------+
|2017-08-31 22:00:00|[361, 361, 361, 3...|[361, 362, 363, 3...|
|2017-09-22 22:00:00|[361, 362, 362, 3...|[361, 362, 363, 3...|
Run Code Online (Sandbox Code Playgroud)
我的问题是:
Spark是否提供更好/更有效的方式来获得此结果?
我当时正在考虑使用地图-但是如何获取所需的列作为列表,以便能够像在removeDuplicates lambda中那样使用'distinct'方法?
编辑:我用java标记标记了此主题,因为与我所用的语言(scala或java)无关,我都会收到answear :) Edit2:typos
java scala apache-spark apache-spark-sql apache-spark-dataset
假设你有一个CSV有三列:item,username,和userid。使用Spark的Dataset API读取以下内容非常简单:
case class Flat(item: String, username: String, userid: String)
ds = sparkSession.read.csv("path/to/data").toDF("item", "username", "userid").as[Flat]
Run Code Online (Sandbox Code Playgroud)
然后ds将为类型Dataset[Flat]。
但是,假设你希望你的数据有形式Dataset[Nested],其中Nested由下式给出:
case class User(name: String, id: String)
case class Nested(item: String, user: User)
Run Code Online (Sandbox Code Playgroud)
一种实现方法是将数据读入a Dataset[Flat],然后应用a map将其转换为a Dataset[Nested],但实际上,Flatcase类别通常不需要其他任何东西,并且它会使代码不必要地冗长。有什么方法可以跳过中间人并直接构造一个Dataset[Nested]?
我正在尝试从JavaRDd <Book>和JavaRdd <Reviews>生成一个复杂的xml,我如何将这两个结合在一起以在xml之下生成?
<xml>
<library>
<books>
<book>
<author>test</author>
</book>
</books>
<reviews>
<review>
<id>1</id>
</review>
</reviews>
</library>
Run Code Online (Sandbox Code Playgroud)
如您所见,有一个父根库,其中包含子书和评论。
以下是我如何生成Book and Review Dataframe
DataFrame bookFrame = sqlCon.createDataFrame(bookRDD, Book.class);
DataFrame reviewFrame = sqlCon.createDataFrame(reviewRDD, Review.class);
Run Code Online (Sandbox Code Playgroud)
我知道要生成xml,而我的疑问尤其是对于拥有Library rootTag以及将Books and Reviews作为其子元素。
我正在使用Java。但是如果您可以指出正确的内容,则可以编写Scala或Python示例。
apache-spark apache-spark-sql spark-dataframe apache-spark-dataset apache-spark-xml
请考虑以下代码:
case class Person(
personId: Long, name: String, ageGroup: String, gender: String,
relationshipStatus: String, country: String, state: String
)
case class PerPersonPower(personId: Long, power: Double)
val people: Dataset[Person] = ... // Around 50 million entries.
val powers: Dataset[PerPersonPower] = ... // Around 50 million entries.
people.join(powers, "personId")
.groupBy("ageGroup", "gender", "relationshipStatus", "country", "state")
.agg(
sum("power").alias("totalPower"),
count("*").alias("personCount")
)
Run Code Online (Sandbox Code Playgroud)
它在具有大约100 GB RAM的群集上执行.但是,群集内存不足.我不知道该怎么做.实际上,people被分区$"personId"和缓存 - people.repartition($"personId").cache().
我有什么想法可以优化这个计算?
该集群是一个普通的Google Dataproc集群---因此它在客户端模式下使用YARN--由14个节点组成,每个节点具有8 GB RAM.
scala apache-spark apache-spark-sql google-cloud-dataproc apache-spark-dataset
这是我正在运行的示例代码.
使用mod列作为分区创建测试镶木地板数据集.
scala> val test = spark.range(0 , 100000000).withColumn("mod", $"id".mod(40))
test: org.apache.spark.sql.DataFrame = [id: bigint, mod: bigint]
scala> test.write.partitionBy("mod").mode("overwrite").parquet("test_pushdown_filter")
Run Code Online (Sandbox Code Playgroud)
之后,我将这些数据作为数据框架读取并在分区列上应用过滤器即mod.
scala> val df = spark.read.parquet("test_pushdown_filter").filter("mod = 5")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, mod: int]
scala> df.queryExecution.executedPlan
res1: org.apache.spark.sql.execution.SparkPlan =
*FileScan parquet [id#16L,mod#17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/kprajapa/WorkSpace/places/test_pushdown_filter], PartitionCount: 1, PartitionFilters: [
isnotnull(mod#17), (mod#17 = 5)], PushedFilters: [], ReadSchema: struct<id:bigint>
Run Code Online (Sandbox Code Playgroud)
你可以在执行计划中看到它只读取1个分区.
但是,如果您将相同的过滤器应用于数据集.它读取所有分区,然后应用过滤器.
scala> case class Test(id: Long, mod: Long)
defined class Test
scala> val ds = spark.read.parquet("test_pushdown_filter").as[Test].filter(_.mod==5)
ds: …Run Code Online (Sandbox Code Playgroud) apache-spark apache-spark-sql apache-spark-dataset catalyst-optimizer