这个问题的目标是记录:
在PySpark中使用JDBC连接读取和写入数据所需的步骤
JDBC源和已知解决方案可能存在的问题
通过小的更改,这些方法应该与其他支持的语言一起使用,包括Scala和R.
根据这个
Catalyst应用逻辑优化,例如谓词下推.优化器可以将过滤器谓词下推到数据源中,使物理执行能够跳过不相关的数据.
Spark支持将谓词下推到数据源.此功能是否也适用于JDBC?
(通过检查数据库日志,我可以看到它现在不是默认行为 - 完整查询将传递给数据库,即使它后来受到限制因素限制)
更多细节
使用PostgreSQL 9.4运行Spark 1.5
代码段:
from pyspark import SQLContext, SparkContext, Row, SparkConf
from data_access.data_access_db import REMOTE_CONNECTION
sc = SparkContext()
sqlContext = SQLContext(sc)
url = 'jdbc:postgresql://{host}/{database}?user={user}&password={password}'.format(**REMOTE_CONNECTION)
sql = "dummy"
df = sqlContext.read.jdbc(url=url, table=sql)
df = df.limit(1)
df.show()
Run Code Online (Sandbox Code Playgroud)
SQL跟踪:
< 2015-09-15 07:11:37.718 EDT >LOG: execute <unnamed>: SET extra_float_digits = 3
< 2015-09-15 07:11:37.771 EDT >LOG: execute <unnamed>: SELECT * FROM dummy WHERE 1=0
< 2015-09-15 07:11:37.830 EDT >LOG: execute <unnamed>: SELECT c.oid, a.attnum, a.attname, c.relname, …Run Code Online (Sandbox Code Playgroud) 我想访问火花数据帧的前100行,并将结果写回CSV文件.
为什么take(100)基本上是即时的,而
df.limit(100)
.repartition(1)
.write
.mode(SaveMode.Overwrite)
.option("header", true)
.option("delimiter", ";")
.csv("myPath")
Run Code Online (Sandbox Code Playgroud)
需要永远.我不想获得每个分区的前100条记录,而只需要获得100条记录.
我com.datastax.spark:spark-cassandra-connector_2.11:2.4.0在运行 zeppelin notebooks 时使用,但不明白 spark 中两个操作之间的区别。第一个操作需要很多时间进行计算,第二个操作立即执行。有人可以向我解释两种操作之间的区别吗:
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._
case class SomeClass(val someField:String)
val timelineItems = spark.read.format("org.apache.spark.sql.cassandra").options(scala.collection.immutable.Map("spark.cassandra.connection.host" -> "127.0.0.1", "table" -> "timeline_items", "keyspace" -> "timeline" )).load()
//some simplified code:
val timelineRow = timelineItems
.map(x => {SomeClass("test")})
.filter(x => x != null)
.toDF()
.limit(4)
//first operation (takes a lot of time. It seems spark iterates through all items in Cassandra and doesn't use laziness with limit 4)
println(timelineRow.count()) //return: 4 …Run Code Online (Sandbox Code Playgroud) performance scala query-optimization apache-spark apache-spark-sql