如何复制此代码以获取 pyspark 中的数据帧大小?
scala> val df = spark.range(10)
scala> print(spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats)
Statistics(sizeInBytes=80.0 B, hints=none)
Run Code Online (Sandbox Code Playgroud)
我想要做的是将 sizeInBytes 值放入一个变量中。
我想比较 PySpark 中的 3 列(百分比总和为 100%)以创建一个新列,其中包含 3 列最大值的列名,或者,如果最大值不唯一,则包含列名具有相同的价值。我在这里看到了一些类似的例子,但是当最大值不唯一时,它们不会处理这种情况。下面是我的蛮力解决方案,但运行需要很长时间才能变得无用:
df\
.withColumn("MaxName",
F.when( (col(A)>col(B)) & (col(A)>col(C)), "A")\
.when( (col(B)>col(A)) & (col(B)>col(C)), "B")\
.when( (col(C)>col(A)) & (col(C)>col(B)), "C")\
.when( (col(A)==col(B)) &\
(col(A)>col(C)) | (col(B)>col(C)), "AB")\
.when( (col(C)==col(B)) | (col(C)==col(A)) &\
(col(C)>col(B)) | (col(C)>col(A)), "CAB")\
.otherwise("ABC")
Run Code Online (Sandbox Code Playgroud)
任何见解来构建更有效的解决方案?
我有以下火花数据框:
datalake_spark_dataframe_downsampled = pd.DataFrame(
{'id' : ['001', '001', '001', '001', '001', '002', '002', '002'],
'OuterSensorConnected':[0, 0, 0, 1, 0, 0, 0, 1],
'OuterHumidity':[31.784826, 32.784826, 33.784826, 43.784826, 23.784826, 54.784826, 31.784826, 31.784826],
'EnergyConsumption': [70, 70, 70, 70, 70, 70, 70, 70],
'DaysDeploymentDate': [10, 20, 21, 31, 41, 11, 19, 57],
'label': [0, 0, 1, 1, 1, 0, 0, 1]}
)
datalake_spark_dataframe_downsampled = spark.createDataFrame(datalake_spark_dataframe_downsampled )
# printSchema of the datalake_spark_dataframe_downsampled (spark df):
"root
|-- IMEI: string (nullable = true)
|-- OuterSensorConnected: integer (nullable …Run Code Online (Sandbox Code Playgroud) 在 AWS Glue 中,我从胶水动态框架中的数据目录中读取数据。然后将动态帧转换为火花数据帧以应用模式转换。为了将数据写回 s3,我看到开发人员将数据帧转换回动态帧。写一个胶水动态帧比写一个火花数据帧有什么优势吗?
amazon-web-services dataframe apache-spark apache-spark-sql aws-glue
在我的代码中table_df有一些列,我正在对这些列进行一些计算,例如 min、max、mean 等,并且我想创建具有指定架构 new_df_schema 的 new_df。在我的逻辑中,我编写了用于计算的 spark-sql,并将每个新生成的行附加到最初为空的 new_df 中,最后,它会生成new_df所有列的所有计算值。
但问题是当列数更多时会导致性能问题。这可以在不使用 union() 函数或任何其他提高性能的方法的情况下完成吗?
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import sparkSession.sqlContext.implicits._
val table_df = Seq(
(10, 20, 30, 40, 50),
(100, 200, 300, 400, 500),
(111, 222, 333, 444, 555),
(1123, 2123, 3123, 4123, 5123),
(1321, 2321, 3321, 4321, 5321)
).toDF("col_1", "col_2", "col_3", "col_4", "col_5")
table_df.show(false)
table_df.createOrReplaceTempView("table_df")
val new_df_schema = StructType(
StructField("Column_Name", StringType, false) ::
StructField("number_of_values", LongType, false) ::
StructField("number_of_distinct_values", LongType, false) ::
StructField("distinct_count_with_nan", LongType, false) :: …Run Code Online (Sandbox Code Playgroud) 所以我试图使用 delta Lake 写 df_concat.write.format("delta").mode("overwrite").save("file") it gives me this error:
java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/catalog/TableProvider 和 delta Lake doc 说更新到 spark3 所以只想确认我们是否可以在 spark2.xx 上运行 deltalake
我知道这是一个非常微不足道的问题,我很惊讶我在互联网上找不到答案,但是可以在 pyspark 中找到最大值或最小值 oa 列表吗?在 Python 中,它很容易通过
max(list)
Run Code Online (Sandbox Code Playgroud)
但是,当我在 pyspark 中尝试相同的操作时,出现以下错误:
An error was encountered:
An error occurred while calling z:org.apache.spark.sql.functions.max. Trace:
py4j.Py4JException: Method max([class java.util.ArrayList]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
at py4j.Gateway.invoke(Gateway.java:276)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Run Code Online (Sandbox Code Playgroud)
关于我做错了什么的任何想法?
更新:添加我所做的事情: 这是我的清单:
cur_datelist
输出:
['2020-06-10', '2020-06-11', '2020-06-12', '2020-06-13', '2020-06-14', '2020-06-15', '2020-06-16', '2020-06-17', '2020-06-18', '2020-06-19', '2020-06-20', '2020-06-21', '2020-06-22', '2020-06-23', '2020-06-24', '2020-06-25', '2020-06-26', '2020-06-27', '2020-06-28', '2020-06-29', '2020-06-30', '2020-07-01', '2020-07-02', '2020-07-03', '2020-07-04', '2020-07-05', '2020-07-06', '2020-07-07', '2020-07-08', '2020-07-09', '2020-07-10', '2020-07-11', …Run Code Online (Sandbox Code Playgroud) 是什么区别爆炸和explode_outer?这两个函数的文档是相同的,两个函数的示例也相同:
SELECT explode(array(10, 20));
10
20
Run Code Online (Sandbox Code Playgroud)
和
SELECT explode_outer(array(10, 20));
10
20
Run Code Online (Sandbox Code Playgroud)
的火花源表明,有两个功能之间的差
expression[Explode]("explode"),
expressionGeneratorOuter[Explode]("explode_outer")
Run Code Online (Sandbox Code Playgroud)
但是与expression相比,expressionGeneratorOuter的效果是什么?
我正在使用 pyspark 并且我有一个只有一列值的大数据框,其中每一行都是一长串字符:
col1
-------
'2020-11-20;id09;150.09,-20.02'
'2020-11-20;id44;151.78,-25.14'
'2020-11-20;id78;148.24,-22.67'
'2020-11-20;id55;149.77,-27.89'
...
...
...
Run Code Online (Sandbox Code Playgroud)
我正在尝试提取数据帧的行,其中 'idxx' 匹配字符串列表,例如 ["id01", "id02", "id22", "id77", ...]。目前,我从数据框中提取行的方式是:
df.filter(df.col1.contains("id01") | df.col1.contains("id02") | df.col1.contains("id22") | ... )
Run Code Online (Sandbox Code Playgroud)
有没有办法使这更有效,而不必将每个字符串项硬编码到过滤器函数中?
我们知道在 Spark 中有三种类型的连接——广播连接?随机加入和排序合并加入?
两个大表的join,join key无法排序的情况怎么办?Spark 会选择哪种连接类型?
apache-spark-sql ×10
apache-spark ×8
pyspark ×6
python ×2
aws-glue ×1
dataframe ×1
delta-lake ×1
join ×1
sql ×1