标签: apache-spark-sql

如何将DataFrame直接保存到Hive?

是否可以将DataFrame火花直接保存到Hive中.

我已尝试转换DataFrameRdd然后保存为文本文件,然后加载到配置单元.但我想知道我是否可以直接保存dataframe到蜂巢

hive apache-spark apache-spark-sql

64
推荐指数
6
解决办法
14万
查看次数

如何删除pyspark数据帧中的列

>>> a
DataFrame[id: bigint, julian_date: string, user_id: bigint]
>>> b
DataFrame[id: bigint, quan_created_money: decimal(10,0), quan_created_cnt: bigint]
>>> a.join(b, a.id==b.id, 'outer')
DataFrame[id: bigint, julian_date: string, user_id: bigint, id: bigint, quan_created_money: decimal(10,0), quan_created_cnt: bigint]
Run Code Online (Sandbox Code Playgroud)

有两个id: bigint,我想删除一个.我能怎么做?

apache-spark apache-spark-sql pyspark

61
推荐指数
5
解决办法
13万
查看次数

如何将PySpark中的表数据框导出到csv?

我正在使用Spark 1.3.1(PySpark),我使用SQL查询生成了一个表.我现在有一个对象DataFrame.我想将此DataFrame对象(我将其称为"表")导出到csv文件,以便我可以操作它并绘制列.如何将DataFrame"表" 导出到csv文件?

谢谢!

python dataframe export-to-csv apache-spark apache-spark-sql

59
推荐指数
5
解决办法
15万
查看次数

如何在Spark 2.0+中编写单元测试?

我一直试图找到一种合理的方法来测试SparkSessionJUnit测试框架.虽然似乎有很好的例子SparkContext,但我无法弄清楚如何使用相应的示例SparkSession,即使它在spark-testing-base内部的几个地方使用过.我很乐意尝试一种不使用spark-testing-base的解决方案,如果它不是真正正确的方式去这里.

简单的测试用例(完整MWE项目build.sbt):

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite

import org.apache.spark.sql.SparkSession


class SessionTest extends FunSuite with DataFrameSuiteBase {

  implicit val sparkImpl: SparkSession = spark

  @Test
  def simpleLookupTest {

    val homeDir = System.getProperty("user.home")
    val training = spark.read.format("libsvm")
      .load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
    println("completed simple lookup test")
  }

}
Run Code Online (Sandbox Code Playgroud)

使用JUnit运行它的结果是加载线上的NPE:

java.lang.NullPointerException
    at SessionTest.simpleLookupTest(SessionTest.scala:16)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at …
Run Code Online (Sandbox Code Playgroud)

junit unit-testing scala apache-spark apache-spark-sql

59
推荐指数
4
解决办法
4万
查看次数

如何将Column.isin与列表一起使用?

val items = List("a", "b", "c")

sqlContext.sql("select c1 from table")
          .filter($"c1".isin(items))
          .collect
          .foreach(println)
Run Code Online (Sandbox Code Playgroud)

上面的代码抛出以下异常.

Exception in thread "main" java.lang.RuntimeException: Unsupported literal type class scala.collection.immutable.$colon$colon List(a, b, c) 
at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:49)
at org.apache.spark.sql.functions$.lit(functions.scala:89)
at org.apache.spark.sql.Column$$anonfun$isin$1.apply(Column.scala:642)
at org.apache.spark.sql.Column$$anonfun$isin$1.apply(Column.scala:642)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.Column.isin(Column.scala:642)
Run Code Online (Sandbox Code Playgroud)

以下是我尝试修复它.它编译并运行但不返回任何匹配.不知道为什么.

val items = List("a", "b", "c").mkString("\"","\",\"","\"")

sqlContext.sql("select c1 from table")
          .filter($"c1".isin(items))
          .collect
          .foreach(println)
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

58
推荐指数
3
解决办法
4万
查看次数

Spark Dataframe区分具有重复名称的列

正如我在Spark Dataframe中所知,多列的名称可以与下面的数据帧快照中显示的名称相同:

[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: …
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

58
推荐指数
8
解决办法
8万
查看次数

如何使用JDBC源在(Py)Spark中写入和读取数据?

这个问题的目标是记录:

  • 在PySpark中使用JDBC连接读取和写入数据所需的步骤

  • JDBC源和已知解决方案可能存在的问题

通过小的更改,这些方法应该与其他支持的语言一起使用,包括Scala和R.

python scala apache-spark apache-spark-sql pyspark

57
推荐指数
1
解决办法
6万
查看次数

将pyspark字符串转换为日期格式

我有一个日期pyspark数据帧,其格式为字符串列MM-dd-yyyy,我试图将其转换为日期列.

我试过了:

df.select(to_date(df.STRING_COLUMN).alias('new_date')).show()

我得到一串空值.有人可以帮忙吗?

apache-spark apache-spark-sql pyspark pyspark-sql

56
推荐指数
5
解决办法
12万
查看次数

获取DataFrame的当前分区数

有没有办法获得DataFrame的当前分区数?我检查了DataFrame javadoc(spark 1.6)并没有找到方法,或者我只是错过了它?(在JavaRDD的情况下,有一个getNumPartitions()方法.)

dataframe apache-spark apache-spark-sql

55
推荐指数
4
解决办法
5万
查看次数

如何制作良好的可重现的Apache Spark示例

我花了相当多的时间阅读标签的一些问题,而且我经常发现海报没有提供足够的信息来真正理解他们的问题.我经常评论要求他们发布MCVE,但有时让他们显示一些样本输入/输出数据就像拔牙一样.例如:请参阅有关此问题的评论.

也许问题的一部分是人们只是不知道如何轻松地为火花数据帧创建MCVE.我认为将这个pandas问题的spark-dataframe版本作为可以链接的指南是有用的.

那么如何创造一个好的,可重复的例子呢?

dataframe apache-spark apache-spark-sql pyspark pyspark-sql

55
推荐指数
4
解决办法
3952
查看次数