我试图通过禁用 BroadcastHashJoin 和 SortMergeJoin 来强制 Spark 使用 ShuffleHashJoin,但 Spark 始终使用 SortMergeJoin。
我使用的是spark版本2.4.3
object ShuffleHashJoin {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession.builder()
.appName("ShuffleHashJoin")
.master("local[*]")
.getOrCreate()
/*
* Disable auto broadcasting of table and SortMergeJoin
*/
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)
spark.conf.set("spark.sql.join.preferSortMergeJoin", false)
import spark.implicits._
val dataset = Seq(
(0, "playing"),
(1, "with"),
(2, "ShuffledHashJoinExec")).toDF("id", "token")
dataset.join(dataset, Seq("id"), "inner").foreach(_ => ())
// infinite loop to keep the program running to check Spark UI at 4040 port.
while (true) {}
Run Code Online (Sandbox Code Playgroud) 我有以下事件表,我想将它们分组为较小的时间段,如下所示。
该表必须分为较小的集合,其中集合的起始行和结束行由 geohash 确定,如果 geohash 相同,则集合保留包含行,直到发现下一个 geohash 不同。
key time_stamp geohash
k1 1 abcdfg
k1 5 abcdfg
k1 7 abcdf1
k1 9 abcdfg
k1 10 abcdf2
k1 12 abcdf2
k1 21 abcdf2
Run Code Online (Sandbox Code Playgroud)
如何使用 Apache Spark SQL 语法生成以下输出
key geohash first_time last_time duration num_events
k1 abcdfg 1 5 4 2
k1 abcdf1 7 7 0 1
k1 abcdfg 9 9 0 1
k1 abcdf2 10 21 11 3
Run Code Online (Sandbox Code Playgroud)
有人可以帮助我实现这一目标吗?
sql window-functions gaps-and-islands apache-spark apache-spark-sql
from pyspark.sql.functions import *
ghj=finalDF.withColumn("temp", explode(split(regexp_replace(to_json(struct(col("sum(P0)"), col("sum(P1)"), col("sum(P2)"), col("sum(P3)"), col("sum(P4)"), col("sum(P5)"))),"""[\{"\}]""",""), ",")))
.withColumn("Priority", split(col("temp"),":")[0])
.withColumn("Count", split(col("temp"),":")[1]).select(col("NAME"), col("SHORT_DESCRIPTION"), col("Priority"), col("Count")).show()
Run Code Online (Sandbox Code Playgroud) 在 ForeachBatch 函数结构化 Straming 中,我想创建微批次中接收的数据帧的临时视图
func(tabdf, epoch_id):
tabaDf.createOrReplaceView("taba")
Run Code Online (Sandbox Code Playgroud)
但我收到以下错误:
org.apache.spark.sql.streaming.StreamingQueryException: Table or view not found: taba
Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'taba' not found
Run Code Online (Sandbox Code Playgroud)
请任何人帮助我解决这个问题。
spark-streaming apache-spark-sql pyspark spark-structured-streaming
我想对用 Scala 编写的 Spark 应用程序使用 $ 简写。但我有大量的列,我想使用变量来引用它们。同时,我想对保存列名的字符串使用 $ 简写符号。
df.select($"col1") // works
val columnName = "col1"
df.select($columnName) // gives error
Run Code Online (Sandbox Code Playgroud)
如何将变量columnName 与$ 简写一起使用?
我有以下数据集
+----+-----------+
|col1| col2|
+----+-----------+
| 1|val1, val2 |
| 2|val3, val4 |
+----+-----------+
Run Code Online (Sandbox Code Playgroud)
将所有值视为字符串现在我想将其转换为下面的数据集
+----+-----------+
|col1| col2|
+----+-----------+
| 1|val1 |
| 1|val2 |
| 2|val3 |
| 2|val4 |
+----+-----------+
Run Code Online (Sandbox Code Playgroud)
我怎样才能实现这个目标?
我有一个包含用户名(约 1 000 行)的表,称为“潜在用户”,另一个表称为“实际用户”(约 1000 万行)。所有记录都完全由 [az] 字符组成,没有空格。此外,我知道实际用户表中没有潜在用户。
我希望能够根据 Levenshtein 距离,计算 possible_users 中的每一行,actual_users 中最接近的记录是什么。例如:
| potential_users|
|----------------|
| user1 |
| kajd |
| bbbbb |
Run Code Online (Sandbox Code Playgroud)
和
| actual_users |
|--------------|
| kaj |
| bbbbbbb |
| user |
Run Code Online (Sandbox Code Playgroud)
将返回:
| potential_users | actual_users | levenshtein_distance |
|-----------------|--------------|----------------------|
| user1 | user | 1 |
| kajd | kaj | 1 |
| bbbbb | bbbbbbb | 2 |
Run Code Online (Sandbox Code Playgroud)
如果表很短,我可以创建一个交叉联接,计算潜在用户中的每条记录与实际用户中的编辑距离,然后返回具有最低值的记录。然而,在我的例子中,这将创建一个 1 000 x 10 000 000 行的中间表,这有点不切实际。
是否有更干净的方法通过创建交叉连接来执行此类操作?
我正在 Azure Databricks DBR 7.3 LTS、spark 3.0.1、scala 2.12 在 Standard_E4as_v4(32.0 GB 内存、4 个内核、1 DBU)VM 的(20 到 35)个工作人员集群上运行以下代码,并且类型为 Standard_DS5_v2 驱动程序( 56.0 GB 内存、16 核、3 DBU)
目标是处理约 5.5 TB 的数据
我面临以下异常:“org.apache.spark.SparkException:由于阶段失败而中止作业:1165个任务的序列化结果的总大小(4.0 GiB)大于spark.driver.maxResultSize 4.0 GiB”处理1163后57071,正在处理 148.4 GiB 的数据,用时 6.1 分钟
我不收集或传输数据到驱动程序,分区数据是否会导致此问题?如果是这种情况:
代码:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._
val w = Window.partitionBy("productId").orderBy(col("@ingestionTimestamp").cast(TimestampType).desc)
val jsonDF = spark.read.json("/mnt/myfile")
val res = jsonDF
.withColumn("row", row_number.over(w))
.where($"row" === 1)
.drop("row")
res.write.json("/mnt/myfile/spark_output")
Run Code Online (Sandbox Code Playgroud)
然后我只尝试再次加载和写入数据而不进行转换,并遇到同样的问题,代码:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import …Run Code Online (Sandbox Code Playgroud) scala apache-spark apache-spark-sql databricks azure-databricks
我想对我的 DF 进行一些检查,为了尝试它,我使用以下代码:
start = '2020-12-10'
end = datetime.date.today()
country='gb'
df_ua = (spark.table(f'nn_squad7_{country}.fact_table')
.filter(f.col('date_key').between(start,end))
#.filter(f.col('is_client')==1)
.filter(f.col('source')=='tickets')
.filter(f.col('subtype')=='trx')
.filter(f.col('is_trx_ok') == 1)
.select('ticket_id').distinct()
)
output = df_ua.count('ticket_id').distinct()
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
类型错误:count() 采用 1 个位置参数,但给出了 2 个
我不明白为什么我会得到它,有什么线索吗?
我正在尝试使用 AWS EMR 中的 Pyspark 读取驻留在 s3 的 Excel 文件,为了执行此操作,我下载了 Spark-Excel jarspark-excel_2.11-0.12.4.jar 和 Spark-excel_2.12-0.13.5 .jar 并放入 s3 存储桶中
scenario 1:
===========
df = spark.read.format("com.crealytics.spark.excel").option("useHeader", "true").option("inferschema", "true").load("s3://bucket/abc.xlsx")
spark-submit --jars s3://Bucket/spark-excel_2.11-0.12.4.jar test.py
Error:
Caused by: java.lang.NoClassDefFoundError: org/apache/commons/collections4/IteratorUtils
scenario2:
=========
df = spark.read.format("com.crealytics.spark.excel").option("header", "true").option("inferschema", "true").load("s3://bucket/abc.xlsx")
spark-submit --jars s3://Bucket/spark-excel_2.12-0.13.5.jar test.py
Error:
py4j.protocol.Py4JJavaError: An error occurred while calling o79.load.
: java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)
Run Code Online (Sandbox Code Playgroud)
有人可以帮我解决这个问题吗?我感谢您的帮助 !
apache-spark-sql ×10
apache-spark ×7
pyspark ×3
python ×3
dataframe ×2
scala ×2
sql ×2
databricks ×1
hive ×1
pandas ×1
presto ×1
trino ×1