所以我一直在使用sbt with assembly将我的所有依赖项打包成一个jar用于我的spark作业.我有几个工作,我c3p0用来设置连接池信息,广播出来,然后foreachPartition在RDD上使用然后获取连接,并将数据插入数据库.在我的sbt构建脚本中,我包含了
"mysql" % "mysql-connector-java" % "5.1.33"
Run Code Online (Sandbox Code Playgroud)
这可确保JDBC连接器与作业打包在一起.一切都很好.
所以最近我开始玩SparkSQL,并意识到简单地采用数据帧并将其保存到具有新功能的jdbc源更容易 1.3.0
我收到以下异常:
java.sql.SQLException:在java.sql中找不到合适的jdbc驱动程序:mysql://some.domain.com/myschema?user = user&password = password at java.sql.DriverManager.getConnection(DriverManager.java:596).的DriverManager.getConnection(DriverManager.java:233)
当我在本地运行时,我通过设置绕过它
SPARK_CLASSPATH=/path/where/mysql-connector-is.jar
Run Code Online (Sandbox Code Playgroud)
最终我想知道的是,为什么这个工作不应该找到驱动程序什么时候应该打包它呢?我的其他工作从未遇到过这个问题.从我可以告诉他们c3p0和数据帧代码都使用java.sql.DriverManager(它处理从我可以告诉你的一切导入所有)所以它应该工作得很好?如果有什么东西阻止汇编方法工作,我需要做些什么来使其工作?
我已经构建了Spark-csv,并且可以使用以下命令从pyspark shell中使用它
bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3
Run Code Online (Sandbox Code Playgroud)
得到错误
>>> df_cat.save("k.csv","com.databricks.spark.csv")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/abhishekchoudhary/bigdata/cdh5.2.0/spark-1.3.1/python/pyspark/sql/dataframe.py", line 209, in save
self._jdf.save(source, jmode, joptions)
File "/Users/abhishekchoudhary/bigdata/cdh5.2.0/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/Users/abhishekchoudhary/bigdata/cdh5.2.0/spark-1.3.1/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError
Run Code Online (Sandbox Code Playgroud)
我应该将jar文件放在我的spark预构建设置中,以便我也可以spark-csv直接从python编辑器访问.
以下示例代码尝试将一些案例对象放入数据框中.代码包括案例对象层次结构的定义和使用此特征的案例类:
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
sealed trait Some
case object AType extends Some
case object BType extends Some
case class Data( name : String, t: Some)
object Example {
def main(args: Array[String]) : Unit = {
val conf = new SparkConf()
.setAppName( "Example" )
.setMaster( "local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val df = sc.parallelize( Seq( Data( "a", AType), Data( "b", BType) ), 4).toDF()
df.show()
}
}
Run Code Online (Sandbox Code Playgroud)
执行代码时,我遗憾地遇到以下异常:
java.lang.UnsupportedOperationException: Schema for type …Run Code Online (Sandbox Code Playgroud) 如在Web上的许多 其他位置所述,向现有DataFrame添加新列并不简单.不幸的是,拥有此功能非常重要(即使它在分布式环境中效率低下),尤其是在尝试连接两个DataFrames时unionAll.
将null列添加到a DataFrame以便于实现最优雅的解决方法是unionAll什么?
我的版本是这样的:
from pyspark.sql.types import StringType
from pyspark.sql.functions import UserDefinedFunction
to_none = UserDefinedFunction(lambda x: None, StringType())
new_df = old_df.withColumn('new_column', to_none(df_old['any_col_from_old']))
Run Code Online (Sandbox Code Playgroud) 我正在寻找一种检查点DataFrames的方法.Checkpoint目前是RDD上的一个操作,但我找不到如何使用DataFrames.持久化和缓存(它们是彼此的同义词)可用于DataFrame但它们不会"破坏谱系",因此不适用于可循环数百(或数千)次迭代的方法.
例如,假设我有一个签名为DataFrame => DataFrame的函数列表.即使myfunctions有数百或数千个条目,我想有办法计算以下内容:
def foo(dataset: DataFrame, g: DataFrame => Unit) =
myfunctions.foldLeft(dataset) {
case (df, f) =>
val nextDF = f(df)
g(nextDF)
nextDF
}
Run Code Online (Sandbox Code Playgroud) 使用Spark 1.4.0,Scala 2.10
我一直试图找出一种方法来使用最后一次已知的观察来转发填充空值,但我没有看到一种简单的方法.我认为这是一件非常常见的事情,但找不到显示如何执行此操作的示例.
我看到函数向前转移填充NaN的值,或滞后/超前函数来填充或移位数据偏移量,但没有任何东西可以获取最后的已知值.
在线查看,我在R中看到很多关于同一件事的Q/A,但在Spark/Scala中没有.
我正在考虑在日期范围内进行映射,从结果中过滤出NaN并选择最后一个元素,但我想我对语法感到困惑.
使用DataFrames我尝试类似的东西
import org.apache.spark.sql.expressions.Window
val sqlContext = new HiveContext(sc)
var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")
val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)
Run Code Online (Sandbox Code Playgroud)
但这并没有让我任何地方.
过滤器部分不起作用; map函数返回一个spark.sql.Columns序列,但是filter函数需要返回一个Boolean,所以我需要从Column中获取一个值来测试,但似乎只有Column方法返回一个Column.
有没有办法在Spark上更"简单"地做到这一点?
感谢您的输入
编辑:
简单示例示例输入:
2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...
Run Code Online (Sandbox Code Playgroud)
预期产量:
2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22
Run Code Online (Sandbox Code Playgroud)
注意:
编辑:
按照@ zero323的回答我试过这样:
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val rows: RDD[Row] = df.orderBy($"Date").rdd
def notMissing(row: Row): Boolean = { !row.isNullAt(1) } …Run Code Online (Sandbox Code Playgroud) 作为一个简化示例,我有一个数据框"df",其列为"col1,col2",我想在将函数应用于每列后计算行的最大值:
def f(x):
return (x+1)
max_udf=udf(lambda x,y: max(x,y), IntegerType())
f_udf=udf(f, IntegerType())
df2=df.withColumn("result", max_udf(f_udf(df.col1),f_udf(df.col2)))
Run Code Online (Sandbox Code Playgroud)
所以如果df:
col1 col2
1 2
3 0
Run Code Online (Sandbox Code Playgroud)
然后
DF2:
col1 col2 result
1 2 3
3 0 4
Run Code Online (Sandbox Code Playgroud)
以上似乎不起作用并产生"无法评估表达式:PythonUDF#f ......"
我绝对肯定"f_udf"在我的桌子上运行得很好,主要问题在于max_udf.
如果不创建额外的列或使用基本的map/reduce,有没有办法完全使用数据帧和udfs?我该如何修改"max_udf"?
我也尝试过:
max_udf=udf(max, IntegerType())
Run Code Online (Sandbox Code Playgroud)
这会产生相同的错误.
我还确认以下工作:
df2=(df.withColumn("temp1", f_udf(df.col1))
.withColumn("temp2", f_udf(df.col2))
df2=df2.withColumn("result", max_udf(df2.temp1,df2.temp2))
Run Code Online (Sandbox Code Playgroud)
为什么我不能一气呵成呢?
我希望看到一个可以概括为任何函数"f_udf"和"max_udf"的答案.
我是一个火花应用程序,有几点我想坚持当前的状态.这通常是在一大步之后,或缓存我想要多次使用的状态.看来,当我第二次在我的数据帧上调用缓存时,新副本会缓存到内存中.在我的应用程序中,这会在扩展时导致内存问题.即使在我当前的测试中,给定的数据帧最大约为100 MB,中间结果的累积大小也会超出执行程序的分配内存.请参阅下面的一个显示此行为的小示例.
cache_test.py:
from pyspark import SparkContext, HiveContext
spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)
df = (hive_context.read
.format('com.databricks.spark.csv')
.load('simple_data.csv')
)
df.cache()
df.show()
df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()
spark_context.stop()
Run Code Online (Sandbox Code Playgroud)
simple_data.csv:
1,2,3
4,5,6
7,8,9
Run Code Online (Sandbox Code Playgroud)
查看应用程序UI,有一个原始数据框的副本,与新列的副本相对应.我可以通过df.unpersist()在withColumn行之前调用来删除原始副本.这是删除缓存中间结果的推荐方法(即在每次调用之前调用unpersist cache()).
此外,是否可以清除所有缓存的对象.在我的应用程序中,有一些自然断点,我可以简单地清除所有内存,然后转到下一个文件.我想这样做而不为每个输入文件创建一个新的spark应用程序.
先感谢您!
我想使用spark withColumnRenamed函数更改两列的名称.当然,我可以写:
data = sqlContext.createDataFrame([(1,2), (3,4)], ['x1', 'x2'])
data = (data
.withColumnRenamed('x1','x3')
.withColumnRenamed('x2', 'x4'))
Run Code Online (Sandbox Code Playgroud)
但我想一步到位(有新名单的列表/元组).不幸的是,这不是:
data = data.withColumnRenamed(['x1', 'x2'], ['x3', 'x4'])
Run Code Online (Sandbox Code Playgroud)
也不是这样
data = data.withColumnRenamed(('x1', 'x2'), ('x3', 'x4'))
Run Code Online (Sandbox Code Playgroud)
工作中.有可能这样做吗?
使用Spark的DataFrame时,需要使用用户定义函数(UDF)来映射列中的数据.UDF要求显式指定参数类型.在我的情况下,我需要操作由对象数组组成的列,我不知道要使用什么类型.这是一个例子:
import sqlContext.implicits._
// Start with some data. Each row (here, there's only one row)
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
"""
|{
| "topic" : "pets",
| "subjects" : [
| {"type" : "cat", "score" : 10},
| {"type" : "dog", "score" : 1}
| ]
|}
""")))
Run Code Online (Sandbox Code Playgroud)
使用内置org.apache.spark.sql.functions函数对列中的数据执行基本操作相对简单
import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show
+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets| 2|
+-----+--------------+
Run Code Online (Sandbox Code Playgroud)
并且通常很容易编写自定义UDF来执行任意操作
import org.apache.spark.sql.functions.udf
val enhance = udf { topic : …Run Code Online (Sandbox Code Playgroud) scala user-defined-functions dataframe apache-spark apache-spark-sql
apache-spark ×10
apache-spark-sql ×10
pyspark ×4
python ×4
scala ×4
dataframe ×3
caching ×1
case-class ×1
jdbc ×1
rename ×1