我试图转换DataFrameSpark-Scala中的所有标题/列名称.截至目前,我想出了以下代码,它只替换了一个列名.
for( i <- 0 to origCols.length - 1) {
df.withColumnRenamed(
df.columns(i),
df.columns(i).toLowerCase
);
}
Run Code Online (Sandbox Code Playgroud) 现在,我必须用来df.count > 0检查它是否DataFrame为空.但它效率低下.有没有更好的方法来做到这一点.
谢谢.
PS:我想检查它是否为空,以便我只保存,DataFrame如果它不是空的
我想DataFrame在Scala中使用指定的模式创建.我曾尝试使用JSON读取(我的意思是读取空文件),但我认为这不是最好的做法.
我正在尝试过滤具有None行值的PySpark数据帧:
df.select('dt_mvmt').distinct().collect()
[Row(dt_mvmt=u'2016-03-27'),
Row(dt_mvmt=u'2016-03-28'),
Row(dt_mvmt=u'2016-03-29'),
Row(dt_mvmt=None),
Row(dt_mvmt=u'2016-03-30'),
Row(dt_mvmt=u'2016-03-31')]
Run Code Online (Sandbox Code Playgroud)
我可以使用字符串值正确过滤:
df[df.dt_mvmt == '2016-03-31']
# some results here
Run Code Online (Sandbox Code Playgroud)
但这失败了:
df[df.dt_mvmt == None].count()
0
df[df.dt_mvmt != None].count()
0
Run Code Online (Sandbox Code Playgroud)
但每个类别肯定都有价值观.这是怎么回事?
我有一个数据框,列为String.我想在PySpark中将列类型更改为Double类型.
以下是方式,我做了:
toDoublefunc = UserDefinedFunction(lambda x: x,DoubleType())
changedTypedf = joindf.withColumn("label",toDoublefunc(joindf['show']))
Run Code Online (Sandbox Code Playgroud)
只是想知道,这是通过Logistic回归运行的正确方法,我遇到了一些错误,所以我想知道,这是问题的原因.
我想将数据帧的字符串列转换为列表.我可以从DataframeAPI 找到的是RDD,所以我尝试先将其转换回RDD,然后将toArray函数应用于RDD.在这种情况下,长度和SQL工作就好了.但是,我从RDD得到的结果在每个元素周围都有方括号[A00001].我想知道是否有适当的方法将列转换为列表或删除方括号的方法.
任何建议,将不胜感激.谢谢!
我试图在spark 1.4.0和tachyon 0.6.4上使用off heap storage来保持我的RDD这样做:
val a = sqlContext.parquetFile("a1.parquet")
a.persist(org.apache.spark.storage.StorageLevel.OFF_HEAP)
a.count()
Run Code Online (Sandbox Code Playgroud)
之后我得到以下异常.
有什么想法吗?
15/06/16 10:14:53 INFO : Tachyon client (version 0.6.4) is trying to connect master @ localhost/127.0.0.1:19998
15/06/16 10:14:53 INFO : User registered at the master localhost/127.0.0.1:19998 got UserId 3
15/06/16 10:14:53 INFO TachyonBlockManager: Created tachyon directory at /tmp_spark_tachyon/spark-6b2512ab-7bb8-47ca-b6e2-8023d3d7f7dc/driver/spark-tachyon-20150616101453-ded3
15/06/16 10:14:53 INFO BlockManagerInfo: Added rdd_10_3 on ExternalBlockStore on localhost:33548 (size: 0.0 B)
15/06/16 10:14:53 INFO BlockManagerInfo: Added rdd_10_1 on ExternalBlockStore on localhost:33548 (size: 0.0 B)
15/06/16 10:14:53 ERROR TransportRequestHandler: …Run Code Online (Sandbox Code Playgroud) 我正在使用pyspark(Python 2.7.9/Spark 1.3.1)并且有一个数据帧GroupObject,我需要按降序对其进行过滤和排序.试图通过这段代码实现它.
group_by_dataframe.count().filter("`count` >= 10").sort('count', ascending=False)
Run Code Online (Sandbox Code Playgroud)
但它会引发以下错误.
sort() got an unexpected keyword argument 'ascending'
Run Code Online (Sandbox Code Playgroud) 有没有办法将聚合函数应用于数据帧的所有(或列表)列groupBy?换句话说,有没有办法避免为每一列执行此操作:
df.groupBy("col1")
.agg(sum("col2").alias("col2"), sum("col3").alias("col3"), ...)
Run Code Online (Sandbox Code Playgroud) 查看新的spark数据帧api,目前还不清楚是否可以修改数据帧列.
我怎么会去改变行的值x列y一个数据帧的?
在pandas这将是df.ix[x,y] = new_value
编辑:合并下面所述的内容,您无法修改现有数据框,因为它是不可变的,但您可以返回具有所需修改的新数据框.
如果您只想根据条件替换列中的值,例如np.where:
from pyspark.sql import functions as F
update_func = (F.when(F.col('update_col') == replace_val, new_value)
.otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
Run Code Online (Sandbox Code Playgroud)
如果要对列执行某些操作并创建添加到数据帧的新列:
import pyspark.sql.functions as F
import pyspark.sql.types as T
def my_func(col):
do stuff to column here
return transformed_value
# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())
df = df.withColumn('new_column_name', my_udf('update_col'))
Run Code Online (Sandbox Code Playgroud)
如果您希望新列与旧列具有相同的名称,则可以添加其他步骤:
df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
Run Code Online (Sandbox Code Playgroud) python apache-spark apache-spark-sql pyspark spark-dataframe