标签: apache-spark-dataset

Spark Dataframes-按键减少

假设我有一个这样的数据结构,其中ts是一些时间戳

case class Record(ts: Long, id: Int, value: Int)
Run Code Online (Sandbox Code Playgroud)

鉴于大量这些记录,我希望最终得到每个id具有最高时间戳的记录.使用RDD api我认为以下代码完成了工作:

def findLatest(records: RDD[Record])(implicit spark: SparkSession) = {
  records.keyBy(_.id).reduceByKey{
    (x, y) => if(x.ts > y.ts) x else y
  }.values
}
Run Code Online (Sandbox Code Playgroud)

同样,这是我对数据集的尝试:

def findLatest(records: Dataset[Record])(implicit spark: SparkSession) = {
  records.groupByKey(_.id).mapGroups{
    case(id, records) => {
      records.reduceLeft((x,y) => if (x.ts > y.ts) x else y)
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

我正在尝试研究如何使用数据框来实现类似的东西,但无济于事 - 我意识到我可以使用以下方法进行分组:

records.groupBy($"id")
Run Code Online (Sandbox Code Playgroud)

但是这给了我一个RelationGroupedDataSet,我不清楚我需要编写什么聚合函数来实现我想要的东西 - 我看到的所有示例聚合似乎都只关注返回一个聚合而不是整行的列.

是否可以使用数据框来实现这一目标?

scala apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
1万
查看次数

如何将数据集<Tuple2 <String,DeviceData >>转换为Iterator <DeviceData>

我有Dataset<Tuple2<String,DeviceData>>并希望将其转化为Iterator<DeviceData>.

下面是我使用collectAsList()方法然后获取的代码Iterator<DeviceData>.

Dataset<Tuple2<String,DeviceData>> ds = ...;
List<Tuple2<String, DeviceData>> listTuple = ds.collectAsList();

ArrayList<DeviceData> myDataList = new ArrayList<DeviceData>();
for(Tuple2<String, DeviceData> tuple : listTuple){
    myDataList.add(tuple._2());
}

Iterator<DeviceData> myitr = myDataList.iterator();
Run Code Online (Sandbox Code Playgroud)

我不能使用,collectAsList()因为我的数据很大,会妨碍性能.我查看了数据集API但无法获得任何解决方案.我用谷歌搜索,但找不到任何答案.有人可以指导我吗?如果解决方案在java中会很棒.谢谢.

编辑:

DeviceDataclass是简单的javabean.这是ds的printSchema()输出.

root
 |-- value: string (nullable = true)
 |-- _2: struct (nullable = true)
 |    |-- deviceData: string (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- sNo: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

java apache-spark apache-spark-dataset apache-spark-2.0

5
推荐指数
1
解决办法
269
查看次数

如何在Spark Java中遍历/迭代数据集?

我试图遍历数据集来进行一些字符串相似度计算,如Jaro winkler或Cosine Similarity.我将我的数据集转换为行列表,然后遍历for语句,这不是有效的火花方式.所以我期待在Spark中采用更好的方法.

public class sample {

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Example").setMaster("local[*]"));
        SQLContext sqlContext = new SQLContext(sc);
        SparkSession spark = SparkSession.builder().appName("JavaTokenizerExample").getOrCreate();

        List<Row> data = Arrays.asList(RowFactory.create("Mysore","Mysuru"),
                RowFactory.create("Name","FirstName"));
        StructType schema = new StructType(
                new StructField[] { new StructField("Word1", DataTypes.StringType, true, Metadata.empty()),
                        new StructField("Word2", DataTypes.StringType, true, Metadata.empty()) });

        Dataset<Row> oldDF = spark.createDataFrame(data, schema);
        oldDF.show();
        List<Row> rowslist = oldDF.collectAsList(); 
    }
}
Run Code Online (Sandbox Code Playgroud)

我发现了许多我不清楚的JavaRDD示例.数据集的示例将对我有所帮助.

java iterator apache-spark apache-spark-dataset apache-spark-2.0

5
推荐指数
1
解决办法
2万
查看次数

如何使用java对象将两个spark数据集连接到一个?

我在spark中加入两个数据集有点问题,我有这个:

SparkConf conf = new SparkConf()
    .setAppName("MyFunnyApp")
    .setMaster("local[*]");

SparkSession spark = SparkSession
    .builder()
    .config(conf)
    .config("spark.debug.maxToStringFields", 150)
    .getOrCreate();
//...
//Do stuff
//...
Encoder<MyOwnObject1> encoderObject1 = Encoders.bean(MyOwnObject1.class);
Encoder<MyOwnObject2> encoderObject2 = Encoders.bean(MyOwnObject2.class);

Dataset<MyOwnObject1> object1DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .option("inferSchema","true")
    .csv(pathToFile1)
    .as(encoderObject1);

Dataset<MyOwnObject2> object2DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .option("inferSchema","true")
    .csv(pathToFile2)
    .as(encoderObject2);
Run Code Online (Sandbox Code Playgroud)

我可以打印架构并正确显示它.

//Here start the problem
Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = 
    object1DS.join(object2DS, object1DS.col("column01")
    .equalTo(object2DS.col("column01")))
    .as(Encoders.tuple(MyOwnObject1,MyOwnObject2));
Run Code Online (Sandbox Code Playgroud)

最后一行无法连接并得到我这个错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<"LIST WITH ALL VARS FROM TWO OBJECT"> to Tuple2, but failed …
Run Code Online (Sandbox Code Playgroud)

java apache-spark apache-spark-dataset apache-spark-encoders

5
推荐指数
1
解决办法
2018
查看次数

为什么dataset.count()比rdd.count()更快?

我创建了一个Spark Dataset[Long]:

scala> val ds = spark.range(100000000)
ds: org.apache.spark.sql.Dataset[Long] = [id: bigint]
Run Code Online (Sandbox Code Playgroud)

当我运行ds.count它给我结果0.2s(在4 Core 8GB机器上).此外,它创建的DAG如下:

在此输入图像描述

但是,当我跑的ds.rdd.count时候给了我结果4s(同一台机器).但它创建的DAG如下:

在此输入图像描述

所以,我的怀疑是:

  1. 为什么ds.rdd.count只创造一个阶段而ds.count创造两个阶段?
  2. 此外,当ds.rdd.count只有一个阶段时,为什么它比ds.count两个阶段慢?

performance scala apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
4141
查看次数

删除Spark数组列中的重复项

我有一个给定的DataSet:

+-------------------+--------------------+
|               date|            products|
+-------------------+--------------------+
|2017-08-31 22:00:00|[361, 361, 361, 3...|
|2017-09-22 22:00:00|[361, 362, 362, 3...|
|2017-09-21 22:00:00|[361, 361, 361, 3...|
|2017-09-28 22:00:00|[360, 361, 361, 3...|
Run Code Online (Sandbox Code Playgroud)

产品列是包含可能重复项的字符串数组。

我想删除此重复项(在一排之内)

我所做的基本上是编写像这样的UDF函数

 val removeDuplicates: WrappedArray[String] => WrappedArray[String] = _.distinct
 val udfremoveDuplicates = udf(removeDuplicates)
Run Code Online (Sandbox Code Playgroud)

这个解决方案给了我适当的结果:

+-------------------+--------------------+--------------------+
|               date|            products|       rm_duplicates|
+-------------------+--------------------+--------------------+
|2017-08-31 22:00:00|[361, 361, 361, 3...|[361, 362, 363, 3...|
|2017-09-22 22:00:00|[361, 362, 362, 3...|[361, 362, 363, 3...|
Run Code Online (Sandbox Code Playgroud)

我的问题是:

  1. Spark是否提供更好/更有效的方式来获得此结果?

  2. 我当时正在考虑使用地图-但是如何获取所需的列作为列表,以便能够像在removeDuplicates lambda中那样使用'distinct'方法?

编辑:我用java标记标记了此主题,因为与我所用的语言(scala或java)无关,我都会收到answear :) Edit2:typos

java scala apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
2084
查看次数

Spark可以直接将数据读取到嵌套的case类中吗?

假设你有一个CSV有三列:itemusername,和userid。使用Spark的Dataset API读取以下内容非常简单:

case class Flat(item: String, username: String, userid: String)
ds = sparkSession.read.csv("path/to/data").toDF("item", "username", "userid").as[Flat]
Run Code Online (Sandbox Code Playgroud)

然后ds将为类型Dataset[Flat]

但是,假设你希望你的数据有形式Dataset[Nested],其中Nested由下式给出:

case class User(name: String, id: String)
case class Nested(item: String, user: User)
Run Code Online (Sandbox Code Playgroud)

一种实现方法是将数据读入a Dataset[Flat],然后应用a map将其转换为a Dataset[Nested],但实际上,Flatcase类别通常不需要其他任何东西,并且它会使代码不必要地冗长。有什么方法可以跳过中间人并直接构造一个Dataset[Nested]

scala apache-spark apache-spark-dataset

5
推荐指数
1
解决办法
1484
查看次数

如何使用Spark-Xml生成复杂的XML

我正在尝试从JavaRDd <Book>和JavaRdd <Reviews>生成一个复杂的xml,我如何将这两个结合在一起以在xml之下生成?

<xml>
<library>
    <books>
        <book>
            <author>test</author>
        </book>
    </books>
    <reviews>
        <review>
            <id>1</id>
        </review>
    </reviews>
</library>
Run Code Online (Sandbox Code Playgroud)

如您所见,有一个父根库,其中包含子书和评论。

以下是我如何生成Book and Review Dataframe

DataFrame bookFrame = sqlCon.createDataFrame(bookRDD, Book.class);
DataFrame reviewFrame = sqlCon.createDataFrame(reviewRDD, Review.class);
Run Code Online (Sandbox Code Playgroud)

我知道要生成xml,而我的疑问尤其是对于拥有Library rootTag以及将Books and Reviews作为其子元素。

我正在使用Java。但是如果您可以指出正确的内容,则可以编写Scala或Python示例。

apache-spark apache-spark-sql spark-dataframe apache-spark-dataset apache-spark-xml

5
推荐指数
1
解决办法
1252
查看次数

在Spark SQL中聚合大型数据集

请考虑以下代码:

case class Person(
  personId: Long, name: String, ageGroup: String, gender: String,
  relationshipStatus: String, country: String, state: String
)

case class PerPersonPower(personId: Long, power: Double)

val people: Dataset[Person] = ...          // Around 50 million entries.
val powers: Dataset[PerPersonPower] = ...  // Around 50 million entries.

people.join(powers, "personId")
  .groupBy("ageGroup", "gender", "relationshipStatus", "country", "state")
  .agg(
    sum("power").alias("totalPower"),
    count("*").alias("personCount")
  )
Run Code Online (Sandbox Code Playgroud)

它在具有大约100 GB RAM的群集上执行.但是,群集内存不足.我不知道该怎么做.实际上,people被分区$"personId"和缓存 - people.repartition($"personId").cache().

我有什么想法可以优化这个计算?

该集群是一个普通的Google Dataproc集群---因此它在客户端模式下使用YARN--由14个节点组成,每个节点具有8 GB RAM.

scala apache-spark apache-spark-sql google-cloud-dataproc apache-spark-dataset

5
推荐指数
1
解决办法
536
查看次数

Parquet过滤器下推功能不适用于Spark Dataset API

这是我正在运行的示例代码.

使用mod列作为分区创建测试镶木地板数据集.

scala> val test = spark.range(0 , 100000000).withColumn("mod", $"id".mod(40))
test: org.apache.spark.sql.DataFrame = [id: bigint, mod: bigint]

scala> test.write.partitionBy("mod").mode("overwrite").parquet("test_pushdown_filter")
Run Code Online (Sandbox Code Playgroud)

之后,我将这些数据作为数据框架读取并在分区列上应用过滤器即mod.

scala> val df = spark.read.parquet("test_pushdown_filter").filter("mod = 5")
df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: bigint, mod: int]

scala> df.queryExecution.executedPlan
res1: org.apache.spark.sql.execution.SparkPlan =
*FileScan parquet [id#16L,mod#17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/C:/Users/kprajapa/WorkSpace/places/test_pushdown_filter], PartitionCount: 1, PartitionFilters: [
isnotnull(mod#17), (mod#17 = 5)], PushedFilters: [], ReadSchema: struct<id:bigint>
Run Code Online (Sandbox Code Playgroud)

你可以在执行计划中看到它只读取1个分区.

但是,如果您将相同的过滤器应用于数据集.它读取所有分区,然后应用过滤器.

scala> case class Test(id: Long, mod: Long)
defined class Test

scala> val ds = spark.read.parquet("test_pushdown_filter").as[Test].filter(_.mod==5)
ds: …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql apache-spark-dataset catalyst-optimizer

5
推荐指数
0
解决办法
2342
查看次数