是否可以将DataFrame火花直接保存到Hive中.
我已尝试转换DataFrame为Rdd然后保存为文本文件,然后加载到配置单元.但我想知道我是否可以直接保存dataframe到蜂巢
>>> a
DataFrame[id: bigint, julian_date: string, user_id: bigint]
>>> b
DataFrame[id: bigint, quan_created_money: decimal(10,0), quan_created_cnt: bigint]
>>> a.join(b, a.id==b.id, 'outer')
DataFrame[id: bigint, julian_date: string, user_id: bigint, id: bigint, quan_created_money: decimal(10,0), quan_created_cnt: bigint]
Run Code Online (Sandbox Code Playgroud)
有两个id: bigint,我想删除一个.我能怎么做?
我正在使用Spark 1.3.1(PySpark),我使用SQL查询生成了一个表.我现在有一个对象DataFrame.我想将此DataFrame对象(我将其称为"表")导出到csv文件,以便我可以操作它并绘制列.如何将DataFrame"表" 导出到csv文件?
谢谢!
python dataframe export-to-csv apache-spark apache-spark-sql
我一直试图找到一种合理的方法来测试SparkSessionJUnit测试框架.虽然似乎有很好的例子SparkContext,但我无法弄清楚如何使用相应的示例SparkSession,即使它在spark-testing-base内部的几个地方使用过.我很乐意尝试一种不使用spark-testing-base的解决方案,如果它不是真正正确的方式去这里.
简单的测试用例(完整MWE项目有build.sbt):
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite
import org.apache.spark.sql.SparkSession
class SessionTest extends FunSuite with DataFrameSuiteBase {
implicit val sparkImpl: SparkSession = spark
@Test
def simpleLookupTest {
val homeDir = System.getProperty("user.home")
val training = spark.read.format("libsvm")
.load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
println("completed simple lookup test")
}
}
Run Code Online (Sandbox Code Playgroud)
使用JUnit运行它的结果是加载线上的NPE:
java.lang.NullPointerException
at SessionTest.simpleLookupTest(SessionTest.scala:16)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at …Run Code Online (Sandbox Code Playgroud) val items = List("a", "b", "c")
sqlContext.sql("select c1 from table")
.filter($"c1".isin(items))
.collect
.foreach(println)
Run Code Online (Sandbox Code Playgroud)
上面的代码抛出以下异常.
Exception in thread "main" java.lang.RuntimeException: Unsupported literal type class scala.collection.immutable.$colon$colon List(a, b, c)
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:49)
at org.apache.spark.sql.functions$.lit(functions.scala:89)
at org.apache.spark.sql.Column$$anonfun$isin$1.apply(Column.scala:642)
at org.apache.spark.sql.Column$$anonfun$isin$1.apply(Column.scala:642)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.Column.isin(Column.scala:642)
Run Code Online (Sandbox Code Playgroud)
以下是我尝试修复它.它编译并运行但不返回任何匹配.不知道为什么.
val items = List("a", "b", "c").mkString("\"","\",\"","\"")
sqlContext.sql("select c1 from table")
.filter($"c1".isin(items))
.collect
.foreach(println)
Run Code Online (Sandbox Code Playgroud) 正如我在Spark Dataframe中所知,多列的名称可以与下面的数据帧快照中显示的名称相同:
[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: …Run Code Online (Sandbox Code Playgroud) 这个问题的目标是记录:
在PySpark中使用JDBC连接读取和写入数据所需的步骤
JDBC源和已知解决方案可能存在的问题
通过小的更改,这些方法应该与其他支持的语言一起使用,包括Scala和R.
我有一个日期pyspark数据帧,其格式为字符串列MM-dd-yyyy,我试图将其转换为日期列.
我试过了:
df.select(to_date(df.STRING_COLUMN).alias('new_date')).show()
我得到一串空值.有人可以帮忙吗?
有没有办法获得DataFrame的当前分区数?我检查了DataFrame javadoc(spark 1.6)并没有找到方法,或者我只是错过了它?(在JavaRDD的情况下,有一个getNumPartitions()方法.)
apache-spark ×10
apache-spark-sql ×10
pyspark ×5
dataframe ×4
python ×3
scala ×3
pyspark-sql ×2
hive ×1
junit ×1
unit-testing ×1