标签: pyspark

pyspark DataFrame selectExpr 不适用于多于一列

我们正在尝试 Spark DataFrameselectExpr及其对一列的工作,当我添加多于一列时,它会抛出错误。

第一个工作正常,第二个抛出错误。

代码示例:

 df1.selectExpr("coalesce(gtr_pd_am,0 )").show(2)
 df1.selectExpr("coalesce(gtr_pd_am,0),coalesce(prev_gtr_pd_am,0)").show()
Run Code Online (Sandbox Code Playgroud)

错误日志:

>>> df1.selectExpr("coalesce(gtr_pd_am,0),coalesce(prev_gtr_pd_am,0)").show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/2.6.5.0-292/spark2/python/pyspark/sql/dataframe.py", line 1216, in selectExpr
    jdf = self._jdf.selectExpr(self._jseq(expr))
  File "/usr/hdp/2.6.5.0-292/spark2/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/usr/hdp/2.6.5.0-292/spark2/python/pyspark/sql/utils.py", line 73, in deco
    raise ParseException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.ParseException: u"\nmismatched input ',' expecting <EOF>(line 1, pos 21)\n\n== SQL ==\ncoalesce(gtr_pd_am,0),coalesce(prev_gtr_pd_am,0)\n---------------------^^^\n" 
Run Code Online (Sandbox Code Playgroud)

apache-spark-sql pyspark

0
推荐指数
1
解决办法
7565
查看次数

pyspark:计算每个不同值的出现次数

我认为这个问题与:Spark DataFrame:计算每列的不同值

所以基本上我有一个 Spark 数据框,列A的值为1,1,2,2,1

1所以我想计算每个不同值(在本例中为 和2)在 列 中出现的次数A,并打印类似的内容

distinct_values | number_of_apperance
1 | 3
2 | 2
Run Code Online (Sandbox Code Playgroud)

pyspark

0
推荐指数
1
解决办法
2万
查看次数

基于列合并两个 Spark 数据框

我有 2 个数据框,需要根据列(员工代码)合并它们。请注意,数据框大约有 75 列,因此我提供了一个示例数据集来获取一些建议/示例解决方案。我正在使用 databricks,数据集是从 S3 读取的。

以下是我的 2 个数据框:

DATAFRAME - 1

|-----------------------------------------------------------------------------------|
|EMP_CODE   |COLUMN1|COLUMN2|COLUMN3|COLUMN4|COLUMN5|COLUMN6|COLUMN7|COLUMN8|COLUMN9|
|-----------------------------------------------------------------------------------|
|A10001     |   B   |       |       |       |       |       |       |       |       |
|-----------------------------------------------------------------------------------|


DATAFRAME - 2
|-----------------------------------------------------------------------------------|
|EMP_CODE   |COLUMN1|COLUMN2|COLUMN3|COLUMN4|COLUMN5|COLUMN6|COLUMN7|COLUMN8|COLUMN9|
|-----------------------------------------------------------------------------------|
|A10001     |       |       |       |       |   C   |       |       |       |       |   
|B10001     |       |       |       |       |       |       |       |       |T2     |
|A10001     |       |       |       |       |       |       |       |   B   |       |
|A10001     |       |       |   C   |       | …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark databricks

0
推荐指数
1
解决办法
4万
查看次数

如何更改pyspark中多列的类型?

我正在研究pyspark。我想像这样更改列类型:

df1=df.select(df.Date.cast('double'),df.Time.cast('double'),
          df.NetValue.cast('double'),df.Units.cast('double'))
Run Code Online (Sandbox Code Playgroud)

您可以看到 df 是一个数据框,我选择了 4 列并将它们全部更改为 double。由于使用 select,所有其他列都将被忽略。

但是,如果 df 有数百列,我只需要更改这 4 列。我需要保留所有列。那么,该怎么做呢?

python select types casting pyspark

0
推荐指数
1
解决办法
1万
查看次数

Spark 重试尝试配置在 Spark 会话中不起作用

我正在尝试限制火花应用程序尝试。作业失败一次后,会以yarn client模式重新提交。

我正在使用 Azure 数据工厂中的 HDInsight 活动。如果参数是从 ADF 传递的,则仅限一次尝试。

#

 val conf: SparkConf = new SparkConf()
  conf.set("spark.yarn.maxAppAttempts","5")
  conf.set("yarn.resourcemanager.am.max-attempts","5")

  val sc = SparkSession.builder
     .master("yarn")
    .config(conf)
    .appName("test")
    .enableHiveSupport()
    //.config("yarn.resourcemanager.am.max-attempts","1")
    //.config("spark.yarn.maxAppAttempts","1")
    .getOrCreate() ##

sc.conf.set("spark.yarn.maxAppAttempts","1")
Run Code Online (Sandbox Code Playgroud)

从控制台打印参数显示 (spark.yarn.maxAppAttempts,1) (yarn.resourcemanager.am.max-attempts,1)

apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
2884
查看次数

如何按名称从 Glue DynamicFrame 检索字段值

在 Spark DataFrame 中,您可以使用其名称来寻址架构中的列值,例如df['personId']- 但这种方式不适用于 Glue 的 DynamicFrame。是否有类似的方法,无需将 DynamicFrame 转换为 DataFrame,即可按名称直接访问列值?

python amazon-web-services pyspark aws-glue

0
推荐指数
1
解决办法
7600
查看次数

如何在pyspark中动态聚合列

pct_<original_name>_valid我想计算每个输入列的非缺失值的百分比。在此示例中只有 2 列,因此可以轻松手动编写下面的代码。但是当有 30 多个列时,我不想手动执行此操作。甚至可以动态地执行此操作吗?(例如,将列名称列表作为输入)

import pyspark.sql.functions as F

d = [{'name': 'Alice', 'age': 1}, {'name': 'Bae', 'age': None}]
df = spark.createDataFrame(d)

df.withColumn('name_valid', F.when(col("name").isNotNull(),1).otherwise(0))\
.withColumn('age_valid', F.when(col("age").isNotNull(),1).otherwise(0))\
.agg(
    (100.0*F.sum(col("name_valid"))/F.count(F.lit(1))).alias("pct_name_valid"),
    (100.0*F.sum(col("age_valid"))/F.count(F.lit(1))).alias("pct_age_valid")
)\
.show()
Run Code Online (Sandbox Code Playgroud)

结果如下:

+--------------+-------------+
|pct_name_valid|pct_age_valid|
+--------------+-------------+
|         100.0|         50.0|
+--------------+-------------+
Run Code Online (Sandbox Code Playgroud)

如前所述,我不想对所有 30 多个列手动执行此操作。有什么办法我可以这样做:

my_output = calculate_non_missing_percentage(df, my_columns = ["name", "age", "gender", "school", "color"])
Run Code Online (Sandbox Code Playgroud)

aggregate-functions pyspark

0
推荐指数
1
解决办法
3359
查看次数

哪个开源框架最适合 ETL Apache Airflow 或 Apache Beam?

我正在尝试使用开源框架进行 ETL,我听说过两种东西 Apache Beam 和 Apache Airflow,其中一种最适合整个 ETL 或 ELT,例如 Talend、Azure 数据工厂等,事实上,我尝试使用云数据仓库(redshift、azure 数据仓库、雪花等)完成所有工作,哪一个适合此类工作,如果我对这两个框架进行一些比较,那就太好了。提前致谢。

etl apache-spark pyspark airflow apache-beam

0
推荐指数
1
解决办法
1698
查看次数

“SparkSession”对象没有属性“databricks”

databricks 和 Spark 新手,我尝试运行以下命令并遇到此错误

spark.databricks.delta.retentionDurationCheck.enabled= "false"
Run Code Online (Sandbox Code Playgroud)

错误:'SparkSession' object has no attribute 'databricks'

apache-spark pyspark databricks delta-lake

0
推荐指数
1
解决办法
2997
查看次数

Pyspark 将所有数据帧值增加 1

我试图将数据框中的所有值增加 1,除了 ID 列之外。

例子:

在此输入图像描述

结果:

在此输入图像描述

这是我到目前为止所拥有的,但是当我有很多列要做时(例如 50),它会变得有点长。

df_add = df.select(
  'Id',
  (df['col_a'] + 1).alias('col_a'),
  ..
  ..
)
Run Code Online (Sandbox Code Playgroud)

有没有更Pythonic的方法来达到相同的结果?

python python-3.x apache-spark-sql pyspark

0
推荐指数
1
解决办法
4309
查看次数