标签: pyspark-dataframes

手动创建一个 pyspark 数据框

我正在尝试根据某些数据手动创建一个 pyspark 数据框:

row_in=[(1566429545575348),(40.353977),(-111.701859)]
rdd=sc.parallelize(row_in)
schema = StructType([StructField("time_epocs", DecimalType(),    True),StructField("lat", DecimalType(),True),StructField("long", DecimalType(),True)])
df_in_test=spark.createDataFrame(rdd,schema)
Run Code Online (Sandbox Code Playgroud)

当我尝试显示数据框时,这会出错,因此我不确定如何执行此操作。

但是,Spark 文档在这里对我来说似乎有点复杂,当我尝试按照这些说明进行操作时,我遇到了类似的错误。

有谁知道如何做到这一点?

pyspark pyspark-dataframes

21
推荐指数
3
解决办法
7万
查看次数

错误:Pyspark pandas_udf 文档代码的“java.lang.UnsupportedOperationException”

我无法从此处提供的 Pyspark 文档中复制 Spark 代码

例如,当我尝试使用以下代码时Grouped Map

import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession

spark.stop()

spark = SparkSession.builder.appName("New_App_grouped_map").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))


@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()
Run Code Online (Sandbox Code Playgroud)

我收到以下错误日志。

主要错误:

ERROR ArrowPythonRunner: Python worker exited unexpectedly (crashed)
Run Code Online (Sandbox Code Playgroud)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.Direct …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark pyspark-dataframes

10
推荐指数
2
解决办法
2971
查看次数

Spark:加入两个相同分区的数据帧时,防止混洗/交换

我有两个数据框df1df2并且想在称为的高基数字段上多次连接这些表visitor_id。我只想执行一次初始改组,并进行所有联接,而无需在Spark执行程序之间改组/交换数据。

为此,我创建了另一个列visitor_partition,该列为每个visitor_id始终分配一个介于之间的随机值[0, 1000)。我使用了一个自定义分区程序来确保对df1df2进行精确分区,以使每个分区仅包含来自的一个值的行visitor_partition。最初的重新分区是我唯一想改组数据的时间。

我已将每个数据帧保存到s3中的镶木地板中,并按访问者分区进行分区-对于每个数据帧,这将创建以df1/visitor_partition=0df1/visitor_partition=1...形式组织的1000个文件df1/visitor_partition=999

现在,我从镶木地板中加载每个数据帧,并通过df1.createOrReplaceTempView('df1')(与df2相同)将它们注册为tempview ,然后运行以下查询

SELECT
   ...
FROM
  df1 FULL JOIN df1 ON
    df1.visitor_partition = df2.visitor_partition AND
    df1.visitor_id = df2.visitor_id
Run Code Online (Sandbox Code Playgroud)

从理论上讲,查询执行计划者应该意识到这里不需要进行改组。例如,单个执行程序可以从中加载数据df1/visitor_partition=1df2/visitor_partition=2在其中联接行。但是,在实践中,spark 2.4.4的查询计划程序会在此处执行完整的数据重排。

有什么办法可以防止这种洗牌的发生?

join apache-spark apache-spark-sql pyspark pyspark-dataframes

6
推荐指数
1
解决办法
83
查看次数

如何在 Pyspark 中计算模数?

我是 Spark 世界的新手,我想在 Pyspark 中计算一个带有整数模的额外列。我没有在内置运算符中找到这个运算符。

有谁有想法吗?

apache-spark apache-spark-sql pyspark pyspark-dataframes

6
推荐指数
1
解决办法
6635
查看次数

如何在 Azure Databricks PySpark 中执行存储过程?

我能够在 Azure Databricks 中使用 PySpark 执行简单的 SQL 语句,但我想改为执行存储过程。下面是我试过的 PySpark 代码。

#initialize pyspark
import findspark
findspark.init('C:\Spark\spark-2.4.5-bin-hadoop2.7')
#import required modules
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import *
import pandas as pd

#Create spark configuration object
conf = SparkConf()
conf.setMaster("local").setAppName("My app")
#Create spark context and sparksession
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

table = "dbo.test"
#read table data into a spark dataframe
jdbcDF = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://localhost:1433;databaseName=Demo;integratedSecurity=true;") \
    .option("dbtable", table) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

#show …
Run Code Online (Sandbox Code Playgroud)

python pyspark-sql azure-databricks pyspark-dataframes

5
推荐指数
2
解决办法
7755
查看次数

Pyspark:在数据框中用 null 替换所有出现的值

我有一个类似于下面的数据框。我最初用 -1 填充所有空值以在 Pyspark 中进行连接。

df = pd.DataFrame({'Number': ['1', '2', '-1', '-1'],
                   'Letter': ['A', '-1', 'B', 'A'],
                   'Value': [30, 30, 30, -1]})


pyspark_df = spark.createDataFrame(df)

+------+------+-----+
|Number|Letter|Value|
+------+------+-----+
|     1|     A|   30|
|     2|    -1|   30|
|    -1|     B|   30|
|    -1|     A|   -1|
+------+------+-----+
Run Code Online (Sandbox Code Playgroud)

处理完数据集后,我需要将所有 -1 替换回空值。

+------+------+-----+
|Number|Letter|Value|
+------+------+-----+
|     1|     A|   30|
|     2|  null|   30|
|  null|     B|   30|
|  null|     A| null|
+------+------+-----+
Run Code Online (Sandbox Code Playgroud)

什么是最简单的方法来做到这一点?

apache-spark apache-spark-sql pyspark pyspark-dataframes

5
推荐指数
2
解决办法
335
查看次数

PySpark 中有 Enum 类型吗?

我只是想知道EnumTypePySpark/Spark 中是否有一个。
我想在StringTypes (或其他类型)上添加约束以仅在 myDataFrame的架构中具有某些值。

apache-spark apache-spark-sql pyspark pyspark-dataframes

5
推荐指数
0
解决办法
216
查看次数

如何检测小数点应转换为整数还是双精度?

我使用Apache spark作为ETL工具将表从Oracle提取到Elasticsearch中

我遇到的问题是,数值列引发识别它们,decimalElasticsearch不接受decimal类型。所以我将每个decimal列转换doubleElasticsearch接受的列。

dataFrame = dataFrame.select(
    [col(name) if 'decimal' not in colType else col(name).cast('double') for name, colType in dataFrame.dtypes]
)
Run Code Online (Sandbox Code Playgroud)

当前的问题是每个数字列将是一倍 ; 它是否具有十进制值。

我的问题是,有什么方法可以检测到列类型应该转换为整数类型还是双精度类型?

python elasticsearch pyspark pyspark-sql pyspark-dataframes

4
推荐指数
2
解决办法
80
查看次数

要列出的 Pyspark 数据框列

我正在尝试将数据框中的列值列表提取到列表中

+------+----------+------------+
|sno_id|updt_dt   |process_flag|
+------+----------+------------+
| 123  |01-01-2020|     Y      |
+------+----------+------------+
| 234  |01-01-2020|     Y      |
+------+----------+------------+
| 512  |01-01-2020|     Y      |
+------+----------+------------+
| 111  |01-01-2020|     Y      |
+------+----------+------------+
Run Code Online (Sandbox Code Playgroud)

输出应该是 sno_id ['123','234','512','111'] 然后我需要迭代列表以对每个列表值运行一些逻辑。我目前正在使用 HiveWarehouseSession 通过使用 hive.executeQuery(query) 从 hive 表中获取数据到 Dataframe

感谢你的帮助。

pyspark pyspark-dataframes

4
推荐指数
1
解决办法
1万
查看次数

PySpark DataFrame Floor Division 不支持的操作数类型

我有一个如下所示的数据集:

在此处输入图片说明

我按年龄分组,平均每个年龄的朋友数量

from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F

def parseInput(line):
    fields = line.split(',')
    return Row(age = int(fields[2]), numFriends = int(fields[3]))

spark = SparkSession.builder.appName("FriendsByAge").getOrCreate()
lines = spark.sparkContext.textFile("data/fakefriends.csv")
friends = lines.map(parseInput)
friendDataset = spark.createDataFrame(friends)
counts = friendDataset.groupBy("age").count()
total = friendDataset.groupBy("age").sum('numFriends')
res = total.join(counts, "age").withColumn("Friend By Age", (F.col("sum(numFriends)") // F.col("count"))).drop('sum(numFriends)','count')
Run Code Online (Sandbox Code Playgroud)

我得到以下错误:

TypeError: unsupported operand type(s) for //: 'Column' and 'Column'
Run Code Online (Sandbox Code Playgroud)

通常,我在 Python 3.0+ 中使用//并像我在这里预期的那样返回一个整数值,但是,在 PySpark 数据报中, // 不起作用,只有 / 起作用。有什么理由不工作吗?我们必须使用round函数来获取整数值吗?

pyspark pyspark-dataframes

3
推荐指数
1
解决办法
2516
查看次数