标签: apache-spark-sql

如何将数据从Spark SQL导出到CSV

此命令适用于HiveQL:

insert overwrite directory '/data/home.csv' select * from testtable;
Run Code Online (Sandbox Code Playgroud)

但是使用Spark SQL我收到了一个org.apache.spark.sql.hive.HiveQl堆栈跟踪错误:

java.lang.RuntimeException: Unsupported language features in query:
    insert overwrite directory '/data/home.csv' select * from testtable
Run Code Online (Sandbox Code Playgroud)

请指导我在Spark SQL中编写导出到CSV功能.

hadoop export-to-csv hiveql apache-spark apache-spark-sql

42
推荐指数
4
解决办法
12万
查看次数

在Spark DataFrame中查找每个组的最大行数

我正在尝试使用Spark数据帧而不是RDD,因为它们看起来比RDD更高级,并且往往会产生更易读的代码.

在一个14节点的Google Dataproc集群中,我有大约6百万个名称被两个不同的系统转换为ID:sasb.每个Row包含name,id_said_sb.我的目标是从生产映射id_said_sb使得对于每id_sa时,相应的id_sb是连接到所有名称中最常见的ID id_sa.

让我们试着用一个例子来澄清.如果我有以下行:

[Row(name='n1', id_sa='a1', id_sb='b1'),
 Row(name='n2', id_sa='a1', id_sb='b2'),
 Row(name='n3', id_sa='a1', id_sb='b2'),
 Row(name='n4', id_sa='a2', id_sb='b2')]
Run Code Online (Sandbox Code Playgroud)

我的目标是从生产映射a1b2.事实上,相关的名称a1n1,n2n3,分别映射b1,b2b2,因此b2是相关联的名称最常见的映射a1.以同样的方式,a2将映射到b2.可以假设总有一个胜利者:不需要打破关系.

我希望我可以使用groupBy(df.id_sa)我的数据帧,但我不知道接下来该做什么.我希望最终会产生以下行的聚合:

[Row(id_sa=a1, max_id_sb=b2),
 Row(id_sa=a2, max_id_sb=b2)]
Run Code Online (Sandbox Code Playgroud)

但也许我正在尝试使用错误的工具,我应该回到使用RDD.

apache-spark apache-spark-sql pyspark

42
推荐指数
2
解决办法
5万
查看次数

Spark SQL - df.repartition和DataFrameWriter partitionBy之间的区别?

DataFrame repartition()和DataFrameWriter partitionBy()方法有什么区别?

我希望两者都习惯于"基于数据帧列分区数据"?或者有什么区别?

data-partitioning apache-spark-sql

42
推荐指数
3
解决办法
3万
查看次数

在Spark中展平行

我正在使用scala对spark进行一些测试.我们通常会读取需要操作的json文件,如下例所示:

test.json:

{"a":1,"b":[2,3]}
Run Code Online (Sandbox Code Playgroud)
val test = sqlContext.read.json("test.json")
Run Code Online (Sandbox Code Playgroud)

如何将其转换为以下格式:

{"a":1,"b":2}
{"a":1,"b":3}
Run Code Online (Sandbox Code Playgroud)

scala distributed-computing apache-spark apache-spark-sql

41
推荐指数
1
解决办法
5万
查看次数

DataFrame partitionBy到单个Parquet文件(每个分区)

我想修复/合并我的数据,以便将其保存到每个分区的一个Parquet文件中.我还想使用Spark SQL partitionBy API.所以我可以这样做:

df.coalesce(1).write.partitionBy("entity", "year", "month", "day", "status")
  .mode(SaveMode.Append).parquet(s"$location")
Run Code Online (Sandbox Code Playgroud)

我已经测试了这个并且它似乎表现不佳.这是因为在数据集中只有一个分区可以处理,文件的所有分区,压缩和保存都必须由一个CPU内核完成.

在调用coalesce之前,我可以重写这个来手动执行分区(使用带有不同分区值的过滤器).

但是使用标准的Spark SQL API有更好的方法吗?

apache-spark apache-spark-sql

41
推荐指数
2
解决办法
4万
查看次数

加入后如何避免重复列?

我有两个包含以下列的数据框:

df1.columns
//  Array(ts, id, X1, X2)
Run Code Online (Sandbox Code Playgroud)

df2.columns
//  Array(ts, id, Y1, Y2)
Run Code Online (Sandbox Code Playgroud)

我之后

val df_combined = df1.join(df2, Seq(ts,id))
Run Code Online (Sandbox Code Playgroud)

我最终得到以下列:Array(ts, id, X1, X2, ts, id, Y1, Y2).我可以预期公共列将被删除.有什么额外的东西需要做吗?

scala apache-spark apache-spark-sql

41
推荐指数
5
解决办法
5万
查看次数

Apache Spark - 将UDF的结果分配给多个数据帧列

我正在使用pyspark,使用spark-csv将大型csv文件加载到数据框中,作为预处理步骤,我需要对其中一列(包含json字符串)中可用的数据应用各种操作.这将返回X值,每个值都需要存储在各自独立的列中.

该功能将在UDF中实现.但是,我不确定如何从该UDF返回值列表并将这些值提供给单个列.下面是一个简单的例子:

(...)
from pyspark.sql.functions import udf
def udf_test(n):
    return [n/2, n%2]

test_udf=udf(udf_test)


df.select('amount','trans_date').withColumn("test", test_udf("amount")).show(4)
Run Code Online (Sandbox Code Playgroud)

这产生以下结果:

+------+----------+--------------------+
|amount|trans_date|                test|
+------+----------+--------------------+
|  28.0|2016-02-07|         [14.0, 0.0]|
| 31.01|2016-02-07|[15.5050001144409...|
| 13.41|2016-02-04|[6.70499992370605...|
| 307.7|2015-02-17|[153.850006103515...|
| 22.09|2016-02-05|[11.0450000762939...|
+------+----------+--------------------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)

将udf在不同的列上返回的两个值(在此示例中)存储的最佳方法是什么?现在他们被键入字符串:

df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema()

root
 |-- amount: float (nullable = true)
 |-- trans_date: string (nullable = true)
 |-- test: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

python user-defined-functions apache-spark apache-spark-sql pyspark

41
推荐指数
1
解决办法
2万
查看次数

为什么加入失败的"java.util.concurrent.TimeoutException:期货在[300秒]之后超时"?

我正在使用Spark 1.5.

我有两个表格的数据框:

scala> libriFirstTable50Plus3DF
res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int]

scala> linkPersonItemLessThan500DF
res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int]
Run Code Online (Sandbox Code Playgroud)

libriFirstTable50Plus3DF766,151条记录,linkPersonItemLessThan500DF26,694,353条记录.请注意我正在使用repartition(number),linkPersonItemLessThan500DF因为我打算稍后加入这两个.我正在跟进以上代码:

val userTripletRankDF = linkPersonItemLessThan500DF
     .join(libriFirstTable50Plus3DF, Seq("family_id"))
     .take(20)
     .foreach(println(_))
Run Code Online (Sandbox Code Playgroud)

我得到这个输出:

16/12/13 15:07:10 INFO scheduler.TaskSetManager: Finished task 172.0 in stage 3.0 (TID 473) in 520 ms on mlhdd01.mondadori.it (199/200)
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:        at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
at …
Run Code Online (Sandbox Code Playgroud)

scala join apache-spark apache-spark-sql

41
推荐指数
3
解决办法
4万
查看次数

如何遍历pyspark中的每一行dataFrame

例如

sqlContext = SQLContext(sc)

sample=sqlContext.sql("select Name ,age ,city from user")
sample.show()
Run Code Online (Sandbox Code Playgroud)

上面的语句在终端上打印整个表,但我想使用for或while访问该表中的每一行以执行进一步的计算.

for-loop dataframe apache-spark apache-spark-sql pyspark

40
推荐指数
5
解决办法
9万
查看次数

如何在spark中具有不同列数的两个DataFrame上执行并集?

我有2 DataFrame秒如下:

来源数据

我需要像这样的工会:

在此输入图像描述

unionAll功能不起作用,因为列的数量和名称不同.

我怎样才能做到这一点?

apache-spark apache-spark-sql

40
推荐指数
9
解决办法
5万
查看次数