标签: spark-dataframe

如何使用pyspark数据帧查找std dev分区或分组数据?

代码:

w = Window().partitionBy("ticker").orderBy("date")
x = s_df.withColumn("daily_return", (col("close") - lag("close", 1).over(w)) / lag("close", 1).over(w))
Run Code Online (Sandbox Code Playgroud)

s_df 的样子:

+----------+------+------+------+------+--------+------+
|      date|  open|  high|   low| close|  volume|ticker|
+----------+------+------+------+------+--------+------+
|2016-11-02| 111.4|112.35|111.23|111.59|28331709|  AAPL|
|2016-11-01|113.46|113.77|110.53|111.49|43825812|  AAPL|
|2016-10-31|113.65|114.23| 113.2|113.54|26419398|  AAPL|
+----------+------+------+------+------+--------+------+
Run Code Online (Sandbox Code Playgroud)

那么 X 的样子:

+----------+--------------------+
|      date|   avg(daily_return)|
+----------+--------------------+
|2015-12-28|0.004124786535090563|
|2015-11-20|0.006992226387807268|
|2015-12-29| 0.01730500286123971|
Run Code Online (Sandbox Code Playgroud)

我想找到每组股票的 avg(daily_return) 的标准偏差。

我试过的:

x.agg(stddev("avg(daily_return)")).over(w)
Run Code Online (Sandbox Code Playgroud)

我收到此错误:

AttributeError: 'DataFrame' object has no attribute 'over'
Run Code Online (Sandbox Code Playgroud)

我正在尝试做的事情是不可能的,还是有另一种方法可以做到?

在此处输入图片说明

python apache-spark pyspark spark-dataframe

0
推荐指数
1
解决办法
5965
查看次数

从 Spark BinaryType 中提取字节

我有一个带有 BinaryType 类型的二进制列的表:

>>> df.show(3)
+--------+--------------------+
|       t|               bytes|
+--------+--------------------+
|0.145533|[10 50 04 89 00 3...|
|0.345572|[60 94 05 89 80 9...|
|0.545574|[99 50 68 89 00 7...|
+--------+--------------------+
only showing top 3 rows
>>> df.schema
StructType(List(StructField(t,DoubleType,true),StructField(bytes,BinaryType,true)))
Run Code Online (Sandbox Code Playgroud)

如果我提取二进制文件的第一个字节,我会收到来自 Spark 的异常:

>>> df.select(n["t"], df["bytes"].getItem(0)).show(3)
AnalysisException: u"Can't extract value from bytes#477;"
Run Code Online (Sandbox Code Playgroud)

演员阵容ArrayType(ByteType)也不起作用:

>>> df.select(n["t"], df["bytes"].cast(ArrayType(ByteType())).getItem(0)).show(3)
AnalysisException: u"cannot resolve '`bytes`' due to data type mismatch: cannot cast BinaryType to ArrayType(ByteType,true) ..."
Run Code Online (Sandbox Code Playgroud)

如何提取字节?

pyspark spark-dataframe pyspark-sql

0
推荐指数
1
解决办法
4151
查看次数

在 Spark Java API 中将 JavaPairRDD 转换为 Dataframe

我在 Java 7 中使用 Spark 1.6

我有一对RDD:

JavaPairRDD<String, String> filesRDD = sc.wholeTextFiles(args[0]);
Run Code Online (Sandbox Code Playgroud)

我想将其转换DataFrame为模式。

看来首先我必须将pairRDD转换为RowRDD。

那么如何从 PairRDD 创建 RowRdd 呢?

java apache-spark rdd spark-dataframe java-pair-rdd

0
推荐指数
1
解决办法
6200
查看次数

根据列的最大值过滤火花数据框

我想做这样的事情:

df
.withColumn("newCol", <some formula>)
.filter(s"""newCol > ${(math.min(max("newCol").asInstanceOf[Double],10))}""")
Run Code Online (Sandbox Code Playgroud)

我得到的例外:

org.apache.spark.sql.Column cannot be cast to java.lang.Double
Run Code Online (Sandbox Code Playgroud)

你能建议我一种方法来实现我想要的吗?

scala apache-spark spark-dataframe

0
推荐指数
1
解决办法
6736
查看次数

Spark Dataframe UDF - 不支持类型为Any的架构

我正在编写Spark Scala UDF并面临"java.lang.UnsupportedOperationException:不支持类型为Any的架构"

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

val aBP = udf((bG: String, pS: String, bP: String, iOne: String, iTwo: String) => {
  if (bG != "I") {"NA"}
  else if (pS == "D")
    {if (iTwo != null) iOne else "NA"}
  else if (pS == "U")
    {if (bP != null) bP else "NA"}
})
Run Code Online (Sandbox Code Playgroud)

这是抛出错误"java.lang.UnsupportedOperationException:不支持类型为Any的模式"

scala user-defined-functions apache-spark spark-dataframe

0
推荐指数
1
解决办法
4066
查看次数

我们可以在不创建模式的情况下在 spark 数据框中加载分隔文本文件吗?

我有格式良好的文本文件,如波纹管。

TimeStamp|^|LineItem_organizationId|^|LineItem_lineItemId|^|StatementTypeCode|^|LineItemName|^|LocalLanguageLabel|^|FinancialConceptLocal|^|FinancialConceptGlobal|^|IsDimensional|^|InstrumentId|^|LineItemSequence|^|PhysicalMeasureId|^|FinancialConceptCodeGlobalSecondary|^|IsRangeAllowed|^|IsSegmentedByOrigin|^|SegmentGroupDescription|^|SegmentChildDescription|^|SegmentChildLocalLanguageLabel|^|LocalLanguageLabel.languageId|^|LineItemName.languageId|^|SegmentChildDescription.languageId|^|SegmentChildLocalLanguageLabel.languageId|^|SegmentGroupDescription.languageId|^|SegmentMultipleFundbDescription|^|SegmentMultipleFundbDescription.languageId|^|IsCredit|^|FinancialConceptLocalId|^|FinancialConceptGlobalId|^|FinancialConceptCodeGlobalSecondaryId|^|FFAction|!|
1506702452474|^|4295876606|^|1|^|BAL|^|Cash And Deposits|^|2|^||^|ACAE|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018759|^||^|I|!|
1506702452475|^|4295876606|^|4|^|BAL|^|Raw Materials And Supplies|^||^||^|AIRM|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018830|^||^|I|!|
1506702452476|^|4295876606|^|10|^|BAL|^|Total current assets|^||^||^|XTCA|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019590|^||^|I|!|
1506702452477|^|4295876606|^|53|^|BAL|^|Deferred Assets Total|^||^||^|ADFN|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014598|^||^|I|!|
1506702452478|^|4295876606|^|54|^|BAL|^|Total Assets|^||^||^|XTOT|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016350|^||^|I|!|
1506702452479|^|4295876606|^|107|^|BAL|^|Total Number Of Treasury Stock|^||^||^|XTCTI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016331|^||^|I|!|
1506702452480|^|4295876606|^|108|^|BAL|^|Total Number Of Issued Shares|^||^||^|XTCII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016326|^||^|I|!|
1506702452481|^|4295876606|^|109|^|BAL|^|Total Number Of Issued Preferred Stock A|^||^||^|XTPII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016352|^||^|I|!|
1506702452482|^|4295876606|^|111|^|CAS|^|Loss before income taxes|^||^||^|ONET|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019196|^||^|I|!|
1506702452483|^|4295876606|^|130|^|CAS|^|Subtotal|^||^||^|FFFF|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014929|^||^|I|!|
1506702452484|^|4295876606|^|132|^|CAS|^|Net cash provided by (used in) operating activities|^||^||^|XTLO|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016344|^||^|I|!|
1506702452485|^|4295876606|^|133|^|CAS|^|Purchase of property, plant and equipment|^||^||^|ICEX|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014949|^||^|I|!|
1506702452486|^|4295876606|^|143|^|CAS|^|Net cash provided by (used in) investing activities|^||^||^|XTLI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016342|^||^|I|!|
1506702452487|^|4295876606|^|145|^|CAS|^|Proceeds from long-term loans payable|^||^||^|FLDI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014931|^||^|I|!|
Run Code Online (Sandbox Code Playgroud)

现在我必须将此文本文件加载到 spark 数据框中。

我可以这样做

val schema = StructType(Array(

      StructField("OrgId", StringType),
      StructField("LineItemId", StringType),
      StructField("SegmentId", StringType), …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql spark-dataframe

0
推荐指数
1
解决办法
1万
查看次数

重命名在hadoop中创建的文件-Spark

通过写入在HDFS中创建的文件具有其自己的命名约定。要将其更改为自定义名称,可以通过脚本使用hadoop fs -mv oldname newname

Spark / Hadoop中是否有其他可用选项可为创建的文件提供自定义名称。

hadoop apache-spark spark-dataframe

0
推荐指数
1
解决办法
2887
查看次数

如何在保留整行的同时获得具有最大值的单行?

我想为每个 id 获取单行,其中仅存在 Charge 列的最大值。

示例输入数据:

id  name charge 
11  hg   10    
11  mm   20
22  aa   40
22  bb   40
Run Code Online (Sandbox Code Playgroud)

我试过的代码:

df.agg(max("charge"))
Run Code Online (Sandbox Code Playgroud)

我只得到最大值,如下所示:

charge
40   
Run Code Online (Sandbox Code Playgroud)

但是,我想保留整行:

id  name charge
11  mm   20
22  aa   40
22  bb   40
Run Code Online (Sandbox Code Playgroud)

如何保留前两列?name 列对于相同的 id 可以有不同的值,因此不可能groupBy在这两个列上使用并聚合结果。

如果两行具有相同的 id 并收费,则应保留两行。

scala apache-spark apache-spark-sql spark-dataframe

0
推荐指数
1
解决办法
9456
查看次数

由于 . 在spark的列名中

这是我现有的数据框

+------------------+-------------------------+-----------+---------------+-------------------------+---------------------------+------------------------+--------------------------+---------------+-----------+----------------+-----------------+----------------------+--------------------------+-----------+--------------------+-----------+--------------------------------------------------------------------------------------------+-----------------------+------------------+-----------------------------+-----------------------+----------------------------------+
|DataPartition     |TimeStamp                |_lineItemId|_organizationId|fl:FinancialConceptGlobal|fl:FinancialConceptGlobalId|fl:FinancialConceptLocal|fl:FinancialConceptLocalId|fl:InstrumentId|fl:IsCredit|fl:IsDimensional|fl:IsRangeAllowed|fl:IsSegmentedByOrigin|fl:SegmentGroupDescription|fl:Segments|fl:StatementTypeCode|FFAction|!||LineItemName                                                                                |LineItemName.languageId|LocalLanguageLabel|LocalLanguageLabel.languageId|SegmentChildDescription|SegmentChildDescription.languageId|
+------------------+-------------------------+-----------+---------------+-------------------------+---------------------------+------------------------+--------------------------+---------------+-----------+----------------+-----------------+----------------------+--------------------------+-----------+--------------------+-----------+--------------------------------------------------------------------------------------------+-----------------------+------------------+-----------------------------+-----------------------+----------------------------------+
|SelfSourcedPrivate|2017-11-02T10:23:59+00:00|3          |4298009288     |XTOT                     |3016350                    |null                    |null                      |null           |true       |false           |false            |false                 |null                      |null       |BAL                 |I|!|       |Total Assets                                                                                |505074                 |null              |null                         |null                   |null                              |
Run Code Online (Sandbox Code Playgroud)

这是上述数据框的模式

root
 |-- DataPartition: string (nullable = true)
 |-- TimeStamp: string (nullable = true)
 |-- _lineItemId: long (nullable = true)
 |-- _organizationId: long (nullable = true)
 |-- fl:FinancialConceptGlobal: string (nullable = true)
 |-- fl:FinancialConceptGlobalId: long (nullable = true)
 |-- fl:FinancialConceptLocal: string (nullable = true)
 |-- fl:FinancialConceptLocalId: long (nullable = true) …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-dataframe

0
推荐指数
1
解决办法
8685
查看次数

对时间戳值的 Scala 操作

我有时间戳输入,基于某些条件,我需要使用 Scala 编程减去 1 秒或减去 3 个月

输入:

val date :String = "2017-10-31T23:59:59.000"
Run Code Online (Sandbox Code Playgroud)

输出:

减 1 秒

val lessOneSec = "2017-10-31T23:59:58.000"
Run Code Online (Sandbox Code Playgroud)

减 3 个月

val less3Mon   = "2017-07-31T23:59:58.000"
Run Code Online (Sandbox Code Playgroud)

如何将字符串值转换为时间戳并执行 Scala 编程中的减号等操作?

scala apache-spark spark-dataframe

0
推荐指数
1
解决办法
1607
查看次数

为什么加入两个火花数据帧会失败,除非我将“.as('alias)”添加到两者中?

假设有 2 个 Spark DataFrame 我们想加入,无论出于何种原因:

val df1 = Seq(("A", 1), ("B", 2), ("C", 3)).toDF("agent", "in_count")
val df2 = Seq(("A", 2), ("C", 2), ("D", 2)).toDF("agent", "out_count")
Run Code Online (Sandbox Code Playgroud)

可以用这样的代码来完成:

val joinedDf = df1.as('d1).join(df2.as('d2), ($"d1.agent" === $"d2.agent"))

// Result:
val joinedDf.show

+-----+--------+-----+---------+
|agent|in_count|agent|out_count|
+-----+--------+-----+---------+
|    A|       1|    A|        2|
|    C|       3|    C|        2|
+-----+--------+-----+---------+
Run Code Online (Sandbox Code Playgroud)

现在,我不明白的是,为什么它只在我使用别名df1.as(d1)和时才起作用df2.as(d2)?我可以想象,如果我直截了当地写,列之间会出现名称冲突

val joinedDf = df1.join(df2, ($"df1.agent" === $"df2.agent")) // fails
Run Code Online (Sandbox Code Playgroud)

但是......我不明白为什么我不能.as(alias) 使用两者中的一个 DF

df1.as('d1).join(df2, ($"d1.agent" === $"df2.agent")).show()
Run Code Online (Sandbox Code Playgroud)

失败

org.apache.spark.sql.AnalysisException: cannot …
Run Code Online (Sandbox Code Playgroud)

scala join apache-spark apache-spark-sql spark-dataframe

0
推荐指数
1
解决办法
309
查看次数