小编pau*_*ult的帖子

获取排序组合

我有一个输入

A = [2,0,1,3,2,2,0,1,1,2,0].
Run Code Online (Sandbox Code Playgroud)

下面我删除所有重复项

A = list(Set(A))
Run Code Online (Sandbox Code Playgroud)

A现在[0,1,2,3].现在我希望我可以使用此列表进行所有对组合,但是它们不需要是唯一的...因此[0,3]等于[3,0][2,3]等于[3,2].在这个例子中它应该返回

[[0,1],[0,2],[0,3],[1,2],[1,3],[2,3]]
Run Code Online (Sandbox Code Playgroud)

我该如何实现这一目标?我查看了iteratoolslib.但无法提出解决方案.

python combinations

5
推荐指数
1
解决办法
2302
查看次数

如何在PySpark Dataframe show中设置显示精度

调用时如何在PySpark中设置显示精度.show()

考虑以下示例:

from math import sqrt
import pyspark.sql.functions as f

data = zip(
    map(lambda x: sqrt(x), range(100, 105)),
    map(lambda x: sqrt(x), range(200, 205))
)
df = sqlCtx.createDataFrame(data, ["col1", "col2"])
df.select([f.avg(c).alias(c) for c in df.columns]).show()
Run Code Online (Sandbox Code Playgroud)

哪个输出:

#+------------------+------------------+
#|              col1|              col2|
#+------------------+------------------+
#|10.099262230352151|14.212583322380274|
#+------------------+------------------+
Run Code Online (Sandbox Code Playgroud)

如何更改它,使其仅在小数点后显示3位数字?

所需的输出:

#+------+------+
#|  col1|  col2|
#+------+------+
#|10.099|14.213|
#+------+------+
Run Code Online (Sandbox Code Playgroud)

这是此scala问题的PySpark版本。我将其发布在这里是因为在搜索PySpark解决方案时找不到答案,并且我认为它将来可能会对其他人有所帮助。

pyspark spark-dataframe

5
推荐指数
1
解决办法
4208
查看次数

pyspark中两种TimestampType的区别

使用 pyspark,我有一个包含两TimestampType列的数据框:

df.schema
...StructField(session_start,TimestampType,true),StructField(session_end,TimestampType,true)...
Run Code Online (Sandbox Code Playgroud)

但我不知道如何计算差异:

df2 = df.withColumn("session_length",col("session_end")-col("session_start"))
Run Code Online (Sandbox Code Playgroud)

给我

AnalysisException: u"cannot resolve '(`session_end` - `session_start`)' due to data type mismatch: '(`session_end` - `session_start`)' requires (numeric or calendarinterval) type, not timestamp ...
Run Code Online (Sandbox Code Playgroud)

我还没有找到可行的替代方案。(有一个datediff函数,但它以天为单位返回结果,我需要以秒为单位的差异。)

我应该怎么写这个?

编辑:这个问题的原始版本有一个不同的错误,col因为在我的笔记本中作为变量重用。在重做import以恢复功能后,我现在得到了上面的AnalysisException.

apache-spark pyspark

5
推荐指数
1
解决办法
2942
查看次数

使用 UDF 时忽略条件

假设您有以下 pyspark DataFrame:

data= [('foo',), ('123',), (None,), ('bar',)]
df = sqlCtx.createDataFrame(data, ["col"])
df.show()
#+----+
#| col|
#+----+
#| foo|
#| 123|
#|null|
#| bar|
#+----+
Run Code Online (Sandbox Code Playgroud)

接下来的两个代码块应该做同样的事情——也就是说,如果不是,则返回列的大写null。但是,第二种方法(使用 a udf)会产生错误。

方法一:使用pyspark.sql.functions.upper()

import pyspark.sql.functions as f
df.withColumn(
    'upper',
    f.when(
        f.isnull(f.col('col')),
        f.col('col')
    ).otherwise(f.upper(f.col('col')))
).show()
#+----+-----+
#| col|upper|
#+----+-----+
#| foo|  FOO|
#| 123|  123|
#|null| null|
#| bar|  BAR|
#+----+-----+
Run Code Online (Sandbox Code Playgroud)

方法 2str.upper()在 a 内部使用udf

df.withColumn(
    'upper',
    f.when(
        f.isnull(f.col('col')),
        f.col('col')
    ).otherwise(f.udf(lambda x: x.upper(), StringType())(f.col('col')))
).show()
Run Code Online (Sandbox Code Playgroud)

这给了我 …

python user-defined-functions apache-spark pyspark spark-dataframe

5
推荐指数
1
解决办法
1613
查看次数

pyspark use dataframe inside udf

I have two dataframes df1

+---+---+----------+
|  n|val| distances|
+---+---+----------+
|  1|  1|0.27308652|
|  2|  1|0.24969208|
|  3|  1|0.21314497|
+---+---+----------+
Run Code Online (Sandbox Code Playgroud)

and df2

+---+---+----------+
| x1| x2|         w|
+---+---+----------+
|  1|  2|0.03103427|
|  1|  4|0.19012526|
|  1| 10|0.26805446|
|  1|  8|0.26825935|
+---+---+----------+
Run Code Online (Sandbox Code Playgroud)

I want to add a new column to df1 called gamma, which will contain the sum of the w value from df2 when df1.n == df2.x1 OR df1.n == df2.x2

我尝试使用 udf,但显然从不同的数据帧中选择是行不通的,因为值应该在计算之前确定

gamma_udf = udf(lambda n: float(df2.filter("x1 = …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark

5
推荐指数
1
解决办法
3936
查看次数

删除所有符合正则表达式条件的行

试图自学大熊猫..并与不同的dtypes玩耍

我有一个df如下

df = pd.DataFrame({'ID':[0,2,"bike","cake"], 'Course':['Test','Math','Store','History'] })
print(df)
    ID  Course
0   0   Test
1   2   Math
2   bike    Store
3   cake    History
Run Code Online (Sandbox Code Playgroud)

ID的dtype当然是一个对象。我想做的是,如果ID中包含字符串,则删除DF中的任何行。

我以为这很简单。

df.ID.filter(regex='[\w]*')
Run Code Online (Sandbox Code Playgroud)

但这返回了一切,是否有确定的处理此类问题的方法?

python regex pandas

5
推荐指数
3
解决办法
953
查看次数

在 pyspark 数据框中的第一个序号位置添加一个新列

我有一个 pyspark 数据框,如:

+--------+-------+-------+
| col1   | col2  | col3  |
+--------+-------+-------+
|  25    |  01   |     2 |
|  23    |  12   |     5 | 
|  11    |  22   |     8 |
+--------+-------+-------+
Run Code Online (Sandbox Code Playgroud)

我想通过添加这样的新列来创建新的数据框:

+--------------+-------+-------+-------+
| new_column   | col1  | col2  | col3  |
+--------------+-------+-------+-------+
|  0           |  01   |     2 |  0    |
|  0           |  12   |     5 |  0    |
|  0           |  22   |     8 |  0    |
+--------------+-------+-------+-------+
Run Code Online (Sandbox Code Playgroud)

我知道我可以通过以下方式添加列:

df.withColumn("new_column", lit(0))
Run Code Online (Sandbox Code Playgroud)

但它最后像这样添加了列:

+--------------+-------+-------+-------------+
| …
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql pyspark

5
推荐指数
2
解决办法
6336
查看次数

PySpark DataFrame在使用爆炸之前将字符串的列更改为数组

我在spark DataFrame中有一列名为event_datajson格式,使用读取后from_json,得到以下模式:

root
 |-- user_id: string (nullable = true)
 |-- event_data: struct (nullable = true)
 |    |-- af_content_id: string (nullable = true)
 |    |-- af_currency: string (nullable = true)
 |    |-- af_order_id: long (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我只需要af_content_id此列。此属性可以具有不同的格式:

  • 一个字符串
  • 整数
  • Int和Str的列表。例如['ghhjj23','123546',12356]
  • 无(有时event_data不包含af_content_id

    我想使用explode函数以便为af_content_id格式为List的每个元素返回新行。但是当我应用它时,我得到一个错误:

    from pyspark.sql.functions import explode
    
    def get_content_id(column):
        return column.af_content_id
    
    df_transf_1 = df_transf_1.withColumn(
        "products_basket", 
        get_content_id(df_transf_1.event_data)
    )
    
    df_transf_1 = df_transf_1.withColumn(
        "product_id",
        explode(df_transf_1.products_basket)
    )
    
    Run Code Online (Sandbox Code Playgroud)

    products_basket …

  • apache-spark-sql pyspark

    5
    推荐指数
    1
    解决办法
    1432
    查看次数

    pyspark 将行转换为带有 null 的 json

    目标: 对于具有模式的数据框

    id:string
    Cold:string
    Medium:string
    Hot:string
    IsNull:string
    annual_sales_c:string
    average_check_c:string
    credit_rating_c:string
    cuisine_c:string
    dayparts_c:string
    location_name_c:string
    market_category_c:string
    market_segment_list_c:string
    menu_items_c:string
    msa_name_c:string
    name:string
    number_of_employees_c:string
    number_of_rooms_c:string
    Months In Role:integer
    Tenured Status:string
    IsCustomer:integer
    units_c:string
    years_in_business_c:string
    medium_interactions_c:string
    hot_interactions_c:string
    cold_interactions_c:string
    is_null_interactions_c:string
    
    Run Code Online (Sandbox Code Playgroud)

    我想添加一个新列,它是列的所有键和值的 JSON 字符串。我在这篇文章PySpark - Convert to JSON row by row和相关问题中使用了该方法。我的代码

    df = df.withColumn("JSON",func.to_json(func.struct([df[x] for x in small_df.columns])))
    
    Run Code Online (Sandbox Code Playgroud)

    我有一个问题:

    问题: 当任何行的列具有空值(并且我的数据有很多...)时,Json 字符串不包含键。即,如果 27 列中只有 9 列有值,那么 JSON 字符串只有 9 个键...我想要做的是维护所有键,但对于空值只需传递一个空字符串“”

    有小费吗?

    json apache-spark apache-spark-sql pyspark

    5
    推荐指数
    1
    解决办法
    5971
    查看次数

    在Spark 2.4上的pyspark.sql.functions.max().over(window)上使用.where()会抛出Java异常

    我在StackOverflow上发布了关于返回由另一列分组的列的最大值的帖子,并得到了一个意外的Java异常.

    这是测试数据:

    import pyspark.sql.functions as f
    data = [('a', 5), ('a', 8), ('a', 7), ('b', 1), ('b', 3)]
    df = spark.createDataFrame(data, ["A", "B"])
    df.show()
    
    +---+---+
    |  A|  B|
    +---+---+
    |  a|  5|
    |  a|  8|
    |  a|  7|
    |  b|  1|
    |  b|  3|
    +---+---+
    
    Run Code Online (Sandbox Code Playgroud)

    以下是据称适用于其他用户的解决方案:

    from pyspark.sql import Window
    w = Window.partitionBy('A')
    df.withColumn('maxB', f.max('B').over(w))\
        .where(f.col('B') == f.col('maxB'))\
        .drop('maxB').show()
    
    Run Code Online (Sandbox Code Playgroud)

    哪个应该产生这个输出:

    #+---+---+
    #|  A|  B|
    #+---+---+
    #|  a|  8|
    #|  b|  3|
    #+---+---+
    
    Run Code Online (Sandbox Code Playgroud)

    相反,我得到:

    java.lang.UnsupportedOperationException: Cannot evaluate expression: max(input[2, …
    Run Code Online (Sandbox Code Playgroud)

    exception apache-spark apache-spark-sql pyspark

    5
    推荐指数
    1
    解决办法
    266
    查看次数