我在使用Spark的缓存机制时遇到了内存管理问题。我目前正在将Encoders 与 Kryo 结合使用,想知道切换到 beans 是否可以帮助我减少缓存数据集的大小。
基本上,在使用 s 时,使用 beans 相对于 Kryo 序列化有哪些优缺点Encoder?有任何性能改进吗?Dataset除了使用 SER 选项进行缓存之外,还有其他方法可以压缩缓存吗?
作为记录,我发现了一个类似的主题来处理两者之间的比较。然而,它没有详细讨论这个比较。
java memory-management apache-spark apache-spark-dataset apache-spark-encoders
在阅读了几篇关于Spark的DataSet的精彩文章(本,本和本文)后,我完成了下一个DataSet相对于RDD的性能优势:
问题:
RDD[Person].DataSet是否具有高级典型化?IN_MEMORY_ONLY持久性策略时,情况怎么样?无论如何,DataSet会将所有内容序列化吗?它会比RDD有任何性能优势吗?我有一个名为数组类型列的表,writer其值为array[value1, value2],array[value2, value3]....等.
我正在做的self join是获得在数组之间具有共同值的结果.我试过了:
sqlContext.sql("SELECT R2.writer FROM table R1 JOIN table R2 ON R1.id != R2.id WHERE ARRAY_INTERSECTION(R1.writer, R2.writer)[0] is not null ")
Run Code Online (Sandbox Code Playgroud)
和
sqlContext.sql("SELECT R2.writer FROM table R1 JOIN table R2 ON R1.id != R2.id WHERE ARRAY_INTERSECT(R1.writer, R2.writer)[0] is not null ")
Run Code Online (Sandbox Code Playgroud)
但有同样的例外:
线程"main"中的异常org.apache.spark.sql.AnalysisException:未定义的函数:'ARRAY_INTERSECT'.此函数既不是已注册的临时函数,也不是在数据库'default'中注册的永久函数.第1行pos 80
可能Spark SQL不支持ARRAY_INTERSECTION和ARRAY_INTERSECT.我怎样才能实现我的目标Spark SQL?
hiveql apache-spark apache-spark-sql spark-dataframe apache-spark-dataset
我正在使用spark会话读取文件,然后拆分单词并计算单词的迭代次数.我需要以desc顺序显示数据
SparkSession sparkSession = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.master", "local")
.getOrCreate();
JavaRDD<Word> textFile = sparkSession
.read()
.textFile("/Users/myname/Documents/README.txt")
.javaRDD()
.flatMap(s -> Arrays.asList(s.split("[\\s.]")).iterator())
.map(w -> {
Word word = new Word();
word.setWord(w.replace(",", ""));
return word;
});
Dataset<Row> df = sparkSession.createDataFrame(textFile, Word.class);
df.groupBy("word").count().orderBy(org.apache.spark.sql.functions.col("count").desc()).show();
Run Code Online (Sandbox Code Playgroud)
当我使用org.apache.spark.sql.functions.col("count")它时工作正常但无法按照https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/functions.html#desc( java.lang.String中)
df.sort(asc("dept"), desc("age"))
还有如何在Spark SQL中按降序排序列? 没用.我想这是斯卡拉.Java中的等价物是什么?
使用Scala Spark,如何使用类型化的数据集API舍入聚合列?
另外,如何通过groupby操作保留数据集的类型?
这是我目前拥有的:
case class MyRow(
k1: String,
k2: String,
c1: Double,
c2: Double
)
def groupTyped(ds: Dataset[MyRow]): Dataset[MyRow] = {
import org.apache.spark.sql.expressions.scalalang.typed._
ds.groupByKey(row => (row.k1, row.k2))
.agg(
avg(_.c1),
avg(_.c2)
)
.map(r => MyRow(r._1._1, r._1._2, r._2, r._3))
}
Run Code Online (Sandbox Code Playgroud)
avg(_.c1),则会round(avg(_.c1))出现类型错误。四舍五入的正确方法是什么?.map(...)行感觉不对-有没有更优雅的方法来保留我的数据集的类型?谢谢!
我想从边的数据集中获取所有链接,边的数据源包含在所有现有节点的数据集中。
边栏:dst | src | 类型 (所有字符串)
节点列:id | pageid | (所有字符串)
我这样做是通过从数据集中检索列表并使用contains()方法。
List<String> allNodeList = allNodes.javaRDD().map(r -> r.getString(0)).collect();
Dataset<Row> allLinks = dfEdges.filter("type = 'link'").filter(r -> allNodeList.contains(r.getString(1)));
Run Code Online (Sandbox Code Playgroud)
但是,现在我想消除那段额外的代码,并使用一种更原生的方式。我的方法是使用count,但是由于NotSerializableException,这似乎不起作用。
Dataset<Row> allLinks = dfEdges.filter("type = 'link'").filter(r -> (dfNodes.filter("id="+r.getString(1)).count()>0));
Run Code Online (Sandbox Code Playgroud)
有没有简单的方法可以解决Java中的问题?我已经在scala中看到了“存在于”或类似内容,但不知道如何在Java中简单地解决它。
我有一个包含许多字段的JSON文件.我在java中使用spark的Dataset读取文件.
Spark版本2.2.0
java jdk 1.8.0_121
下面是代码.
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.master("local")
.getOrCreate();
Dataset<Row> df = spark.read().json("jsonfile.json");
Run Code Online (Sandbox Code Playgroud)
我想使用带有自定义UDF的withColumn函数来添加新列.
UDF1 someudf = new UDF1<Row,String>(){
public String call(Row fin) throws Exception{
String some_str = fin.getAs("String");
return some_str;
}
};
spark.udf().register( "some_udf", someudf, DataTypes.StringType );
df.withColumn( "procs", callUDF( "some_udf", col("columnx") ) ).show();
Run Code Online (Sandbox Code Playgroud)
运行上面的代码时出现转换错误.java.lang.String无法强制转换为org.apache.spark.sql.Row
问题:
1 - 读取行数据集是唯一的选择吗?我可以将df转换为df的字符串.但我无法选择字段.
2 - 尝试但未能定义用户定义的数据类型.我无法使用此自定义UDDatatype注册UDF.我需要用户定义的数据类型吗?
3 - 和主要问题,我如何从String转换为Row?
部分日志复制如下:
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.Row
at Risks.readcsv$1.call(readcsv.java:1)
at …Run Code Online (Sandbox Code Playgroud) java user-defined-functions apache-spark apache-spark-sql apache-spark-dataset
我正在尝试将UUID列添加到我的数据集中。
getDataset(Transaction.class)).withColumn("uniqueId", functions.lit(UUID.randomUUID().toString())).show(false);
Run Code Online (Sandbox Code Playgroud)
但是结果是所有行都具有相同的UUID。我如何使其独特?
+-----------------------------------+
uniqueId |
+----------------+-------+-----------
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
|1abdecf-8303-4a4e-8ad3-89c190957c3b|
----------+----------------+--------+
Run Code Online (Sandbox Code Playgroud) 我有这个代码在scala中运行良好:
val schema = StructType(Array(
StructField("field1", StringType, true),
StructField("field2", TimestampType, true),
StructField("field3", DoubleType, true),
StructField("field4", StringType, true),
StructField("field5", StringType, true)
))
val df = spark.read
// some options
.schema(schema)
.load(myEndpoint)
Run Code Online (Sandbox Code Playgroud)
我想在Java中做类似的事情.所以我的代码如下:
final StructType schema = new StructType(new StructField[] {
new StructField("field1", new StringType(), true,new Metadata()),
new StructField("field2", new TimestampType(), true,new Metadata()),
new StructField("field3", new StringType(), true,new Metadata()),
new StructField("field4", new StringType(), true,new Metadata()),
new StructField("field5", new StringType(), true,new Metadata())
});
Dataset<Row> df = spark.read()
// some options
.schema(schema)
.load(myEndpoint);
Run Code Online (Sandbox Code Playgroud)
但这给了我以下错误: …
Spark具有SQL函数percentile_approx(),与Scala对应的是SQL df.stat.approxQuantile()。
但是,Scala副本不能用于分组数据集,例如df.groupby("foo").stat.approxQuantile(),在此处回答:https : //stackoverflow.com/a/51933027。
但是可以在SQL语法中进行分组和百分位。所以我想知道,是否可以从SQL percentile_approx函数定义UDF 并将其用于分组数据集?