标签: apache-spark-sql

Spark sortMergeJoin 不会更改为 shuffleHashJoin

我试图通过禁用 BroadcastHashJoin 和 SortMergeJoin 来强制 Spark 使用 ShuffleHashJoin,但 Spark 始终使用 SortMergeJoin。

我使用的是spark版本2.4.3

object ShuffleHashJoin {

def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)

val spark = SparkSession.builder()
  .appName("ShuffleHashJoin")
  .master("local[*]")
  .getOrCreate()

/*
* Disable auto broadcasting of table and SortMergeJoin
*/
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)
spark.conf.set("spark.sql.join.preferSortMergeJoin", false)

import spark.implicits._
val dataset = Seq(
  (0, "playing"),
  (1, "with"),
  (2, "ShuffledHashJoinExec")).toDF("id", "token")

dataset.join(dataset, Seq("id"), "inner").foreach(_ => ())

// infinite loop to keep the program running to check Spark UI at 4040 port.
while (true) {}
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql

0
推荐指数
1
解决办法
1237
查看次数

使用 Apache Spark SQL 对结果进行子分组

我有以下事件表,我想将它们分组为较小的时间段,如下所示。

该表必须分为较小的集合,其中集合的起始行和结束行由 geohash 确定,如果 geohash 相同,则集合保留包含行,直到发现下一个 geohash 不同。

key time_stamp  geohash
k1  1           abcdfg
k1  5           abcdfg
k1  7           abcdf1
k1  9           abcdfg
k1  10          abcdf2
k1  12          abcdf2
k1  21          abcdf2
Run Code Online (Sandbox Code Playgroud)

如何使用 Apache Spark SQL 语法生成以下输出

key geohash first_time  last_time   duration    num_events
k1  abcdfg  1           5           4           2
k1  abcdf1  7           7           0           1
k1  abcdfg  9           9           0           1
k1  abcdf2  10          21          11          3
Run Code Online (Sandbox Code Playgroud)

有人可以帮助我实现这一目标吗?

sql window-functions gaps-and-islands apache-spark apache-spark-sql

0
推荐指数
1
解决办法
296
查看次数

IndentationError:databricks 和 pyspark 中出现意外缩进

from pyspark.sql.functions import *


ghj=finalDF.withColumn("temp", explode(split(regexp_replace(to_json(struct(col("sum(P0)"), col("sum(P1)"), col("sum(P2)"), col("sum(P3)"), col("sum(P4)"), col("sum(P5)"))),"""[\{"\}]""",""), ",")))
           .withColumn("Priority", split(col("temp"),":")[0])
           .withColumn("Count", split(col("temp"),":")[1]).select(col("NAME"), col("SHORT_DESCRIPTION"), col("Priority"), col("Count")).show()
Run Code Online (Sandbox Code Playgroud)

python apache-spark-sql

0
推荐指数
2
解决办法
5472
查看次数

Spark Structure Streaming 中的临时视图

在 ForeachBatch 函数结构化 Straming 中,我想创建微批次中接收的数据帧的临时视图

func(tabdf, epoch_id):
    tabaDf.createOrReplaceView("taba")
Run Code Online (Sandbox Code Playgroud)

但我收到以下错误:

org.apache.spark.sql.streaming.StreamingQueryException: Table or view not found: taba
Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'taba' not found
Run Code Online (Sandbox Code Playgroud)

请任何人帮助我解决这个问题。

spark-streaming apache-spark-sql pyspark spark-structured-streaming

0
推荐指数
1
解决办法
3355
查看次数

用于将字符串转换为列的 Scala 简写 $ 无法识别字符串变量

我想对用 Scala 编写的 Spark 应用程序使用 $ 简写。但我有大量的列,我想使用变量来引用它们。同时,我想对保存列名的字符串使用 $ 简写符号。

df.select($"col1") // works
val columnName = "col1"
df.select($columnName) // gives error
Run Code Online (Sandbox Code Playgroud)

如何将变量columnName 与$ 简写一起使用?

scala dataframe apache-spark apache-spark-sql

0
推荐指数
1
解决办法
914
查看次数

Spark SQL 中用逗号分隔的字符串爆炸列

我有以下数据集

+----+-----------+
|col1|       col2|
+----+-----------+
|   1|val1, val2 |
|   2|val3, val4 |
+----+-----------+
Run Code Online (Sandbox Code Playgroud)

将所有值视为字符串现在我想将其转换为下面的数据集

+----+-----------+
|col1|       col2|
+----+-----------+
|   1|val1       |
|   1|val2       |
|   2|val3       |
|   2|val4       |
+----+-----------+
Run Code Online (Sandbox Code Playgroud)

我怎样才能实现这个目标?

apache-spark apache-spark-sql

0
推荐指数
1
解决办法
6713
查看次数

与 Levenshtein 距离的模糊连接

我有一个包含用户名(约 1 000 行)的表,称为“潜在用户”,另一个表称为“实际用户”(约 1000 万行)。所有记录都完全由 [az] 字符组成,没有空格。此外,我知道实际用户表中没有潜在用户。

我希望能够根据 Levenshtein 距离,计算 possible_users 中的每一行,actual_users 中最接近的记录是什么。例如:

| potential_users|
|----------------|
| user1          |
| kajd           |
| bbbbb          |
Run Code Online (Sandbox Code Playgroud)

| actual_users |
|--------------|
| kaj          |
| bbbbbbb      |
| user         |
Run Code Online (Sandbox Code Playgroud)

将返回:

| potential_users | actual_users | levenshtein_distance |
|-----------------|--------------|----------------------|
| user1           | user         | 1                    |
| kajd            | kaj          | 1                    |
| bbbbb           | bbbbbbb      | 2                    |
Run Code Online (Sandbox Code Playgroud)

如果表很短,我可以创建一个交叉联接,计算潜在用户中的每条记录与实际用户中的编辑距离,然后返回具有最低值的记录。然而,在我的例子中,这将创建一个 1 000 x 10 000 000 行的中间表,这有点不切实际。

是否有更干净的方法通过创建交叉连接来执行此类操作?

sql hive presto apache-spark-sql trino

0
推荐指数
1
解决办法
2063
查看次数

Databricks SparkException超过spark.driver.maxResultSize

我正在 Azure Databricks DBR 7.3 LTS、spark 3.0.1、scala 2.12 在 Standard_E4as_v4(32.0 GB 内存、4 个内核、1 DBU)VM 的(20 到 35)个工作人员集群上运行以下代码,并且类型为 Standard_DS5_v2 驱动程序( 56.0 GB 内存、16 核、3 DBU)

目标是处理约 5.5 TB 的数据

我面临以下异常:“org.apache.spark.SparkException:由于阶段失败而中止作业:1165个任务的序列化结果的总大小(4.0 GiB)大于spark.driver.maxResultSize 4.0 GiB”处理1163后57071,正在处理 148.4 GiB 的数据,用时 6.1 分钟

我不收集或传输数据到驱动程序,分区数据是否会导致此问题?如果是这种情况:

  • 有没有更好的分区方式?
  • 如何解决这个问题?

代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val w = Window.partitionBy("productId").orderBy(col("@ingestionTimestamp").cast(TimestampType).desc)

val jsonDF = spark.read.json("/mnt/myfile")

val res = jsonDF
      .withColumn("row", row_number.over(w))
      .where($"row" === 1)
      .drop("row")

res.write.json("/mnt/myfile/spark_output")
Run Code Online (Sandbox Code Playgroud)

然后我只尝试再次加载和写入数据而不进行转换,并遇到同样的问题,代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql databricks azure-databricks

0
推荐指数
1
解决办法
4292
查看次数

统计 Pyspark Dataframe 中的行数

我想对我的 DF 进行一些检查,为了尝试它,我使用以下代码:

start = '2020-12-10'
end = datetime.date.today()
country='gb'


df_ua = (spark.table(f'nn_squad7_{country}.fact_table')
      .filter(f.col('date_key').between(start,end))
      #.filter(f.col('is_client')==1)
      .filter(f.col('source')=='tickets')
      .filter(f.col('subtype')=='trx')
      .filter(f.col('is_trx_ok') == 1) 
      .select('ticket_id').distinct() 
      )

output = df_ua.count('ticket_id').distinct()

Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

类型错误:count() 采用 1 个位置参数,但给出了 2 个

我不明白为什么我会得到它,有什么线索吗?

python dataframe apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
2万
查看次数

如何使用pyspark读取Excel文件?

我正在尝试使用 AWS EMR 中的 Pyspark 读取驻留在 s3 的 Excel 文件,为了执行此操作,我下载了 Spark-Excel jarspark-excel_2.11-0.12.4.jar 和 Spark-excel_2.12-0.13.5 .jar 并放入 s3 存储桶中

scenario 1:
===========
df = spark.read.format("com.crealytics.spark.excel").option("useHeader", "true").option("inferschema", "true").load("s3://bucket/abc.xlsx")

spark-submit --jars s3://Bucket/spark-excel_2.11-0.12.4.jar test.py

Error:
Caused by: java.lang.NoClassDefFoundError: org/apache/commons/collections4/IteratorUtils

scenario2:
=========
df = spark.read.format("com.crealytics.spark.excel").option("header", "true").option("inferschema", "true").load("s3://bucket/abc.xlsx")

spark-submit --jars s3://Bucket/spark-excel_2.12-0.13.5.jar test.py

Error:
py4j.protocol.Py4JJavaError: An error occurred while calling o79.load.
: java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)

Run Code Online (Sandbox Code Playgroud)

有人可以帮我解决这个问题吗?我感谢您的帮助 !

python pandas apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
1万
查看次数