我正在使用pyspark数据帧分析一些数据,假设我有一个df我正在聚合的数据帧:
df.groupBy("group")\
.agg({"money":"sum"})\
.show(100)
Run Code Online (Sandbox Code Playgroud)
这会给我:
group SUM(money#2L)
A 137461285853
B 172185566943
C 271179590646
Run Code Online (Sandbox Code Playgroud)
聚合工作正常,但我不喜欢新的列名"SUM(钱#2L)".有没有一种巧妙的方法可以将此列重命名为人类可读的.agg方法?也许更类似于人们会做的事情dplyr:
df %>% group_by(group) %>% summarise(sum_money = sum(money))
Run Code Online (Sandbox Code Playgroud) 如何RDD使用分布式方法,IPython和Spark 找到整数的中位数?的RDD是约700 000元,因此过大,以收集和发现中位数.
这个问题与这个问题类似.但是,问题的答案是使用Scala,我不知道.
使用Scala答案的思考,我试图在Python中编写类似的答案.
我知道我首先要排序RDD.我不知道怎么.我看到sortBy(按给定的方式对此RDD进行排序keyfunc)和sortByKey(对此进行排序RDD,假设它由(键,值)对组成.)方法.我认为两者都使用键值,而我RDD只有整数元素.
myrdd.sortBy(lambda x: x)?rdd.count())的长度.编辑:
我有个主意.也许我可以索引我的RDD然后key = index和value = element.然后我可以尝试按价值排序?我不知道这是否可行,因为只有一种sortByKey方法.
我正试图找到spark.worker.dir当前的道路sparkcontext.
如果我明确地将其设置为a config param,我可以将其读回来SparkConf,但无论如何都要config使用PySpark?来访问完整的(包括所有默认值)?
请考虑以下代码段(假设spark已设置为某些代码段SparkSession):
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
Run Code Online (Sandbox Code Playgroud)
请注意,temperature字段是浮动列表.我想将这些浮点数列表转换为MLlib类型Vector,我希望使用基本DataFrameAPI 表示这种转换,而不是通过RDD表达(这是低效的,因为它将所有数据从JVM发送到Python,处理在Python中完成,我们没有得到Spark的Catalyst优化器,yada yada的好处.我该怎么做呢?特别:
这就是我期望的"正确"解决方案.我想将列的类型从一种类型转换为另一种类型,所以我应该使用强制转换.作为一个上下文,让我提醒您将其转换为另一种类型的正常方法:
from pyspark.sql import types
df_with_strings = df.select(
df["city"],
df["temperatures"].cast(types.ArrayType(types.StringType()))),
)
Run Code Online (Sandbox Code Playgroud)
现在例如df_with_strings.collect()[0]["temperatures"][1]是'-7.0'.但是如果我施放到ml Vector那么事情就不那么顺利了:
from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))
Run Code Online (Sandbox Code Playgroud)
这给出了一个错误:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type …Run Code Online (Sandbox Code Playgroud) python apache-spark apache-spark-sql pyspark apache-spark-ml
假设我有一个相当大的数据集,形式如下:
data = sc.parallelize([('Foo',41,'US',3),
('Foo',39,'UK',1),
('Bar',57,'CA',2),
('Bar',72,'CA',2),
('Baz',22,'US',6),
('Baz',36,'US',6)])
Run Code Online (Sandbox Code Playgroud)
我想要做的是仅根据第一,第三和第四列的值删除重复的行.
删除完全重复的行很简单:
data = data.distinct()
Run Code Online (Sandbox Code Playgroud)
第5行或第6行将被删除
但是,我如何仅删除基于第1,3和4列的重复行?即删除以下任何一个:
('Baz',22,'US',6)
('Baz',36,'US',6)
Run Code Online (Sandbox Code Playgroud)
在Python中,这可以通过使用指定列来完成.drop_duplicates().我怎样才能在Spark/Pyspark中实现同样的目标?
我试图找出PySpark中DataFrame的大小/形状.我没有看到一个可以做到这一点的功能.
在Python中,我可以做到
data.shape()
Run Code Online (Sandbox Code Playgroud)
PySpark中是否有类似的功能.这是我目前的解决方案,但我正在寻找一个元素
row_number = data.count()
column_number = len(data.dtypes)
Run Code Online (Sandbox Code Playgroud)
列数的计算并不理想......
我在 AWS EMR 上使用 pyspark(4 个 r5.xlarge 作为 4 个工作线程,每个工作线程有 1 个执行程序和 4 个核心),并且我得到了AttributeError: Can't get attribute 'new_block' on <module 'pandas.core.internals.blocks'. 下面是引发此错误的代码片段:
search = SearchEngine(db_file_dir = "/tmp/db")
conn = sqlite3.connect("/tmp/db/simple_db.sqlite")
pdf_ = pd.read_sql_query('''select zipcode, lat, lng,
bounds_west, bounds_east, bounds_north, bounds_south from
simple_zipcode''',conn)
brd_pdf = spark.sparkContext.broadcast(pdf_)
conn.close()
@udf('string')
def get_zip_b(lat, lng):
pdf = brd_pdf.value
out = pdf[(np.array(pdf["bounds_north"]) >= lat) &
(np.array(pdf["bounds_south"]) <= lat) &
(np.array(pdf['bounds_west']) <= lng) &
(np.array(pdf['bounds_east']) >= lng) ]
if len(out):
min_index = np.argmin( (np.array(out["lat"]) - …Run Code Online (Sandbox Code Playgroud) 我正在试图找出在Spark数据帧列中获得最大值的最佳方法.
请考虑以下示例:
df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
Run Code Online (Sandbox Code Playgroud)
这创造了:
+---+---+
| A| B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+
Run Code Online (Sandbox Code Playgroud)
我的目标是找到A列中的最大值(通过检查,这是3.0).使用PySpark,我可以想到以下四种方法:
# Method 1: Use describe()
float(df.describe("A").filter("summary = 'max'").select("A").first().asDict()['A'])
# Method 2: Use SQL
df.registerTempTable("df_table")
spark.sql("SELECT MAX(A) as maxval FROM df_table").first().asDict()['maxval']
# Method 3: Use groupby()
df.groupby().max('A').first().asDict()['max(A)']
# Method 4: Convert to RDD
df.select("A").rdd.max()[0]
Run Code Online (Sandbox Code Playgroud)
上面的每一个都给出了正确的答案,但在没有Spark分析工具的情况下,我无法分辨哪个是最好的.
任何关于上述哪种方法在Spark运行时或资源使用方面最有效的直觉或经验主义的想法,或者是否有比上述方法更直接的方法?
假设我有一个火花数据帧df1,有几列(其中列'id')和数据帧df2有两列,'id'和'other'.
有没有办法复制以下命令
sqlContext.sql("SELECT df1.*, df2.other FROM df1 JOIN df2 ON df1.id = df2.id")
Run Code Online (Sandbox Code Playgroud)
通过仅使用诸如join(),select()之类的pyspark函数?
我必须在函数中实现此连接,并且我不希望强制将sqlContext作为函数参数.
谢谢!