Abh*_*ole 6 postgresql jdbc apache-spark pyspark spark-dataframe
我试图从PostgreSQL数据库加载大约1M行到Spark.使用Spark时需要大约10秒.但是,使用psycopg2驱动程序加载相同的查询需要2s.我正在使用postgresql jdbc驱动程序版本42.0.0
def _loadFromPostGres(name):
url_connect = "jdbc:postgresql:"+dbname
properties = {"user": "postgres", "password": "postgres"}
df = SparkSession.builder.getOrCreate().read.jdbc(url=url_connect, table=name, properties=properties)
return df
df = _loadFromPostGres("""
(SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298) as
user_series_game""")
print measure(lambda : len(df.collect()))
Run Code Online (Sandbox Code Playgroud)
输出是 -
--- 10.7214591503 seconds ---
1076131
Run Code Online (Sandbox Code Playgroud)
使用psycopg2 -
import psycopg2
conn = psycopg2.connect(conn_string)
cur = conn.cursor()
def _exec():
cur.execute("""(SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298)""")
return cur.fetchall()
print measure(lambda : len(_exec()))
cur.close()
conn.close()
Run Code Online (Sandbox Code Playgroud)
输出是 -
--- 2.27961301804 seconds ---
1076131
Run Code Online (Sandbox Code Playgroud)
测量功能 -
def measure(func) :
start_time = time.time()
x = func()
print("--- %s seconds ---" % (time.time() - start_time))
return x
Run Code Online (Sandbox Code Playgroud)
请帮我找到这个问题的原因.
编辑1
我做了一些基准测试.使用Scala和JDBC -
import java.sql._;
import scala.collection.mutable.ArrayBuffer;
def exec() {
val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+
"?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")
val conn = DriverManager.getConnection(url,"postgres","postgres");
val sqlText = """SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298"""
val t0 = System.nanoTime()
val stmt = conn.prepareStatement(sqlText, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
val rs = stmt.executeQuery()
val list = new ArrayBuffer[(Long, Long, Long, Double)]()
while (rs.next()) {
val seriesId = rs.getLong("seriesId")
val companyId = rs.getLong("companyId")
val userId = rs.getLong("userId")
val score = rs.getDouble("score")
list.append((seriesId, companyId, userId, score))
}
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")
println(list.size)
rs.close()
stmt.close()
conn.close()
}
exec()
Run Code Online (Sandbox Code Playgroud)
输出是 -
Elapsed time: 1.922102285s
1143402
Run Code Online (Sandbox Code Playgroud)
当我在Spark + Scala中收集了() -
import org.apache.spark.sql.SparkSession
def exec2() {
val spark = SparkSession.builder().getOrCreate()
val url = ("jdbc:postgresql://prod.caumccqvmegm.ap-southeast-1.rds.amazonaws.com/prod"+
"?tcpKeepAlive=true&prepareThreshold=-1&binaryTransfer=true&defaultRowFetchSize=10000")
val sqlText = """(SELECT "seriesId", "companyId", "userId", "score"
FROM user_series_game
WHERE "companyId"=655124304077004298) as user_series_game"""
val t0 = System.nanoTime()
val df = spark.read
.format("jdbc")
.option("url", url)
.option("dbtable", sqlText)
.option("user", "postgres")
.option("password", "postgres")
.load()
val list = df.collect()
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) * 1e-9 + "s")
print (list.size)
}
exec2()
Run Code Online (Sandbox Code Playgroud)
输出是
Elapsed time: 1.486141076s
1143445
Run Code Online (Sandbox Code Playgroud)
因此,在Python序列化中花费了4倍的额外时间.我知道会有一些惩罚,但这似乎太多了.
原因很简单,有两个同时发生的原因。
\n\n首先,我将向您介绍如何psycopg2运作。
该库psycopg2与连接到 RDMS 的任何其他库一样工作。该库会将查询发送到您的 postgres 引擎,并将数据返回给您。像这样直接向前走。
\n\n\nConn -> 查询 -> ReturnData -> FetchData
\n
当您使用 Spark 时,有两种方式略有不同。Spark 不像一种在单线程中运行的编程语言。它有一个可以工作的分布式系统。即使您在本地计算机上运行。看到Spark有一个基本的概念:Driver(Master)和Workers。
\n\nDriver接收到对Postgres执行查询的请求,Driver不会为每个worker请求来自Postgres的信息。
\n\n如果您在这里看到文档,您将看到如下注释:
\n\n\n\n\n不要在大型集群上并行创建太多分区;否则 Spark 可能会使您的外部数据库系统崩溃。
\n
此注释意味着每个工作人员将负责为您的 postgres 请求数据。这是启动此过程的一小部分开销,但并不是很大。但这里有一个开销,将数据发送给每个工作人员。
\n\n第二点,你在这部分代码中收集:
\n\nprint measure(lambda : len(df.collect()))\nRun Code Online (Sandbox Code Playgroud)\n\n收集功能将发送一条命令,让所有工作人员将数据发送给您的驱动程序。为了存储在驱动程序的内存中,它就像一个Reduce,它在进程的中间创建Shuffle。Shuffle 是将数据发送给其他工作人员的过程的步骤。在收集的情况下,每个工作人员都会将其发送给您的司机。
\n\n所以你代码的 JDBC 中 Spark 的步骤是:
\n\n\n\n\n(Workers)Conn -> (Workers)Query -> (Workers)FetchData -> (Driver)\n 请求数据 -> (Workers) Shuffle -> (Driver) Collect
\n
Spark 还发生了很多其他事情,比如 QueryPlan、构建 DataFrame 和其他东西。
\n\n这就是简单的 Python 代码比 Spark 的响应速度更快的原因。
\n