相关疑难解决方法(0)

如何使用JDBC源在(Py)Spark中写入和读取数据?

这个问题的目标是记录:

  • 在PySpark中使用JDBC连接读取和写入数据所需的步骤

  • JDBC源和已知解决方案可能存在的问题

通过小的更改,这些方法应该与其他支持的语言一起使用,包括Scala和R.

python scala apache-spark apache-spark-sql pyspark

57
推荐指数
1
解决办法
6万
查看次数

spark谓词下推是否适用于JDBC?

根据这个

Catalyst应用逻辑优化,例如谓词下推.优化器可以将过滤器谓词下推到数据源中,使物理执行能够跳过不相关的数据.

Spark支持将谓词下推到数据源.此功能是否也适用于JDBC?

(通过检查数据库日志,我可以看到它现在不是默认行为 - 完整查询将传递给数据库,即使它后来受到限制因素限制)

更多细节

使用PostgreSQL 9.4运行Spark 1.5

代码段:

from pyspark import SQLContext, SparkContext, Row, SparkConf
from data_access.data_access_db import REMOTE_CONNECTION

sc = SparkContext()
sqlContext = SQLContext(sc)

url = 'jdbc:postgresql://{host}/{database}?user={user}&password={password}'.format(**REMOTE_CONNECTION)
sql = "dummy"

df = sqlContext.read.jdbc(url=url, table=sql)
df = df.limit(1)
df.show()
Run Code Online (Sandbox Code Playgroud)

SQL跟踪:

< 2015-09-15 07:11:37.718 EDT >LOG:  execute <unnamed>: SET extra_float_digits = 3                                                                                                                      
< 2015-09-15 07:11:37.771 EDT >LOG:  execute <unnamed>: SELECT * FROM dummy WHERE 1=0                                                                                                                   
< 2015-09-15 07:11:37.830 EDT >LOG:  execute <unnamed>: SELECT c.oid, a.attnum, a.attname, c.relname, …
Run Code Online (Sandbox Code Playgroud)

python jdbc apache-spark apache-spark-sql pyspark

23
推荐指数
1
解决办法
8152
查看次数

火花访问前n行 - 采取vs限制

我想访问火花数据帧的前100行,并将结果写回CSV文件.

为什么take(100)基本上是即时的,而

df.limit(100)
      .repartition(1)
      .write
      .mode(SaveMode.Overwrite)
      .option("header", true)
      .option("delimiter", ";")
      .csv("myPath")
Run Code Online (Sandbox Code Playgroud)

需要永远.我不想获得每个分区的前100条记录,而只需要获得100条记录.

limit apache-spark apache-spark-sql spark-dataframe

13
推荐指数
2
解决办法
1万
查看次数

火花计数与拍摄和长度

com.datastax.spark:spark-cassandra-connector_2.11:2.4.0在运行 zeppelin notebooks 时使用,但不明白 spark 中两个操作之间的区别。第一个操作需要很多时间进行计算,第二个操作立即执行。有人可以向我解释两种操作之间的区别吗:

import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._

case class SomeClass(val someField:String)

val timelineItems = spark.read.format("org.apache.spark.sql.cassandra").options(scala.collection.immutable.Map("spark.cassandra.connection.host" -> "127.0.0.1", "table" -> "timeline_items", "keyspace" -> "timeline" )).load()
//some simplified code:
val timelineRow = timelineItems
        .map(x => {SomeClass("test")})
        .filter(x => x != null)
        .toDF()
        .limit(4)

//first operation (takes a lot of time. It seems spark iterates through all items in Cassandra and doesn't use laziness with limit 4)
println(timelineRow.count()) //return: 4 …
Run Code Online (Sandbox Code Playgroud)

performance scala query-optimization apache-spark apache-spark-sql

2
推荐指数
1
解决办法
1793
查看次数