我正在与其他用户共享的集群上使用 Spark。所以仅仅根据运行时间来判断我的哪一个代码运行效率更高是不可靠的。因为当我运行更高效的代码时,其他人可能会运行大量数据,并使我的代码执行更长时间。
那么我可以在这里问两个问题吗:
我正在使用joinfunction 来 join 2RDDs并且我尝试groupByKey()在 using 之前使用join,如下所示:
rdd1.groupByKey().join(rdd2)
Run Code Online (Sandbox Code Playgroud)
似乎花了更长的时间,但是我记得当我使用 Hadoop Hive 时,group by 让我的查询运行得更快。由于 Spark 使用惰性求值,我想知道groupByKeybefore是否join会让事情变得更快
我注意到Spark有一个SQL模块,到目前为止我真的没有时间尝试它,但是我可以问一下SQL模块和RDD SQL类似功能之间有什么区别吗?
我想知道,rdd.unpersist()spark RDD 上的操作成本有多大?存储级别设置是否会影响此操作的性能?任何基准(结果/技术)都会非常有帮助。
为什么是:
import spark.implicits._
val content = Seq(("2019", "09", "11","17","16","54","762000000")).toDF("year", "month", "day", "hour", "minute", "second", "nano")
content.printSchema
content.show
content.withColumn("event_time_utc", to_timestamp(concat('year, 'month, 'day, 'hour, 'minute, 'second), "yyyyMMddHHmmss"))
.withColumn("event_time_utc_millis", to_timestamp(concat('year, 'month, 'day, 'hour, 'minute, 'second, substring('nano, 0, 3)), "yyyyMMddHHmmssSSS"))
.select('year, 'month, 'day, 'hour, 'minute, 'second, 'nano,substring('nano, 0, 3), 'event_time_utc, 'event_time_utc_millis)
.show
Run Code Online (Sandbox Code Playgroud)
缺少毫秒?
+----+-----+---+----+------+------+---------+---------------------+-------------------+---------------------+
|year|month|day|hour|minute|second| nano|substring(nano, 0, 3)| event_time_utc|event_time_utc_millis|
+----+-----+---+----+------+------+---------+---------------------+-------------------+---------------------+
|2019| 09| 11| 17| 16| 54|762000000| 762|2019-09-11 17:16:54| 2019-09-11 17:16:54|
+----+-----+---+----+------+------+---------+---------------------+-------------------+---------------------+
Run Code Online (Sandbox Code Playgroud)
对于格式字符串:如果我没记错的话yyyyMMddHHmmssSSS,它应该包括毫秒SSS。
timestamp milliseconds format-string apache-spark apache-spark-sql
如果我们将使用 SQL 查询来实现每个部门的第二大值的输出,那么我现在有一个表,department并且value现在可用,那么我们可以这样写:
select * from (SELECT department, asset_value, DENSE_RANK() over (partition by DEPARTMENT order by ASSET_VALUE) as x from [User1].[dbo].[UsersRecord]) y where x = 2;
Run Code Online (Sandbox Code Playgroud)
但是,在 Apache Spark 中,如果我们有不使用 SparkSQL 的限制,并且必须仅使用 DataFrame 实现输出,那么我们应该如何编写 Scala 代码?
我通过文档的帮助从我的角度进行了尝试,但无法弄清楚。
我的代码:
package com.tg.testpack1
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
object class2
{
def main(args: Array[String])
{
val sparksessionobject = SparkSession.builder()
.master("local")
.config("spark.sql.warehouse.dir", "C:/Users/ox/spark/spark/spark-warehouse")
.getOrCreate()
sparksessionobject.conf.set("spark.sql.shuffle.partitions", 4)
sparksessionobject.conf.set("spark.executor.memory", "2g")
import sparksessionobject.sqlContext.implicits._
val …Run Code Online (Sandbox Code Playgroud) 我有一个按以下方式创建的 DataFrame。
val someDF = Seq((8, "bat"),(64, "mouse"),(-27, "horse")).toDF("number", "word")
someDF.printSchema
root
|-- number: integer (nullable = false)
|-- word: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
使用 SQL API,可以通过创建临时表并运行插入查询向其中插入一行。有什么方法可以使用 DataFrame API 的方法追加/添加新行?
我有一个十进制和字符串类型的数据框。我想将所有十进制列转换为 double 而不命名它们。我试过这个没有成功。有点新的火花。
>df.printSchema
root
|-- var1: decimal(38,10) (nullable = true)
|-- var2: decimal(38,10) (nullable = true)
|-- var3: decimal(38,10) (nullable = true)
…
150 more decimal and string columns
Run Code Online (Sandbox Code Playgroud)
我尝试:
import org.apache.spark.sql.types._
val cols = df.columns.map(x => {
if (x.dataType == DecimalType(38,0)) col(x).cast(DoubleType)
else col(x)
})
Run Code Online (Sandbox Code Playgroud)
我得到
<console>:30: error: value dataType is not a member of String
if (x.dataType == DecimalType(38,0)) col(x).cast(DoubleType)
Run Code Online (Sandbox Code Playgroud) 我有一个如下所示的数据框:
+----------+--------------------------------+
| Index | flagArray |
+----------+--------------------------------+
| 1 | ['A','S','A','E','Z','S','S'] |
+----------+--------------------------------+
| 2 | ['A','Z','Z','E','Z','S','S'] |
+--------- +--------------------------------+
Run Code Online (Sandbox Code Playgroud)
我想用其相应的数值来表示数组元素。
A - 0
F - 1
S - 2
E - 3
Z - 4
Run Code Online (Sandbox Code Playgroud)
所以我的输出数据帧应该看起来像
+----------+--------------------------------+--------------------------------+
| Index | flagArray | finalArray |
+----------+--------------------------------+--------------------------------+
| 1 | ['A','S','A','E','Z','S','S'] | [0, 2, 0, 3, 4, 2, 2] |
+----------+--------------------------------+--------------------------------+
| 2 | ['A','Z','Z','E','Z','S','S'] | [0, 4, 4, 3, 4, 2, 2] |
+--------- +--------------------------------+--------------------------------+
Run Code Online (Sandbox Code Playgroud)
我在 pyspark 中编写了一个 udf,我通过编写一些 …
python-3.x apache-spark apache-spark-sql pyspark pyspark-dataframes
我有一个只有列category和 A列的数据框,如下所示。我想填充 B 列,以便它比较 A 的当前值和 B 的先前值并存储每个类别的最大值。尝试使用 Windows 函数、滞后、类别最大值等,但我面临的最大挑战是如何在比较两个值时记住较早的最大值。
+---+--------+--+--+
id | category | A | B |
+---+--------+--+--+
1 Fruit 1 1
2 Fruit 5 5
3 Fruit 3 5
4 Fruit 4 5
1 Dessert 4 4
2 Dessert 2 4
1 Veggies 11 11
2 Veggies 7 11
3 Veggies 12 12
4 Veggies 3 12
---+------+---+----+-
Run Code Online (Sandbox Code Playgroud) 我使用的是使用 hadoop-2.6.5.jar 版本的 spark-sql-2.4.1v。我需要先将数据保存在 hdfs 上,然后再转移到 cassandra。因此,我试图将数据保存在 hdfs 上,如下所示:
String hdfsPath = "/user/order_items/";
cleanedDs.createTempViewOrTable("source_tab");
givenItemList.parallelStream().forEach( item -> {
String query = "select $item as itemCol , avg($item) as mean groupBy year";
Dataset<Row> resultDs = sparkSession.sql(query);
saveDsToHdfs(hdfsPath, resultDs );
});
public static void saveDsToHdfs(String parquet_file, Dataset<Row> df) {
df.write()
.format("parquet")
.mode("append")
.save(parquet_file);
logger.info(" Saved parquet file : " + parquet_file + "successfully");
}
Run Code Online (Sandbox Code Playgroud)
当我在集群上运行我的工作时,它无法抛出此错误:
java.io.IOException: Failed to rename FileStatus{path=hdfs:/user/order_items/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.parquet; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to hdfs:/user/order_items/part-00007.parquet …Run Code Online (Sandbox Code Playgroud) 给出以下示例:
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
val testUdf: UserDefinedFunction = udf((a: String, b: String, c: Int) => {
val out = s"test1: $a $b $c"
println(out)
out
})
val testUdf2: UserDefinedFunction = udf((a: String, b: String, c: String) => {
val out = s"test2: $a $b $c"
println(out)
out
})
Seq(("hello", "world", null))
.toDF("a", "b", "c")
.withColumn("c", $"c" cast "Int")
.withColumn("test1", testUdf($"a", $"b", $"c"))
.withColumn("test2", testUdf2($"a", $"b", $"c"))
.show
Run Code Online (Sandbox Code Playgroud)
testUdf似乎没有被调用。没有错误,没有警告,它只是返回 null。
有没有办法检测这些静默故障?另外,这里发生了什么?
火花 2.4.4 斯卡拉 2.11
apache-spark ×10
apache-spark-sql ×10
scala ×4
pyspark ×3
hadoop ×1
hadoop2 ×1
hdfs ×1
milliseconds ×1
python-3.x ×1
rdd ×1
timestamp ×1