我想使用 JDBC 从 Postgresql 读取数据并将其存储在 pyspark dataframe 中。当我想使用 df.show()、df.take() 等方法预览数据框中的数据时,它们返回一个错误,指出原因是:java.lang.ClassNotFoundException: org.postgresql.Driver。但是 df.printschema() 会完美地返回数据库表的信息。这是我的代码:
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.master("spark://spark-master:7077")
.appName("read-postgres-jdbc")
.config("spark.driver.extraClassPath", "/opt/workspace/postgresql-42.2.18.jar")
.config("spark.executor.memory", "1g")
.getOrCreate()
)
sc = spark.sparkContext
df = (
spark.read.format("jdbc")
.option("driver", "org.postgresql.Driver")
.option("url", "jdbc:postgresql://postgres/postgres")
.option("table", 'public."ASSET_DATA"')
.option("dbtable", _select_sql)
.option("user", "airflow")
.option("password", "airflow")
.load()
)
df.show(1)
Run Code Online (Sandbox Code Playgroud)
错误日志:
Py4JJavaError: An error occurred while calling o44.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 …Run Code Online (Sandbox Code Playgroud) 我有这个数据帧path_df:
path_df.show()
+---------------+-------------+----+
|FromComponentID|ToComponentID|Cost|
+---------------+-------------+----+
| 160| 163|27.0|
| 160| 183|27.0|
| 161| 162|22.0|
| 161| 170|31.0|
| 162| 161|22.0|
| 162| 167|24.0|
| 163| 160|27.0|
| 163| 164|27.0|
| 164| 163|27.0|
| 164| 165|35.0|
| 165| 164|35.0|
| 165| 166|33.0|
| 166| 165|33.0|
| 166| 167|31.0|
| 167| 162|24.0|
| 167| 166|31.0|
| 167| 168|27.0|
| 168| 167|27.0|
| 168| 169|23.0|
| 169| 168|23.0|
+---------------+-------------+----+
only showing top 20 rows
Run Code Online (Sandbox Code Playgroud)
从此,我想制作一个词典,如下:
{FromComponentID:{ToComponentID:Cost}}
对于我目前的数据,它将是:
{160 : {163 : 27,
183 …Run Code Online (Sandbox Code Playgroud) 我正在尝试将 Spark 数据帧的所有列更改为双精度类型,但我想知道是否有比循环列和转换更好的方法。
我有一个数据框 df:
+------+----------+--------------------+
|SiteID| LastRecID| Col_to_split|
+------+----------+--------------------+
| 2|1056962584|[214, 207, 206, 205]|
| 2|1056967423| [213, 208]|
| 2|1056870114| [213, 202, 199]|
| 2|1056876861|[203, 213, 212, 1...|
Run Code Online (Sandbox Code Playgroud)
我想将列分成这样的行:
+----------+-------------+-------------+
| RecID| index| Value|
+----------+-------------+-------------+
|1056962584| 0| 214|
|1056962584| 1| 207|
|1056962584| 2| 206|
|1056962584| 3| 205|
|1056967423| 0| 213|
|1056967423| 1| 208|
|1056870114| 0| 213|
|1056870114| 1| 202|
|1056870114| 2| 199|
|1056876861| 0| 203|
|1056876861| 1| 213|
|1056876861| 2| 212|
|1056876861| 3| 1..|
|1056876861| etc...| etc...|
Run Code Online (Sandbox Code Playgroud)
值包含列表中的值。Index 包含列表中值的索引。
我怎样才能使用 PySpark 做到这一点?
我需要从 pyspark 数据框中的字符串列中删除正则表达式
df = spark.createDataFrame(
[
("Dog 10H03", "10H03"),
("Cat 09H24 eats rat", "09H24"),
("Mouse 09H45 runs away", "09H45"),
("Mouse 09H45 enters room", "09H45"),
],
["Animal", "Time"],
)
Run Code Online (Sandbox Code Playgroud)
时间戳(例如10H03)是必须删除的正则表达式。
+--------------------+------------------+-----+
| Animal| Animal_strip_time| Time|
+--------------------+------------------+-----+
| Dog 10H03| Dog |10H03|
| Cat 09H24 eats rat| Cat eats rat|09H24|
|Mouse 09H45 runs ...| Mouse runs away|09H45|
|Mouse 09H45 enter...|Mouse enters room|09H45|
+--------------------+------------------+-----+
Run Code Online (Sandbox Code Playgroud)
该列中的时间戳Time可能与该列中的时间戳不同Animal。因此,它不能用于匹配字符串。
正则表达式应遵循 XXHXX 模式,其中 X 是 0-9 之间的数字
已使用 databricks 中的 pyspark 在 Azure Blob 存储中创建了按日期分区的 Parquet 文件,但在一个日期文件夹中收到了如此多的文件,例如 500 个文件。我需要使用 PySpark 减少文件数量,例如一个日期文件夹中的 10 或 15 个文件。
df.write.format("parquet").mode("overwrite").partitionBy("Date").save(
"/mnt/mydata.parquet"
)
Run Code Online (Sandbox Code Playgroud)
我尝试过coalesce:
df.write.format("parquet").mode("overwrite").partitionBy("Date").coalesce(15).save(
"/mnt/mydata.parquet"
)
Run Code Online (Sandbox Code Playgroud)
但会抛出错误:
AttributeError:“DataFrameWriter”对象没有属性“coalesce”
请帮忙。
pyspark ×6
apache-spark ×3
python ×3
databricks ×1
dataframe ×1
jdbc ×1
postgresql ×1
regex ×1