标签: apache-spark-sql

在Spark Scala中重命名DataFrame的列名

我试图转换DataFrameSpark-Scala中的所有标题/列名称.截至目前,我想出了以下代码,它只替换了一个列名.

for( i <- 0 to origCols.length - 1) {
  df.withColumnRenamed(
    df.columns(i), 
    df.columns(i).toLowerCase
  );
}
Run Code Online (Sandbox Code Playgroud)

scala dataframe apache-spark apache-spark-sql

83
推荐指数
3
解决办法
16万
查看次数

如何检查spark数据帧是否为空

现在,我必须用来df.count > 0检查它是否DataFrame为空.但它效率低下.有没有更好的方法来做到这一点.

谢谢.

PS:我想检查它是否为空,以便我只保存,DataFrame如果它不是空的

apache-spark apache-spark-sql

82
推荐指数
9
解决办法
6万
查看次数

如何使用指定的架构创建空DataFrame?

我想DataFrame在Scala中使用指定的模式创建.我曾尝试使用JSON读取(我的意思是读取空文件),但我认为这不是最好的做法.

scala dataframe apache-spark apache-spark-sql

79
推荐指数
3
解决办法
12万
查看次数

使用无值过滤Pyspark数据框列

我正在尝试过滤具有None行值的PySpark数据帧:

df.select('dt_mvmt').distinct().collect()

[Row(dt_mvmt=u'2016-03-27'),
 Row(dt_mvmt=u'2016-03-28'),
 Row(dt_mvmt=u'2016-03-29'),
 Row(dt_mvmt=None),
 Row(dt_mvmt=u'2016-03-30'),
 Row(dt_mvmt=u'2016-03-31')]
Run Code Online (Sandbox Code Playgroud)

我可以使用字符串值正确过滤:

df[df.dt_mvmt == '2016-03-31']
# some results here
Run Code Online (Sandbox Code Playgroud)

但这失败了:

df[df.dt_mvmt == None].count()
0
df[df.dt_mvmt != None].count()
0
Run Code Online (Sandbox Code Playgroud)

但每个类别肯定都有价值观.这是怎么回事?

python dataframe apache-spark apache-spark-sql pyspark

76
推荐指数
5
解决办法
16万
查看次数

如何在pyspark中将Dataframe列从String类型更改为Double类型

我有一个数据框,列为String.我想在PySpark中将列类型更改为Double类型.

以下是方式,我做了:

toDoublefunc = UserDefinedFunction(lambda x: x,DoubleType())
changedTypedf = joindf.withColumn("label",toDoublefunc(joindf['show']))
Run Code Online (Sandbox Code Playgroud)

只是想知道,这是通过Logistic回归运行的正确方法,我遇到了一些错误,所以我想知道,这是问题的原因.

python dataframe apache-spark apache-spark-sql pyspark

74
推荐指数
4
解决办法
16万
查看次数

在Apache Spark中将Dataframe的列值提取为List

我想将数据帧的字符串列转换为列表.我可以从DataframeAPI 找到的是RDD,所以我尝试先将其转换回RDD,然后将toArray函数应用于RDD.在这种情况下,长度和SQL工作就好了.但是,我从RDD得到的结果在每个元素周围都有方括号[A00001].我想知道是否有适当的方法将列转换为列表或删除方括号的方法.

任何建议,将不胜感激.谢谢!

scala apache-spark apache-spark-sql

73
推荐指数
4
解决办法
15万
查看次数

使用带有Spark 1.4.0和Tachyon 0.6.4的OFF_HEAP存储时出错

我试图在spark 1.4.0和tachyon 0.6.4上使用off heap storage来保持我的RDD这样做:

val a = sqlContext.parquetFile("a1.parquet")
a.persist(org.apache.spark.storage.StorageLevel.OFF_HEAP)
a.count()
Run Code Online (Sandbox Code Playgroud)

之后我得到以下异常.

有什么想法吗?

15/06/16 10:14:53 INFO : Tachyon client (version 0.6.4) is trying to connect master @ localhost/127.0.0.1:19998
15/06/16 10:14:53 INFO : User registered at the master localhost/127.0.0.1:19998 got UserId 3
15/06/16 10:14:53 INFO TachyonBlockManager: Created tachyon directory at /tmp_spark_tachyon/spark-6b2512ab-7bb8-47ca-b6e2-8023d3d7f7dc/driver/spark-tachyon-20150616101453-ded3
15/06/16 10:14:53 INFO BlockManagerInfo: Added rdd_10_3 on ExternalBlockStore on localhost:33548 (size: 0.0 B)
15/06/16 10:14:53 INFO BlockManagerInfo: Added rdd_10_1 on ExternalBlockStore on localhost:33548 (size: 0.0 B)
15/06/16 10:14:53 ERROR TransportRequestHandler: …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql alluxio

71
推荐指数
1
解决办法
1239
查看次数

Spark DataFrame groupBy并按降序排序(pyspark)

我正在使用pyspark(Python 2.7.9/Spark 1.3.1)并且有一个数据帧GroupObject,我需要按降序对其进行过滤和排序.试图通过这段代码实现它.

group_by_dataframe.count().filter("`count` >= 10").sort('count', ascending=False)
Run Code Online (Sandbox Code Playgroud)

但它会引发以下错误.

sort() got an unexpected keyword argument 'ascending'
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

66
推荐指数
5
解决办法
15万
查看次数

Spark SQL:将聚合函数应用于列列表

有没有办法将聚合函数应用于数据帧的所有(或列表)列groupBy?换句话说,有没有办法避免为每一列执行此操作:

df.groupBy("col1")
  .agg(sum("col2").alias("col2"), sum("col3").alias("col3"), ...)
Run Code Online (Sandbox Code Playgroud)

aggregate-functions dataframe apache-spark apache-spark-sql

65
推荐指数
2
解决办法
12万
查看次数

更新spark中的dataframe列

查看新的spark数据帧api,目前还不清楚是否可以修改数据帧列.

我怎么会去改变行的值xy一个数据帧的?

pandas这将是df.ix[x,y] = new_value

编辑:合并下面所述的内容,您无法修改现有数据框,因为它是不可变的,但您可以返回具有所需修改的新数据框.

如果您只想根据条件替换列中的值,例如np.where:

from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
Run Code Online (Sandbox Code Playgroud)

如果要对列执行某些操作并创建添加到数据帧的新列:

import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    do stuff to column here
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())

df = df.withColumn('new_column_name', my_udf('update_col'))
Run Code Online (Sandbox Code Playgroud)

如果您希望新列与旧列具有相同的名称,则可以添加其他步骤:

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql pyspark spark-dataframe

64
推荐指数
5
解决办法
10万
查看次数