此命令适用于HiveQL:
insert overwrite directory '/data/home.csv' select * from testtable;
Run Code Online (Sandbox Code Playgroud)
但是使用Spark SQL我收到了一个org.apache.spark.sql.hive.HiveQl堆栈跟踪错误:
java.lang.RuntimeException: Unsupported language features in query:
insert overwrite directory '/data/home.csv' select * from testtable
Run Code Online (Sandbox Code Playgroud)
请指导我在Spark SQL中编写导出到CSV功能.
我正在尝试使用Spark数据帧而不是RDD,因为它们看起来比RDD更高级,并且往往会产生更易读的代码.
在一个14节点的Google Dataproc集群中,我有大约6百万个名称被两个不同的系统转换为ID:sa和sb.每个Row包含name,id_sa和id_sb.我的目标是从生产映射id_sa到id_sb使得对于每id_sa时,相应的id_sb是连接到所有名称中最常见的ID id_sa.
让我们试着用一个例子来澄清.如果我有以下行:
[Row(name='n1', id_sa='a1', id_sb='b1'),
Row(name='n2', id_sa='a1', id_sb='b2'),
Row(name='n3', id_sa='a1', id_sb='b2'),
Row(name='n4', id_sa='a2', id_sb='b2')]
Run Code Online (Sandbox Code Playgroud)
我的目标是从生产映射a1到b2.事实上,相关的名称a1是n1,n2和n3,分别映射b1,b2和b2,因此b2是相关联的名称最常见的映射a1.以同样的方式,a2将映射到b2.可以假设总有一个胜利者:不需要打破关系.
我希望我可以使用groupBy(df.id_sa)我的数据帧,但我不知道接下来该做什么.我希望最终会产生以下行的聚合:
[Row(id_sa=a1, max_id_sb=b2),
Row(id_sa=a2, max_id_sb=b2)]
Run Code Online (Sandbox Code Playgroud)
但也许我正在尝试使用错误的工具,我应该回到使用RDD.
DataFrame repartition()和DataFrameWriter partitionBy()方法有什么区别?
我希望两者都习惯于"基于数据帧列分区数据"?或者有什么区别?
我正在使用scala对spark进行一些测试.我们通常会读取需要操作的json文件,如下例所示:
test.json:
{"a":1,"b":[2,3]}
Run Code Online (Sandbox Code Playgroud)
val test = sqlContext.read.json("test.json")
Run Code Online (Sandbox Code Playgroud)
如何将其转换为以下格式:
{"a":1,"b":2}
{"a":1,"b":3}
Run Code Online (Sandbox Code Playgroud) 我想修复/合并我的数据,以便将其保存到每个分区的一个Parquet文件中.我还想使用Spark SQL partitionBy API.所以我可以这样做:
df.coalesce(1).write.partitionBy("entity", "year", "month", "day", "status")
.mode(SaveMode.Append).parquet(s"$location")
Run Code Online (Sandbox Code Playgroud)
我已经测试了这个并且它似乎表现不佳.这是因为在数据集中只有一个分区可以处理,文件的所有分区,压缩和保存都必须由一个CPU内核完成.
在调用coalesce之前,我可以重写这个来手动执行分区(使用带有不同分区值的过滤器).
但是使用标准的Spark SQL API有更好的方法吗?
我有两个包含以下列的数据框:
df1.columns
// Array(ts, id, X1, X2)
Run Code Online (Sandbox Code Playgroud)
和
df2.columns
// Array(ts, id, Y1, Y2)
Run Code Online (Sandbox Code Playgroud)
我之后
val df_combined = df1.join(df2, Seq(ts,id))
Run Code Online (Sandbox Code Playgroud)
我最终得到以下列:Array(ts, id, X1, X2, ts, id, Y1, Y2).我可以预期公共列将被删除.有什么额外的东西需要做吗?
我正在使用pyspark,使用spark-csv将大型csv文件加载到数据框中,作为预处理步骤,我需要对其中一列(包含json字符串)中可用的数据应用各种操作.这将返回X值,每个值都需要存储在各自独立的列中.
该功能将在UDF中实现.但是,我不确定如何从该UDF返回值列表并将这些值提供给单个列.下面是一个简单的例子:
(...)
from pyspark.sql.functions import udf
def udf_test(n):
return [n/2, n%2]
test_udf=udf(udf_test)
df.select('amount','trans_date').withColumn("test", test_udf("amount")).show(4)
Run Code Online (Sandbox Code Playgroud)
这产生以下结果:
+------+----------+--------------------+
|amount|trans_date| test|
+------+----------+--------------------+
| 28.0|2016-02-07| [14.0, 0.0]|
| 31.01|2016-02-07|[15.5050001144409...|
| 13.41|2016-02-04|[6.70499992370605...|
| 307.7|2015-02-17|[153.850006103515...|
| 22.09|2016-02-05|[11.0450000762939...|
+------+----------+--------------------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)
将udf在不同的列上返回的两个值(在此示例中)存储的最佳方法是什么?现在他们被键入字符串:
df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema()
root
|-- amount: float (nullable = true)
|-- trans_date: string (nullable = true)
|-- test: string (nullable = true)
Run Code Online (Sandbox Code Playgroud) python user-defined-functions apache-spark apache-spark-sql pyspark
我正在使用Spark 1.5.
我有两个表格的数据框:
scala> libriFirstTable50Plus3DF
res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int]
scala> linkPersonItemLessThan500DF
res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int]
Run Code Online (Sandbox Code Playgroud)
libriFirstTable50Plus3DF有766,151条记录,而linkPersonItemLessThan500DF有26,694,353条记录.请注意我正在使用repartition(number),linkPersonItemLessThan500DF因为我打算稍后加入这两个.我正在跟进以上代码:
val userTripletRankDF = linkPersonItemLessThan500DF
.join(libriFirstTable50Plus3DF, Seq("family_id"))
.take(20)
.foreach(println(_))
Run Code Online (Sandbox Code Playgroud)
我得到这个输出:
16/12/13 15:07:10 INFO scheduler.TaskSetManager: Finished task 172.0 in stage 3.0 (TID 473) in 520 ms on mlhdd01.mondadori.it (199/200)
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala: at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
at …Run Code Online (Sandbox Code Playgroud) 例如
sqlContext = SQLContext(sc)
sample=sqlContext.sql("select Name ,age ,city from user")
sample.show()
Run Code Online (Sandbox Code Playgroud)
上面的语句在终端上打印整个表,但我想使用for或while访问该表中的每一行以执行进一步的计算.
apache-spark-sql ×10
apache-spark ×9
pyspark ×3
scala ×3
dataframe ×1
for-loop ×1
hadoop ×1
hiveql ×1
join ×1
python ×1