我正在尝试根据某些数据手动创建一个 pyspark 数据框:
row_in=[(1566429545575348),(40.353977),(-111.701859)]
rdd=sc.parallelize(row_in)
schema = StructType([StructField("time_epocs", DecimalType(), True),StructField("lat", DecimalType(),True),StructField("long", DecimalType(),True)])
df_in_test=spark.createDataFrame(rdd,schema)
Run Code Online (Sandbox Code Playgroud)
当我尝试显示数据框时,这会出错,因此我不确定如何执行此操作。
但是,Spark 文档在这里对我来说似乎有点复杂,当我尝试按照这些说明进行操作时,我遇到了类似的错误。
有谁知道如何做到这一点?
我无法从此处提供的 Pyspark 文档中复制 Spark 代码。
例如,当我尝试使用以下代码时Grouped Map:
import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession
spark.stop()
spark = SparkSession.builder.appName("New_App_grouped_map").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").apply(subtract_mean).show()
Run Code Online (Sandbox Code Playgroud)
我收到以下错误日志。
主要错误:
ERROR ArrowPythonRunner: Python worker exited unexpectedly (crashed)
Run Code Online (Sandbox Code Playgroud)
Caused by: java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.Direct …Run Code Online (Sandbox Code Playgroud) 我有两个数据框df1,df2并且想在称为的高基数字段上多次连接这些表visitor_id。我只想执行一次初始改组,并进行所有联接,而无需在Spark执行程序之间改组/交换数据。
为此,我创建了另一个列visitor_partition,该列为每个visitor_id始终分配一个介于之间的随机值[0, 1000)。我使用了一个自定义分区程序来确保对df1和df2进行精确分区,以使每个分区仅包含来自的一个值的行visitor_partition。最初的重新分区是我唯一想改组数据的时间。
我已将每个数据帧保存到s3中的镶木地板中,并按访问者分区进行分区-对于每个数据帧,这将创建以df1/visitor_partition=0,df1/visitor_partition=1...形式组织的1000个文件df1/visitor_partition=999。
现在,我从镶木地板中加载每个数据帧,并通过df1.createOrReplaceTempView('df1')(与df2相同)将它们注册为tempview ,然后运行以下查询
SELECT
...
FROM
df1 FULL JOIN df1 ON
df1.visitor_partition = df2.visitor_partition AND
df1.visitor_id = df2.visitor_id
Run Code Online (Sandbox Code Playgroud)
从理论上讲,查询执行计划者应该意识到这里不需要进行改组。例如,单个执行程序可以从中加载数据df1/visitor_partition=1并df2/visitor_partition=2在其中联接行。但是,在实践中,spark 2.4.4的查询计划程序会在此处执行完整的数据重排。
有什么办法可以防止这种洗牌的发生?
join apache-spark apache-spark-sql pyspark pyspark-dataframes
我是 Spark 世界的新手,我想在 Pyspark 中计算一个带有整数模的额外列。我没有在内置运算符中找到这个运算符。
有谁有想法吗?
我能够在 Azure Databricks 中使用 PySpark 执行简单的 SQL 语句,但我想改为执行存储过程。下面是我试过的 PySpark 代码。
#initialize pyspark
import findspark
findspark.init('C:\Spark\spark-2.4.5-bin-hadoop2.7')
#import required modules
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import *
import pandas as pd
#Create spark configuration object
conf = SparkConf()
conf.setMaster("local").setAppName("My app")
#Create spark context and sparksession
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
table = "dbo.test"
#read table data into a spark dataframe
jdbcDF = spark.read.format("jdbc") \
.option("url", f"jdbc:sqlserver://localhost:1433;databaseName=Demo;integratedSecurity=true;") \
.option("dbtable", table) \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.load()
#show …Run Code Online (Sandbox Code Playgroud) 我有一个类似于下面的数据框。我最初用 -1 填充所有空值以在 Pyspark 中进行连接。
df = pd.DataFrame({'Number': ['1', '2', '-1', '-1'],
'Letter': ['A', '-1', 'B', 'A'],
'Value': [30, 30, 30, -1]})
pyspark_df = spark.createDataFrame(df)
+------+------+-----+
|Number|Letter|Value|
+------+------+-----+
| 1| A| 30|
| 2| -1| 30|
| -1| B| 30|
| -1| A| -1|
+------+------+-----+
Run Code Online (Sandbox Code Playgroud)
处理完数据集后,我需要将所有 -1 替换回空值。
+------+------+-----+
|Number|Letter|Value|
+------+------+-----+
| 1| A| 30|
| 2| null| 30|
| null| B| 30|
| null| A| null|
+------+------+-----+
Run Code Online (Sandbox Code Playgroud)
什么是最简单的方法来做到这一点?
我只是想知道EnumTypePySpark/Spark 中是否有一个。
我想在StringTypes (或其他类型)上添加约束以仅在 myDataFrame的架构中具有某些值。
我使用Apache spark作为ETL工具将表从Oracle提取到Elasticsearch中。
我遇到的问题是,数值列引发识别它们,decimal而Elasticsearch不接受decimal类型。所以我将每个decimal列转换double为Elasticsearch接受的列。
dataFrame = dataFrame.select(
[col(name) if 'decimal' not in colType else col(name).cast('double') for name, colType in dataFrame.dtypes]
)
Run Code Online (Sandbox Code Playgroud)
当前的问题是每个数字列将是一倍 ; 它是否具有十进制值。
我的问题是,有什么方法可以检测到列类型应该转换为整数类型还是双精度类型?
我正在尝试将数据框中的列值列表提取到列表中
+------+----------+------------+
|sno_id|updt_dt |process_flag|
+------+----------+------------+
| 123 |01-01-2020| Y |
+------+----------+------------+
| 234 |01-01-2020| Y |
+------+----------+------------+
| 512 |01-01-2020| Y |
+------+----------+------------+
| 111 |01-01-2020| Y |
+------+----------+------------+
Run Code Online (Sandbox Code Playgroud)
输出应该是 sno_id ['123','234','512','111'] 然后我需要迭代列表以对每个列表值运行一些逻辑。我目前正在使用 HiveWarehouseSession 通过使用 hive.executeQuery(query) 从 hive 表中获取数据到 Dataframe
感谢你的帮助。
我有一个如下所示的数据集:
我按年龄分组,平均每个年龄的朋友数量
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F
def parseInput(line):
fields = line.split(',')
return Row(age = int(fields[2]), numFriends = int(fields[3]))
spark = SparkSession.builder.appName("FriendsByAge").getOrCreate()
lines = spark.sparkContext.textFile("data/fakefriends.csv")
friends = lines.map(parseInput)
friendDataset = spark.createDataFrame(friends)
counts = friendDataset.groupBy("age").count()
total = friendDataset.groupBy("age").sum('numFriends')
res = total.join(counts, "age").withColumn("Friend By Age", (F.col("sum(numFriends)") // F.col("count"))).drop('sum(numFriends)','count')
Run Code Online (Sandbox Code Playgroud)
我得到以下错误:
TypeError: unsupported operand type(s) for //: 'Column' and 'Column'
Run Code Online (Sandbox Code Playgroud)
通常,我在 Python 3.0+ 中使用//并像我在这里预期的那样返回一个整数值,但是,在 PySpark 数据报中, // 不起作用,只有 / 起作用。有什么理由不工作吗?我们必须使用round函数来获取整数值吗?