标签: apache-spark-dataset

Spark 编码器:何时使用 beans()

我在使用Spark的缓存机制时遇到了内存管理问题。我目前正在将Encoders 与 Kryo 结合使用,想知道切换到 beans 是否可以帮助我减少缓存数据集的大小。

基本上,在使用 s 时,使用 beans 相对于 Kryo 序列化有哪些优缺点Encoder?有任何性能改进吗?Dataset除了使用 SER 选项进行缓存之外,还有其他方法可以压缩缓存吗?

作为记录,我发现了一个类似的主题来处理两者之间的比较。然而,它没有详细讨论这个比较。

java memory-management apache-spark apache-spark-dataset apache-spark-encoders

4
推荐指数
1
解决办法
3079
查看次数

DataSet相对于RDD的性能优势

在阅读了几篇关于Spark的DataSet的精彩文章(,本文)后,我完成了下一个DataSet相对于RDD的性能优势:

  1. 逻辑和物理计划优化;
  2. 严格的典型化;
  3. 矢量化操作;
  4. 低级内存管理.

问题:

  1. Spark的RDD还可以构建物理计划,并可以在同一阶段组合/优化多个转换.那么DataSet相对于RDD有什么好处呢?
  2. 第一个链接可以看到一个例子RDD[Person].DataSet是否具有高级典型化?
  3. "矢量化操作"是什么意思?
  4. 据我了解,DataSet的低内存管理=高级序列化.这意味着可序列化对象的堆外存储,您可以在其中只读取对象的一个​​字段而不进行反序列化.但是当你有IN_MEMORY_ONLY持久性策略时,情况怎么样?无论如何,DataSet会将所有内容序列化吗?它会比RDD有任何性能优势吗?

apache-spark rdd apache-spark-dataset

3
推荐指数
1
解决办法
531
查看次数

Spark SQL中的数组交集

我有一个名为数组类型列的表,writer其值为array[value1, value2],array[value2, value3]....等.

我正在做的self join是获得在数组之间具有共同值的结果.我试过了:

sqlContext.sql("SELECT R2.writer FROM table R1 JOIN table R2 ON R1.id != R2.id WHERE ARRAY_INTERSECTION(R1.writer, R2.writer)[0] is not null ")
Run Code Online (Sandbox Code Playgroud)

sqlContext.sql("SELECT R2.writer FROM table R1 JOIN table R2 ON R1.id != R2.id WHERE ARRAY_INTERSECT(R1.writer, R2.writer)[0] is not null ")
Run Code Online (Sandbox Code Playgroud)

但有同样的例外:

线程"main"中的异常org.apache.spark.sql.AnalysisException:未定义的函数:'ARRAY_INTERSECT'.此函数既不是已注册的临时函数,也不是在数据库'default'中注册的永久函数.第1行pos 80

可能Spark SQL不支持ARRAY_INTERSECTIONARRAY_INTERSECT.我怎样才能实现我的目标Spark SQL

hiveql apache-spark apache-spark-sql spark-dataframe apache-spark-dataset

3
推荐指数
2
解决办法
4020
查看次数

如何使用java api在Apache Spark Dataset中使用desc进行排序?

我正在使用spark会话读取文件,然后拆分单词并计算单词的迭代次数.我需要以desc顺序显示数据

SparkSession sparkSession = SparkSession
            .builder()
            .appName("Java Spark SQL basic example")
            .config("spark.master", "local")
            .getOrCreate();

JavaRDD<Word> textFile = sparkSession
            .read()
            .textFile("/Users/myname/Documents/README.txt")
            .javaRDD()
            .flatMap(s -> Arrays.asList(s.split("[\\s.]")).iterator())
            .map(w -> {
                Word word = new Word();
                word.setWord(w.replace(",", ""));
                return word;
            });

    Dataset<Row> df = sparkSession.createDataFrame(textFile, Word.class);
    df.groupBy("word").count().orderBy(org.apache.spark.sql.functions.col("count").desc()).show();
Run Code Online (Sandbox Code Playgroud)

当我使用org.apache.spark.sql.functions.col("count")它时工作正常但无法按照https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/functions.html#desc( java.lang.String中)

df.sort(asc("dept"), desc("age"))

还有如何在Spark SQL中按降序排序列? 没用.我想这是斯卡拉.Java中的等价物是什么?

java apache-spark apache-spark-sql apache-spark-dataset

3
推荐指数
1
解决办法
5732
查看次数

如何在Spark数据集中的列取整?

使用Scala Spark,如何使用类型化的数据集API舍入聚合列?

另外,如何通过groupby操作保留数据集的类型?

这是我目前拥有的:

case class MyRow(
  k1: String,
  k2: String,
  c1: Double,
  c2: Double
)

def groupTyped(ds: Dataset[MyRow]): Dataset[MyRow] = {
import org.apache.spark.sql.expressions.scalalang.typed._
ds.groupByKey(row => (row.k1, row.k2))
  .agg(
    avg(_.c1),
    avg(_.c2)
  )
  .map(r => MyRow(r._1._1, r._1._2, r._2, r._3))
}
Run Code Online (Sandbox Code Playgroud)
  1. 如果我将替换为avg(_.c1),则会round(avg(_.c1))出现类型错误。四舍五入的正确方法是什么?
  2. .map(...)行感觉不对-有没有更优雅的方法来保留我的数据集的类型?

谢谢!

scala apache-spark apache-spark-sql apache-spark-dataset

3
推荐指数
2
解决办法
3837
查看次数

Spark数据集:过滤值是否包含在其他数据集中

我想从边的数据集中获取所有链接,边的数据源包含在所有现有节点的数据集中。

边栏:dst | src | 类型 (所有字符串)

节点列:id | pageid | (所有字符串)

我这样做是通过从数据集中检索列表并使用contains()方法。

List<String> allNodeList = allNodes.javaRDD().map(r -> r.getString(0)).collect();
Dataset<Row> allLinks = dfEdges.filter("type = 'link'").filter(r -> allNodeList.contains(r.getString(1)));
Run Code Online (Sandbox Code Playgroud)

但是,现在我想消除那段额外的代码,并使用一种更原生的方式。我的方法是使用count,但是由于NotSerializableException,这似乎不起作用。

Dataset<Row> allLinks = dfEdges.filter("type = 'link'").filter(r -> (dfNodes.filter("id="+r.getString(1)).count()>0));
Run Code Online (Sandbox Code Playgroud)

有没有简单的方法可以解决Java中的问题?我已经在scala中看到了“存在于”或类似内容,但不知道如何在Java中简单地解决它。

java apache-spark spark-dataframe apache-spark-dataset

3
推荐指数
1
解决办法
5812
查看次数

在Spark数据集<Row>中使用custome UDF withColumn; java.lang.String无法强制转换为org.apache.spark.sql.Row

我有一个包含许多字段的JSON文件.我在java中使用spark的Dataset读取文件.

  • Spark版本2.2.0

  • java jdk 1.8.0_121

下面是代码.

SparkSession spark = SparkSession
              .builder()
              .appName("Java Spark SQL basic example")
              .config("spark.some.config.option", "some-value")
              .master("local")
              .getOrCreate();

Dataset<Row> df = spark.read().json("jsonfile.json");
Run Code Online (Sandbox Code Playgroud)

我想使用带有自定义UDF的withColumn函数来添加新列.

UDF1 someudf = new UDF1<Row,String>(){
        public String call(Row fin) throws Exception{
            String some_str = fin.getAs("String");
            return some_str;
        }
    };
spark.udf().register( "some_udf", someudf, DataTypes.StringType );
df.withColumn( "procs", callUDF( "some_udf", col("columnx") ) ).show();
Run Code Online (Sandbox Code Playgroud)

运行上面的代码时出现转换错误.java.lang.String无法强制转换为org.apache.spark.sql.Row

问题:

1 - 读取行数据集是唯一的选择吗?我可以将df转换为df的字符串.但我无法选择字段.

2 - 尝试但未能定义用户定义的数据类型.我无法使用此自定义UDDatatype注册UDF.我需要用户定义的数据类型吗?

3 - 和主要问题,我如何从String转换为Row?

部分日志复制如下:

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
    at Risks.readcsv$1.call(readcsv.java:1)
    at …
Run Code Online (Sandbox Code Playgroud)

java user-defined-functions apache-spark apache-spark-sql apache-spark-dataset

3
推荐指数
1
解决办法
6348
查看次数

将UUID添加到Spark数据集

我正在尝试将UUID列添加到我的数据集中。

getDataset(Transaction.class)).withColumn("uniqueId", functions.lit(UUID.randomUUID().toString())).show(false);
Run Code Online (Sandbox Code Playgroud)

但是结果是所有行都具有相同的UUID。我如何使其独特?

+-----------------------------------+
uniqueId                            |
+----------------+-------+-----------
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
----------+----------------+--------+
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-csv apache-spark-dataset

3
推荐指数
1
解决办法
4025
查看次数

Spark java:使用给定的模式创建新的数据集

我有这个代码在scala中运行良好:

val schema = StructType(Array(
        StructField("field1", StringType, true),
        StructField("field2", TimestampType, true),
        StructField("field3", DoubleType, true),
        StructField("field4", StringType, true),
        StructField("field5", StringType, true)
    ))

val df = spark.read
    // some options
    .schema(schema)
    .load(myEndpoint)
Run Code Online (Sandbox Code Playgroud)

我想在Java中做类似的事情.所以我的代码如下:

final StructType schema = new StructType(new StructField[] {
     new StructField("field1",  new StringType(), true,new Metadata()),
     new StructField("field2", new TimestampType(), true,new Metadata()),
     new StructField("field3", new StringType(), true,new Metadata()),
     new StructField("field4", new StringType(), true,new Metadata()),
     new StructField("field5", new StringType(), true,new Metadata())
});

Dataset<Row> df = spark.read()
    // some options
    .schema(schema)
    .load(myEndpoint);
Run Code Online (Sandbox Code Playgroud)

但这给了我以下错误: …

java scala apache-spark apache-spark-dataset

3
推荐指数
1
解决办法
3110
查看次数

如何按组使用roximateQuantile?

Spark具有SQL函数percentile_approx(),与Scala对应的是SQL df.stat.approxQuantile()

但是,Scala副本不能用于分组数据集,例如df.groupby("foo").stat.approxQuantile(),在此处回答:https : //stackoverflow.com/a/51933027

但是可以在SQL语法中进行分组和百分位。所以我想知道,是否可以从SQL percentile_approx函数定义UDF 并将其用于分组数据集?

apache-spark apache-spark-sql apache-spark-dataset

3
推荐指数
1
解决办法
2192
查看次数