标签: apache-spark-sql

如何在pyspark中找到数据帧的大小

如何复制此代码以获取 pyspark 中的数据帧大小?

scala> val df = spark.range(10)
scala> print(spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats)
Statistics(sizeInBytes=80.0 B, hints=none)
Run Code Online (Sandbox Code Playgroud)

我想要做的是将 sizeInBytes 值放入一个变量中。

apache-spark-sql pyspark

1
推荐指数
1
解决办法
1738
查看次数

比较 PySpark 中的 3 列

我想比较 PySpark 中的 3 列(百分比总和为 100%)以创建一个新列,其中包含 3 列最大值的列名,或者,如果最大值不唯一,则包含列名具有相同的价值。我在这里看到了一些类似的例子,但是当最大值不唯一时,它们不会处理这种情况。下面是我的蛮力解决方案,但运行需要很长时间才能变得无用:

df\
  .withColumn("MaxName", 
      F.when( (col(A)>col(B)) & (col(A)>col(C)), "A")\
      .when( (col(B)>col(A)) & (col(B)>col(C)), "B")\
      .when( (col(C)>col(A)) & (col(C)>col(B)), "C")\
      .when( (col(A)==col(B)) &\
            (col(A)>col(C)) | (col(B)>col(C)), "AB")\
      .when( (col(C)==col(B)) | (col(C)==col(A)) &\
            (col(C)>col(B)) | (col(C)>col(A)), "CAB")\
      .otherwise("ABC")
Run Code Online (Sandbox Code Playgroud)

任何见解来构建更有效的解决方案?

python apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
190
查看次数

根据列中特定值的计数条件过滤掉 spark 数据帧的行 [pyspark 中的 spark.sql 语法]

我有以下火花数据框:

datalake_spark_dataframe_downsampled = pd.DataFrame( 
                           {'id' : ['001', '001', '001', '001', '001', '002', '002', '002'],
                            'OuterSensorConnected':[0, 0, 0, 1, 0, 0, 0, 1], 
                            'OuterHumidity':[31.784826, 32.784826, 33.784826, 43.784826, 23.784826, 54.784826, 31.784826, 31.784826],
                            'EnergyConsumption': [70, 70, 70, 70, 70, 70, 70, 70],
                            'DaysDeploymentDate': [10, 20, 21, 31, 41, 11, 19, 57],
                            'label': [0, 0, 1, 1, 1, 0, 0, 1]}
                           )
datalake_spark_dataframe_downsampled = spark.createDataFrame(datalake_spark_dataframe_downsampled )

# printSchema of the datalake_spark_dataframe_downsampled (spark df):

"root
 |-- IMEI: string (nullable = true)
 |-- OuterSensorConnected: integer (nullable …
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
1005
查看次数

写一个 spark 数据帧或写一个胶水动态帧,AWS Glue 中哪个选项更好?

在 AWS Glue 中,我从胶水动态框架中的数据目录中读取数据。然后将动态帧转换为火花数据帧以应用模式转换。为了将数据写回 s3,我看到开发人员将数据帧转换回动态帧。写一个胶水动态帧比写一个火花数据帧有什么优势吗?

amazon-web-services dataframe apache-spark apache-spark-sql aws-glue

1
推荐指数
1
解决办法
2129
查看次数

在 spark 中,在添加新行时,它们是否可以替代 union() 函数?

在我的代码中table_df有一些列,我正在对这些列进行一些计算,例如 min、max、mean 等,并且我想创建具有指定架构 new_df_schema 的 new_df。在我的逻辑中,我编写了用于计算的 spark-sql,并将每个新生成的行附加到最初为空的 new_df 中,最后,它会生成new_df所有列的所有计算值。

但问题是当列数更多时会导致性能问题。这可以在不使用 union() 函数或任何其他提高性能的方法的情况下完成吗?

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import sparkSession.sqlContext.implicits._

    val table_df = Seq(
      (10, 20, 30, 40, 50),
      (100, 200, 300, 400, 500),
      (111, 222, 333, 444, 555),
      (1123, 2123, 3123, 4123, 5123),
      (1321, 2321, 3321, 4321, 5321)
    ).toDF("col_1", "col_2", "col_3", "col_4", "col_5")
    table_df.show(false)

    table_df.createOrReplaceTempView("table_df")

     val new_df_schema = StructType(
      StructField("Column_Name", StringType, false) ::
        StructField("number_of_values", LongType, false) ::
        StructField("number_of_distinct_values", LongType, false) ::
        StructField("distinct_count_with_nan", LongType, false) :: …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql

1
推荐指数
1
解决办法
675
查看次数

spark2.xx 是否支持 delta 湖

所以我试图使用 delta Lake 写 df_concat.write.format("delta").mode("overwrite").save("file") it gives me this error

java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/catalog/TableProvider 和 delta Lake doc 说更新到 spark3 所以只想确认我们是否可以在 spark2.xx 上运行 deltalake

apache-spark apache-spark-sql pyspark delta-lake

1
推荐指数
1
解决办法
1482
查看次数

在pyspark中查找列表的最大值/最小值

我知道这是一个非常微不足道的问题,我很惊讶我在互联网上找不到答案,但是可以在 pyspark 中找到最大值或最小值 oa 列表吗?在 Python 中,它很容易通过

max(list)
Run Code Online (Sandbox Code Playgroud)

但是,当我在 pyspark 中尝试相同的操作时,出现以下错误:

An error was encountered:
An error occurred while calling z:org.apache.spark.sql.functions.max. Trace:
py4j.Py4JException: Method max([class java.util.ArrayList]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
    at py4j.Gateway.invoke(Gateway.java:276)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Run Code Online (Sandbox Code Playgroud)

关于我做错了什么的任何想法?

更新:添加我所做的事情: 这是我的清单:

cur_datelist

输出:

['2020-06-10', '2020-06-11', '2020-06-12', '2020-06-13', '2020-06-14', '2020-06-15', '2020-06-16', '2020-06-17', '2020-06-18', '2020-06-19', '2020-06-20', '2020-06-21', '2020-06-22', '2020-06-23', '2020-06-24', '2020-06-25', '2020-06-26', '2020-06-27', '2020-06-28', '2020-06-29', '2020-06-30', '2020-07-01', '2020-07-02', '2020-07-03', '2020-07-04', '2020-07-05', '2020-07-06', '2020-07-07', '2020-07-08', '2020-07-09', '2020-07-10', '2020-07-11', …
Run Code Online (Sandbox Code Playgroud)

amazon-web-services apache-spark-sql pyspark

1
推荐指数
1
解决办法
1184
查看次数

爆炸和爆炸外的区别

是什么区别爆炸explode_outer?这两个函数的文档是相同的,两个函数的示例也相同:

SELECT explode(array(10, 20));
 10
 20
Run Code Online (Sandbox Code Playgroud)

SELECT explode_outer(array(10, 20));
 10
 20
Run Code Online (Sandbox Code Playgroud)

火花源表明,有两个功能之间的差

expression[Explode]("explode"),
expressionGeneratorOuter[Explode]("explode_outer")
Run Code Online (Sandbox Code Playgroud)

但是与expression相比,expressionGeneratorOuter的效果是什么?

apache-spark apache-spark-sql

1
推荐指数
1
解决办法
2050
查看次数

Pyspark:提取数据帧的行,其中值包含一串字符

我正在使用 pyspark 并且我有一个只有一列值的大数据框,其中每一行都是一长串字符:

col1
-------
'2020-11-20;id09;150.09,-20.02'
'2020-11-20;id44;151.78,-25.14'
'2020-11-20;id78;148.24,-22.67'
'2020-11-20;id55;149.77,-27.89'
...
...
...
Run Code Online (Sandbox Code Playgroud)

我正在尝试提取数据帧的行,其中 'idxx' 匹配字符串列表,例如 ["id01", "id02", "id22", "id77", ...]。目前,我从数据框中提取行的方式是:

df.filter(df.col1.contains("id01") | df.col1.contains("id02") | df.col1.contains("id22") | ... )
Run Code Online (Sandbox Code Playgroud)

有没有办法使这更有效,而不必将每个字符串项硬编码到过滤器函数中?

sql apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
93
查看次数

当不满足所有选择标准时,Spark 会选择哪个连接?

我们知道在 Spark 中有三种类型的连接——广播连接?随机加入和排序合并加入?

  • 当小表加入大表时?使用广播加入?
  • 当小表比 BroadcastJoinThreshold 大时?使用 Shuffle Join?
  • 当大表加入?和加入键可以排序?使用排序合并加入?

两个大表的join,join key无法排序的情况怎么办?Spark 会选择哪种连接类型?

join apache-spark apache-spark-sql

1
推荐指数
1
解决办法
171
查看次数