代码:
w = Window().partitionBy("ticker").orderBy("date")
x = s_df.withColumn("daily_return", (col("close") - lag("close", 1).over(w)) / lag("close", 1).over(w))
Run Code Online (Sandbox Code Playgroud)
s_df 的样子:
+----------+------+------+------+------+--------+------+
| date| open| high| low| close| volume|ticker|
+----------+------+------+------+------+--------+------+
|2016-11-02| 111.4|112.35|111.23|111.59|28331709| AAPL|
|2016-11-01|113.46|113.77|110.53|111.49|43825812| AAPL|
|2016-10-31|113.65|114.23| 113.2|113.54|26419398| AAPL|
+----------+------+------+------+------+--------+------+
Run Code Online (Sandbox Code Playgroud)
那么 X 的样子:
+----------+--------------------+
| date| avg(daily_return)|
+----------+--------------------+
|2015-12-28|0.004124786535090563|
|2015-11-20|0.006992226387807268|
|2015-12-29| 0.01730500286123971|
Run Code Online (Sandbox Code Playgroud)
我想找到每组股票的 avg(daily_return) 的标准偏差。
我试过的:
x.agg(stddev("avg(daily_return)")).over(w)
Run Code Online (Sandbox Code Playgroud)
我收到此错误:
AttributeError: 'DataFrame' object has no attribute 'over'
Run Code Online (Sandbox Code Playgroud)
我正在尝试做的事情是不可能的,还是有另一种方法可以做到?
我有一个带有 BinaryType 类型的二进制列的表:
>>> df.show(3)
+--------+--------------------+
| t| bytes|
+--------+--------------------+
|0.145533|[10 50 04 89 00 3...|
|0.345572|[60 94 05 89 80 9...|
|0.545574|[99 50 68 89 00 7...|
+--------+--------------------+
only showing top 3 rows
>>> df.schema
StructType(List(StructField(t,DoubleType,true),StructField(bytes,BinaryType,true)))
Run Code Online (Sandbox Code Playgroud)
如果我提取二进制文件的第一个字节,我会收到来自 Spark 的异常:
>>> df.select(n["t"], df["bytes"].getItem(0)).show(3)
AnalysisException: u"Can't extract value from bytes#477;"
Run Code Online (Sandbox Code Playgroud)
演员阵容ArrayType(ByteType)也不起作用:
>>> df.select(n["t"], df["bytes"].cast(ArrayType(ByteType())).getItem(0)).show(3)
AnalysisException: u"cannot resolve '`bytes`' due to data type mismatch: cannot cast BinaryType to ArrayType(ByteType,true) ..."
Run Code Online (Sandbox Code Playgroud)
如何提取字节?
我在 Java 7 中使用 Spark 1.6
我有一对RDD:
JavaPairRDD<String, String> filesRDD = sc.wholeTextFiles(args[0]);
Run Code Online (Sandbox Code Playgroud)
我想将其转换DataFrame为模式。
看来首先我必须将pairRDD转换为RowRDD。
那么如何从 PairRDD 创建 RowRdd 呢?
我想做这样的事情:
df
.withColumn("newCol", <some formula>)
.filter(s"""newCol > ${(math.min(max("newCol").asInstanceOf[Double],10))}""")
Run Code Online (Sandbox Code Playgroud)
我得到的例外:
org.apache.spark.sql.Column cannot be cast to java.lang.Double
Run Code Online (Sandbox Code Playgroud)
你能建议我一种方法来实现我想要的吗?
我正在编写Spark Scala UDF并面临"java.lang.UnsupportedOperationException:不支持类型为Any的架构"
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf
val aBP = udf((bG: String, pS: String, bP: String, iOne: String, iTwo: String) => {
if (bG != "I") {"NA"}
else if (pS == "D")
{if (iTwo != null) iOne else "NA"}
else if (pS == "U")
{if (bP != null) bP else "NA"}
})
Run Code Online (Sandbox Code Playgroud)
这是抛出错误"java.lang.UnsupportedOperationException:不支持类型为Any的模式"
我有格式良好的文本文件,如波纹管。
TimeStamp|^|LineItem_organizationId|^|LineItem_lineItemId|^|StatementTypeCode|^|LineItemName|^|LocalLanguageLabel|^|FinancialConceptLocal|^|FinancialConceptGlobal|^|IsDimensional|^|InstrumentId|^|LineItemSequence|^|PhysicalMeasureId|^|FinancialConceptCodeGlobalSecondary|^|IsRangeAllowed|^|IsSegmentedByOrigin|^|SegmentGroupDescription|^|SegmentChildDescription|^|SegmentChildLocalLanguageLabel|^|LocalLanguageLabel.languageId|^|LineItemName.languageId|^|SegmentChildDescription.languageId|^|SegmentChildLocalLanguageLabel.languageId|^|SegmentGroupDescription.languageId|^|SegmentMultipleFundbDescription|^|SegmentMultipleFundbDescription.languageId|^|IsCredit|^|FinancialConceptLocalId|^|FinancialConceptGlobalId|^|FinancialConceptCodeGlobalSecondaryId|^|FFAction|!|
1506702452474|^|4295876606|^|1|^|BAL|^|Cash And Deposits|^|2|^||^|ACAE|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018759|^||^|I|!|
1506702452475|^|4295876606|^|4|^|BAL|^|Raw Materials And Supplies|^||^||^|AIRM|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3018830|^||^|I|!|
1506702452476|^|4295876606|^|10|^|BAL|^|Total current assets|^||^||^|XTCA|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019590|^||^|I|!|
1506702452477|^|4295876606|^|53|^|BAL|^|Deferred Assets Total|^||^||^|ADFN|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014598|^||^|I|!|
1506702452478|^|4295876606|^|54|^|BAL|^|Total Assets|^||^||^|XTOT|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016350|^||^|I|!|
1506702452479|^|4295876606|^|107|^|BAL|^|Total Number Of Treasury Stock|^||^||^|XTCTI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016331|^||^|I|!|
1506702452480|^|4295876606|^|108|^|BAL|^|Total Number Of Issued Shares|^||^||^|XTCII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016326|^||^|I|!|
1506702452481|^|4295876606|^|109|^|BAL|^|Total Number Of Issued Preferred Stock A|^||^||^|XTPII|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016352|^||^|I|!|
1506702452482|^|4295876606|^|111|^|CAS|^|Loss before income taxes|^||^||^|ONET|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3019196|^||^|I|!|
1506702452483|^|4295876606|^|130|^|CAS|^|Subtotal|^||^||^|FFFF|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014929|^||^|I|!|
1506702452484|^|4295876606|^|132|^|CAS|^|Net cash provided by (used in) operating activities|^||^||^|XTLO|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016344|^||^|I|!|
1506702452485|^|4295876606|^|133|^|CAS|^|Purchase of property, plant and equipment|^||^||^|ICEX|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014949|^||^|I|!|
1506702452486|^|4295876606|^|143|^|CAS|^|Net cash provided by (used in) investing activities|^||^||^|XTLI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3016342|^||^|I|!|
1506702452487|^|4295876606|^|145|^|CAS|^|Proceeds from long-term loans payable|^||^||^|FLDI|^|false|^||^||^||^||^|false|^||^||^||^||^|505126|^|505074|^||^||^||^||^||^||^||^|3014931|^||^|I|!|
Run Code Online (Sandbox Code Playgroud)
现在我必须将此文本文件加载到 spark 数据框中。
我可以这样做
val schema = StructType(Array(
StructField("OrgId", StringType),
StructField("LineItemId", StringType),
StructField("SegmentId", StringType), …Run Code Online (Sandbox Code Playgroud) 通过写入在HDFS中创建的文件具有其自己的命名约定。要将其更改为自定义名称,可以通过脚本使用hadoop fs -mv oldname newname
Spark / Hadoop中是否有其他可用选项可为创建的文件提供自定义名称。
我想为每个 id 获取单行,其中仅存在 Charge 列的最大值。
示例输入数据:
id name charge
11 hg 10
11 mm 20
22 aa 40
22 bb 40
Run Code Online (Sandbox Code Playgroud)
我试过的代码:
df.agg(max("charge"))
Run Code Online (Sandbox Code Playgroud)
我只得到最大值,如下所示:
charge
40
Run Code Online (Sandbox Code Playgroud)
但是,我想保留整行:
id name charge
11 mm 20
22 aa 40
22 bb 40
Run Code Online (Sandbox Code Playgroud)
如何保留前两列?name 列对于相同的 id 可以有不同的值,因此不可能groupBy在这两个列上使用并聚合结果。
如果两行具有相同的 id 并收费,则应保留两行。
这是我现有的数据框
+------------------+-------------------------+-----------+---------------+-------------------------+---------------------------+------------------------+--------------------------+---------------+-----------+----------------+-----------------+----------------------+--------------------------+-----------+--------------------+-----------+--------------------------------------------------------------------------------------------+-----------------------+------------------+-----------------------------+-----------------------+----------------------------------+
|DataPartition |TimeStamp |_lineItemId|_organizationId|fl:FinancialConceptGlobal|fl:FinancialConceptGlobalId|fl:FinancialConceptLocal|fl:FinancialConceptLocalId|fl:InstrumentId|fl:IsCredit|fl:IsDimensional|fl:IsRangeAllowed|fl:IsSegmentedByOrigin|fl:SegmentGroupDescription|fl:Segments|fl:StatementTypeCode|FFAction|!||LineItemName |LineItemName.languageId|LocalLanguageLabel|LocalLanguageLabel.languageId|SegmentChildDescription|SegmentChildDescription.languageId|
+------------------+-------------------------+-----------+---------------+-------------------------+---------------------------+------------------------+--------------------------+---------------+-----------+----------------+-----------------+----------------------+--------------------------+-----------+--------------------+-----------+--------------------------------------------------------------------------------------------+-----------------------+------------------+-----------------------------+-----------------------+----------------------------------+
|SelfSourcedPrivate|2017-11-02T10:23:59+00:00|3 |4298009288 |XTOT |3016350 |null |null |null |true |false |false |false |null |null |BAL |I|!| |Total Assets |505074 |null |null |null |null |
Run Code Online (Sandbox Code Playgroud)
这是上述数据框的模式
root
|-- DataPartition: string (nullable = true)
|-- TimeStamp: string (nullable = true)
|-- _lineItemId: long (nullable = true)
|-- _organizationId: long (nullable = true)
|-- fl:FinancialConceptGlobal: string (nullable = true)
|-- fl:FinancialConceptGlobalId: long (nullable = true)
|-- fl:FinancialConceptLocal: string (nullable = true)
|-- fl:FinancialConceptLocalId: long (nullable = true) …Run Code Online (Sandbox Code Playgroud) 我有时间戳输入,基于某些条件,我需要使用 Scala 编程减去 1 秒或减去 3 个月
输入:
val date :String = "2017-10-31T23:59:59.000"
Run Code Online (Sandbox Code Playgroud)
输出:
减 1 秒
val lessOneSec = "2017-10-31T23:59:58.000"
Run Code Online (Sandbox Code Playgroud)
减 3 个月
val less3Mon = "2017-07-31T23:59:58.000"
Run Code Online (Sandbox Code Playgroud)
如何将字符串值转换为时间戳并执行 Scala 编程中的减号等操作?
假设有 2 个 Spark DataFrame 我们想加入,无论出于何种原因:
val df1 = Seq(("A", 1), ("B", 2), ("C", 3)).toDF("agent", "in_count")
val df2 = Seq(("A", 2), ("C", 2), ("D", 2)).toDF("agent", "out_count")
Run Code Online (Sandbox Code Playgroud)
可以用这样的代码来完成:
val joinedDf = df1.as('d1).join(df2.as('d2), ($"d1.agent" === $"d2.agent"))
// Result:
val joinedDf.show
+-----+--------+-----+---------+
|agent|in_count|agent|out_count|
+-----+--------+-----+---------+
| A| 1| A| 2|
| C| 3| C| 2|
+-----+--------+-----+---------+
Run Code Online (Sandbox Code Playgroud)
现在,我不明白的是,为什么它只在我使用别名df1.as(d1)和时才起作用df2.as(d2)?我可以想象,如果我直截了当地写,列之间会出现名称冲突
val joinedDf = df1.join(df2, ($"df1.agent" === $"df2.agent")) // fails
Run Code Online (Sandbox Code Playgroud)
但是......我不明白为什么我不能.as(alias) 只使用两者中的一个 DF:
df1.as('d1).join(df2, ($"d1.agent" === $"df2.agent")).show()
Run Code Online (Sandbox Code Playgroud)
失败
org.apache.spark.sql.AnalysisException: cannot …Run Code Online (Sandbox Code Playgroud) spark-dataframe ×11
apache-spark ×10
scala ×6
pyspark ×2
hadoop ×1
java ×1
join ×1
pyspark-sql ×1
python ×1
rdd ×1