我有一个DataFrame生成如下:
df.groupBy($"Hour", $"Category")
.agg(sum($"value") as "TotalValue")
.sort($"Hour".asc, $"TotalValue".desc))
Run Code Online (Sandbox Code Playgroud)
结果如下:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
| 3| cat8| 35.6|
| ...| ....| ....|
+----+--------+----------+
Run Code Online (Sandbox Code Playgroud)
如您所见,DataFrame按Hour
递增顺序排序,然后按TotalValue
降序排序.
我想选择每组的顶行,即
我想在a中添加一个DataFrame
具有任意值的列(对于每一行都是相同的).我使用时出现错误withColumn
如下:
dt.withColumn('new_column', 10).head(5)
Run Code Online (Sandbox Code Playgroud)
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-50-a6d0257ca2be> in <module>()
1 dt = (messages
2 .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt")))
----> 3 dt.withColumn('new_column', 10).head(5)
/Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
1166 [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
1167 """
-> 1168 return self.select('*', col.alias(colName))
1169
1170 @ignore_unicode_prefix
AttributeError: 'int' object has no attribute 'alias'
Run Code Online (Sandbox Code Playgroud)
似乎我可以通过添加和减去其中一个列(因此它们添加到零)然后添加我想要的数字(在这种情况下为10)来欺骗函数按照我想要的方式工作:
dt.withColumn('new_column', dt.messagetype - dt.messagetype + 10).head(5)
Run Code Online (Sandbox Code Playgroud)
[Row(fromuserid=425, messagetype=1, dt=4809600.0, new_column=10),
Row(fromuserid=47019141, messagetype=1, dt=4809600.0, new_column=10),
Row(fromuserid=49746356, messagetype=1, dt=4809600.0, new_column=10),
Row(fromuserid=93506471, messagetype=1, dt=4809600.0, …
Run Code Online (Sandbox Code Playgroud) 我有一个Spark DataFrame(使用PySpark 1.5.1)并想添加一个新列.
我试过以下但没有成功:
type(randomed_hours) # => list
# Create in Python and transform to RDD
new_col = pd.DataFrame(randomed_hours, columns=['new_col'])
spark_new_col = sqlContext.createDataFrame(new_col)
my_df_spark.withColumn("hours", spark_new_col["new_col"])
Run Code Online (Sandbox Code Playgroud)
使用这个也有错误:
my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))
Run Code Online (Sandbox Code Playgroud)
那么如何使用PySpark将新列(基于Python向量)添加到现有的DataFrame中?
我正在使用https://github.com/databricks/spark-csv,我正在尝试编写单个CSV,但不能,它正在创建一个文件夹.
需要一个Scala函数,它将获取路径和文件名等参数并写入该CSV文件.
我试图使用train_test_split
包scikit Learn,但我遇到参数问题stratify
.以下是代码:
from sklearn import cross_validation, datasets
X = iris.data[:,:2]
y = iris.target
cross_validation.train_test_split(X,y,stratify=y)
Run Code Online (Sandbox Code Playgroud)
但是,我一直遇到以下问题:
raise TypeError("Invalid parameters passed: %s" % str(options))
TypeError: Invalid parameters passed: {'stratify': array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, …
Run Code Online (Sandbox Code Playgroud) 如何查询具有复杂类型(如地图/数组)的RDD?例如,当我写这个测试代码时:
case class Test(name: String, map: Map[String, String])
val map = Map("hello" -> "world", "hey" -> "there")
val map2 = Map("hello" -> "people", "hey" -> "you")
val rdd = sc.parallelize(Array(Test("first", map), Test("second", map2)))
Run Code Online (Sandbox Code Playgroud)
我虽然语法如下:
sqlContext.sql("SELECT * FROM rdd WHERE map.hello = world")
Run Code Online (Sandbox Code Playgroud)
要么
sqlContext.sql("SELECT * FROM rdd WHERE map[hello] = world")
Run Code Online (Sandbox Code Playgroud)
但我明白了
无法访问MapType类型中的嵌套字段(StringType,StringType,true)
和
org.apache.spark.sql.catalyst.errors.package $ TreeNodeException:未解析的属性
分别.
使用Scala 2.11.8的Spark 2.0(最终版).以下超级简单代码会产生编译错误Error:(17, 45) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
import org.apache.spark.sql.SparkSession
case class SimpleTuple(id: Int, desc: String)
object DatasetTest {
val dataList = List(
SimpleTuple(5, "abc"),
SimpleTuple(6, "bcd")
)
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder.
master("local")
.appName("example")
.getOrCreate()
val dataset = sparkSession.createDataset(dataList)
}
}
Run Code Online (Sandbox Code Playgroud) scala apache-spark apache-spark-dataset apache-spark-encoders
我为Spark编写了一个Java程序.但是如何从Unix命令行运行和编译它.编译运行时是否必须包含任何jar
我知道如何在Spark SQL中编写UDF:
def belowThreshold(power: Int): Boolean = {
return power < -40
}
sqlContext.udf.register("belowThreshold", belowThreshold _)
Run Code Online (Sandbox Code Playgroud)
我可以做类似的定义聚合函数吗?这是怎么做到的?
对于上下文,我想运行以下SQL查询:
val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
FROM ifDF
WHERE opticalReceivePower IS NOT null
GROUP BY span, timestamp
ORDER BY span""")
Run Code Online (Sandbox Code Playgroud)
它应该返回类似的东西
Row(span1, false, T0)
我希望聚合函数告诉我opticalReceivePower
在定义的组中是否有任何值span
,timestamp
哪些值低于阈值.我是否需要以不同的方式将UDAF写入上面粘贴的UDF?
scala aggregate-functions user-defined-functions apache-spark apache-spark-sql
我想DataFrame
使用与列长度相关的条件来过滤a ,这个问题可能很容易,但我没有在SO中找到任何相关的问题.
更具体的,我有一个DataFrame
只有一个Column
,其中ArrayType(StringType())
,我要筛选的DataFrame
使用长度filterer,我拍下面的一个片段.
df = sqlContext.read.parquet("letters.parquet")
df.show()
# The output will be
# +------------+
# | tokens|
# +------------+
# |[L, S, Y, S]|
# |[L, V, I, S]|
# |[I, A, N, A]|
# |[I, L, S, A]|
# |[E, N, N, Y]|
# |[E, I, M, A]|
# |[O, A, N, A]|
# | [S, U, S]|
# +------------+
# But I want only the entries with length …
Run Code Online (Sandbox Code Playgroud)