标签: pyspark

重命名pyspark数据帧聚合的列

我正在使用pyspark数据帧分析一些数据,假设我有一个df我正在聚合的数据帧:

df.groupBy("group")\
  .agg({"money":"sum"})\
  .show(100)
Run Code Online (Sandbox Code Playgroud)

这会给我:

group                SUM(money#2L)
A                    137461285853
B                    172185566943
C                    271179590646
Run Code Online (Sandbox Code Playgroud)

聚合工作正常,但我不喜欢新的列名"SUM(钱#2L)".有没有一种巧妙的方法可以将此列重命名为人类可读的.agg方法?也许更类似于人们会做的事情dplyr:

df %>% group_by(group) %>% summarise(sum_money = sum(money))
Run Code Online (Sandbox Code Playgroud)

dataframe pyspark

56
推荐指数
5
解决办法
5万
查看次数

如何使用Spark查找中值和分位数

如何RDD使用分布式方法,IPython和Spark 找到整数的中位数?的RDD是约700 000元,因此过大,以收集和发现中位数.

这个问题与这个问题类似.但是,问题的答案是使用Scala,我不知道.

如何使用Apache Spark计算精确中位数?

使用Scala答案的思考,我试图在Python中编写类似的答案.

我知道我首先要排序RDD.我不知道怎么.我看到sortBy(按给定的方式对此RDD进行排序keyfunc)和sortByKey(对此进行排序RDD,假设它由(键,值)对组成.)方法.我认为两者都使用键值,而我RDD只有整数元素.

  1. 首先,我在考虑做什么myrdd.sortBy(lambda x: x)
  2. 接下来我将找到rdd(rdd.count())的长度.
  3. 最后,我想在rdd的中心找到元素或2个元素.我也需要这个方法的帮助.

编辑:

我有个主意.也许我可以索引我的RDD然后key = index和value = element.然后我可以尝试按价值排序?我不知道这是否可行,因为只有一种sortByKey方法.

python median apache-spark rdd pyspark

55
推荐指数
3
解决办法
6万
查看次数

如何制作良好的可重现的Apache Spark示例

我花了相当多的时间阅读标签的一些问题,而且我经常发现海报没有提供足够的信息来真正理解他们的问题.我经常评论要求他们发布MCVE,但有时让他们显示一些样本输入/输出数据就像拔牙一样.例如:请参阅有关此问题的评论.

也许问题的一部分是人们只是不知道如何轻松地为火花数据帧创建MCVE.我认为将这个pandas问题的spark-dataframe版本作为可以链接的指南是有用的.

那么如何创造一个好的,可重复的例子呢?

dataframe apache-spark apache-spark-sql pyspark pyspark-sql

55
推荐指数
4
解决办法
3952
查看次数

是否有可能在PySpark中获取当前的spark上下文设置?

我正试图找到spark.worker.dir当前的道路sparkcontext.

如果我明确地将其设置为a config param,我可以将其读回来SparkConf,但无论如何都要config使用PySpark?来访问完整的(包括所有默认值)?

config apache-spark pyspark

54
推荐指数
9
解决办法
7万
查看次数

如何将数组(即列表)列转换为Vector

问题的简短版本!

请考虑以下代码段(假设spark已设置为某些代码段SparkSession):

from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)
Run Code Online (Sandbox Code Playgroud)

请注意,temperature字段是浮动列表.我想将这些浮点数列表转换为MLlib类型Vector,我希望使用基本DataFrameAPI 表示这种转换,而不是通过RDD表达(这是低效的,因为它将所有数据从JVM发送到Python,处理在Python中完成,我们没有得到Spark的Catalyst优化器,yada yada的好处.我该怎么做呢?特别:

  1. 有没有办法让直接演员工作?请参阅下面的详细信息(以及尝试解决方法失败)?或者,是否有其他操作具有我之后的效果?
  2. 从我在下面建议的两种替代解决方案(UDF vs爆炸/重新组合列表中的项目)中哪种更有效?或者是否有其他几乎但不是非常正确的替代品比其中任何一种更好?

直接投射不起作用

这就是我期望的"正确"解决方案.我想将列的类型从一种类型转换为另一种类型,所以我应该使用强制转换.作为一个上下文,让我提醒您将其转换为另一种类型的正常方法:

from pyspark.sql import types
df_with_strings = df.select(
    df["city"], 
    df["temperatures"].cast(types.ArrayType(types.StringType()))),
)
Run Code Online (Sandbox Code Playgroud)

现在例如df_with_strings.collect()[0]["temperatures"][1]'-7.0'.但是如果我施放到ml Vector那么事情就不那么顺利了:

from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))
Run Code Online (Sandbox Code Playgroud)

这给出了一个错误:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type …
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql pyspark apache-spark-ml

54
推荐指数
3
解决办法
2万
查看次数

根据RDD/Spark DataFrame中的特定列从行中删除重复项

假设我有一个相当大的数据集,形式如下:

data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',2),
                       ('Baz',22,'US',6),
                       ('Baz',36,'US',6)])
Run Code Online (Sandbox Code Playgroud)

我想要做的是仅根据第一,第三和第四列的值删除重复的行.

删除完全重复的行很简单:

data = data.distinct()
Run Code Online (Sandbox Code Playgroud)

第5行或第6行将被删除

但是,我如何仅删除基于第1,3和4列的重复行?即删除以下任何一个:

('Baz',22,'US',6)
('Baz',36,'US',6)
Run Code Online (Sandbox Code Playgroud)

在Python中,这可以通过使用指定列来完成.drop_duplicates().我怎样才能在Spark/Pyspark中实现同样的目标?

apache-spark apache-spark-sql pyspark

53
推荐指数
6
解决办法
9万
查看次数

PySpark 2.0 DataFrame的大小或形状

我试图找出PySpark中DataFrame的大小/形状.我没有看到一个可以做到这一点的功能.

在Python中,我可以做到

data.shape()
Run Code Online (Sandbox Code Playgroud)

PySpark中是否有类似的功能.这是我目前的解决方案,但我正在寻找一个元素

row_number = data.count()
column_number = len(data.dtypes)
Run Code Online (Sandbox Code Playgroud)

列数的计算并不理想......

size shape dataframe pyspark

53
推荐指数
5
解决办法
8万
查看次数

AttributeError:无法在&lt;模块'pandas.core.internals.blocks'&gt;上获取属性'new_block'

我在 AWS EMR 上使用 pyspark(4 个 r5.xlarge 作为 4 个工作线程,每个工作线程有 1 个执行程序和 4 个核心),并且我得到了AttributeError: Can't get attribute 'new_block' on <module 'pandas.core.internals.blocks'. 下面是引发此错误的代码片段:

search =  SearchEngine(db_file_dir = "/tmp/db")
conn = sqlite3.connect("/tmp/db/simple_db.sqlite")
pdf_ = pd.read_sql_query('''select  zipcode, lat, lng, 
                        bounds_west, bounds_east, bounds_north, bounds_south from 
                        simple_zipcode''',conn)
brd_pdf = spark.sparkContext.broadcast(pdf_) 
conn.close()


@udf('string')
def get_zip_b(lat, lng):
    pdf = brd_pdf.value 
    out = pdf[(np.array(pdf["bounds_north"]) >= lat) & 
              (np.array(pdf["bounds_south"]) <= lat) & 
              (np.array(pdf['bounds_west']) <= lng) & 
              (np.array(pdf['bounds_east']) >= lng) ]
    if len(out):
        min_index = np.argmin( (np.array(out["lat"]) - …
Run Code Online (Sandbox Code Playgroud)

python attributeerror pandas apache-spark pyspark

53
推荐指数
4
解决办法
10万
查看次数

获取Spark数据帧列中最大值的最佳方法

我正在试图找出在Spark数据帧列中获得最大值的最佳方法.

请考虑以下示例:

df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
Run Code Online (Sandbox Code Playgroud)

这创造了:

+---+---+
|  A|  B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
Run Code Online (Sandbox Code Playgroud)

我的目标是找到A列中的最大值(通过检查,这是3.0).使用PySpark,我可以想到以下四种方法:

# Method 1: Use describe()
float(df.describe("A").filter("summary = 'max'").select("A").first().asDict()['A'])

# Method 2: Use SQL
df.registerTempTable("df_table")
spark.sql("SELECT MAX(A) as maxval FROM df_table").first().asDict()['maxval']

# Method 3: Use groupby()
df.groupby().max('A').first().asDict()['max(A)']

# Method 4: Convert to RDD
df.select("A").rdd.max()[0]
Run Code Online (Sandbox Code Playgroud)

上面的每一个都给出了正确的答案,但在没有Spark分析工具的情况下,我无法分辨哪个是最好的.

任何关于上述哪种方法在Spark运行时或资源使用方面最有效的直觉或经验主义的想法,或者是否有比上述方法更直接的方法?

python apache-spark apache-spark-sql pyspark

51
推荐指数
8
解决办法
9万
查看次数

加入两个数据框,从一个列中选择所有列,从另一个列中选择一些列

假设我有一个火花数据帧df1,有几列(其中列'id')和数据帧df2有两列,'id'和'other'.

有没有办法复制以下命令

sqlContext.sql("SELECT df1.*, df2.other FROM df1 JOIN df2 ON df1.id = df2.id")
Run Code Online (Sandbox Code Playgroud)

通过仅使用诸如join(),select()之类的pyspark函数?

我必须在函数中实现此连接,并且我不希望强制将sqlContext作为函数参数.

谢谢!

pyspark pyspark-sql

50
推荐指数
6
解决办法
12万
查看次数