我们正在尝试 Spark DataFrameselectExpr及其对一列的工作,当我添加多于一列时,它会抛出错误。
第一个工作正常,第二个抛出错误。
代码示例:
df1.selectExpr("coalesce(gtr_pd_am,0 )").show(2)
df1.selectExpr("coalesce(gtr_pd_am,0),coalesce(prev_gtr_pd_am,0)").show()
Run Code Online (Sandbox Code Playgroud)
错误日志:
>>> df1.selectExpr("coalesce(gtr_pd_am,0),coalesce(prev_gtr_pd_am,0)").show()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/hdp/2.6.5.0-292/spark2/python/pyspark/sql/dataframe.py", line 1216, in selectExpr
jdf = self._jdf.selectExpr(self._jseq(expr))
File "/usr/hdp/2.6.5.0-292/spark2/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
File "/usr/hdp/2.6.5.0-292/spark2/python/pyspark/sql/utils.py", line 73, in deco
raise ParseException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.ParseException: u"\nmismatched input ',' expecting <EOF>(line 1, pos 21)\n\n== SQL ==\ncoalesce(gtr_pd_am,0),coalesce(prev_gtr_pd_am,0)\n---------------------^^^\n"
Run Code Online (Sandbox Code Playgroud) 我认为这个问题与:Spark DataFrame:计算每列的不同值
所以基本上我有一个 Spark 数据框,列A的值为1,1,2,2,1
1所以我想计算每个不同值(在本例中为 和2)在 列 中出现的次数A,并打印类似的内容
distinct_values | number_of_apperance
1 | 3
2 | 2
Run Code Online (Sandbox Code Playgroud) 我有 2 个数据框,需要根据列(员工代码)合并它们。请注意,数据框大约有 75 列,因此我提供了一个示例数据集来获取一些建议/示例解决方案。我正在使用 databricks,数据集是从 S3 读取的。
以下是我的 2 个数据框:
DATAFRAME - 1
|-----------------------------------------------------------------------------------|
|EMP_CODE |COLUMN1|COLUMN2|COLUMN3|COLUMN4|COLUMN5|COLUMN6|COLUMN7|COLUMN8|COLUMN9|
|-----------------------------------------------------------------------------------|
|A10001 | B | | | | | | | | |
|-----------------------------------------------------------------------------------|
DATAFRAME - 2
|-----------------------------------------------------------------------------------|
|EMP_CODE |COLUMN1|COLUMN2|COLUMN3|COLUMN4|COLUMN5|COLUMN6|COLUMN7|COLUMN8|COLUMN9|
|-----------------------------------------------------------------------------------|
|A10001 | | | | | C | | | | |
|B10001 | | | | | | | | |T2 |
|A10001 | | | | | | | | B | |
|A10001 | | | C | | …Run Code Online (Sandbox Code Playgroud) 我正在研究pyspark。我想像这样更改列类型:
df1=df.select(df.Date.cast('double'),df.Time.cast('double'),
df.NetValue.cast('double'),df.Units.cast('double'))
Run Code Online (Sandbox Code Playgroud)
您可以看到 df 是一个数据框,我选择了 4 列并将它们全部更改为 double。由于使用 select,所有其他列都将被忽略。
但是,如果 df 有数百列,我只需要更改这 4 列。我需要保留所有列。那么,该怎么做呢?
我正在尝试限制火花应用程序尝试。作业失败一次后,会以yarn client模式重新提交。
我正在使用 Azure 数据工厂中的 HDInsight 活动。如果参数是从 ADF 传递的,则仅限一次尝试。
val conf: SparkConf = new SparkConf()
conf.set("spark.yarn.maxAppAttempts","5")
conf.set("yarn.resourcemanager.am.max-attempts","5")
val sc = SparkSession.builder
.master("yarn")
.config(conf)
.appName("test")
.enableHiveSupport()
//.config("yarn.resourcemanager.am.max-attempts","1")
//.config("spark.yarn.maxAppAttempts","1")
.getOrCreate() ##
sc.conf.set("spark.yarn.maxAppAttempts","1")
Run Code Online (Sandbox Code Playgroud)
从控制台打印参数显示 (spark.yarn.maxAppAttempts,1) (yarn.resourcemanager.am.max-attempts,1)
在 Spark DataFrame 中,您可以使用其名称来寻址架构中的列值,例如df['personId']- 但这种方式不适用于 Glue 的 DynamicFrame。是否有类似的方法,无需将 DynamicFrame 转换为 DataFrame,即可按名称直接访问列值?
pct_<original_name>_valid我想计算每个输入列的非缺失值的百分比。在此示例中只有 2 列,因此可以轻松手动编写下面的代码。但是当有 30 多个列时,我不想手动执行此操作。甚至可以动态地执行此操作吗?(例如,将列名称列表作为输入)
import pyspark.sql.functions as F
d = [{'name': 'Alice', 'age': 1}, {'name': 'Bae', 'age': None}]
df = spark.createDataFrame(d)
df.withColumn('name_valid', F.when(col("name").isNotNull(),1).otherwise(0))\
.withColumn('age_valid', F.when(col("age").isNotNull(),1).otherwise(0))\
.agg(
(100.0*F.sum(col("name_valid"))/F.count(F.lit(1))).alias("pct_name_valid"),
(100.0*F.sum(col("age_valid"))/F.count(F.lit(1))).alias("pct_age_valid")
)\
.show()
Run Code Online (Sandbox Code Playgroud)
结果如下:
+--------------+-------------+
|pct_name_valid|pct_age_valid|
+--------------+-------------+
| 100.0| 50.0|
+--------------+-------------+
Run Code Online (Sandbox Code Playgroud)
如前所述,我不想对所有 30 多个列手动执行此操作。有什么办法我可以这样做:
my_output = calculate_non_missing_percentage(df, my_columns = ["name", "age", "gender", "school", "color"])
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用开源框架进行 ETL,我听说过两种东西 Apache Beam 和 Apache Airflow,其中一种最适合整个 ETL 或 ELT,例如 Talend、Azure 数据工厂等,事实上,我尝试使用云数据仓库(redshift、azure 数据仓库、雪花等)完成所有工作,哪一个适合此类工作,如果我对这两个框架进行一些比较,那就太好了。提前致谢。
databricks 和 Spark 新手,我尝试运行以下命令并遇到此错误
spark.databricks.delta.retentionDurationCheck.enabled= "false"
Run Code Online (Sandbox Code Playgroud)
错误:'SparkSession' object has no attribute 'databricks'
我试图将数据框中的所有值增加 1,除了 ID 列之外。
例子:
结果:
这是我到目前为止所拥有的,但是当我有很多列要做时(例如 50),它会变得有点长。
df_add = df.select(
'Id',
(df['col_a'] + 1).alias('col_a'),
..
..
)
Run Code Online (Sandbox Code Playgroud)
有没有更Pythonic的方法来达到相同的结果?
pyspark ×10
apache-spark ×4
python ×4
databricks ×2
airflow ×1
apache-beam ×1
aws-glue ×1
casting ×1
delta-lake ×1
etl ×1
python-3.x ×1
select ×1
types ×1