标签: pyspark

在 Spark 中使用标点符号的未封闭字符类

我正在尝试构建一个正则表达式模式,该模式将从字符串中删除标点符号。我决定punctuationstring图书馆使用。但是,当我执行它时,Spark 返回一个错误,指出有一个未包含的字符。

我怀疑其中的字符punctuation在执行过程中关闭了引号。我有一种感觉,这应该很容易解决,但我不确定如何解决。我的代码如下:

from pyspark.sql.functions import regexp_replace, trim, col, lower
import string

def removePunctuation(column):

    no_punct = regexp_replace(column, string.punctuation, '')
    lowered = lower(no_punct)
    cleaned = strip(lowered)
    return cleaned
Run Code Online (Sandbox Code Playgroud)

我得到这个错误 org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 86.0 failed 1 times, most recent failure: Lost task 0.0 in stage 86.0 (TID 3709, localhost): java.util.regex.PatternSyntaxException: Unclosed character class near index 31

python apache-spark pyspark

0
推荐指数
1
解决办法
3278
查看次数

如何使用pyspark数据帧查找std dev分区或分组数据?

代码:

w = Window().partitionBy("ticker").orderBy("date")
x = s_df.withColumn("daily_return", (col("close") - lag("close", 1).over(w)) / lag("close", 1).over(w))
Run Code Online (Sandbox Code Playgroud)

s_df 的样子:

+----------+------+------+------+------+--------+------+
|      date|  open|  high|   low| close|  volume|ticker|
+----------+------+------+------+------+--------+------+
|2016-11-02| 111.4|112.35|111.23|111.59|28331709|  AAPL|
|2016-11-01|113.46|113.77|110.53|111.49|43825812|  AAPL|
|2016-10-31|113.65|114.23| 113.2|113.54|26419398|  AAPL|
+----------+------+------+------+------+--------+------+
Run Code Online (Sandbox Code Playgroud)

那么 X 的样子:

+----------+--------------------+
|      date|   avg(daily_return)|
+----------+--------------------+
|2015-12-28|0.004124786535090563|
|2015-11-20|0.006992226387807268|
|2015-12-29| 0.01730500286123971|
Run Code Online (Sandbox Code Playgroud)

我想找到每组股票的 avg(daily_return) 的标准偏差。

我试过的:

x.agg(stddev("avg(daily_return)")).over(w)
Run Code Online (Sandbox Code Playgroud)

我收到此错误:

AttributeError: 'DataFrame' object has no attribute 'over'
Run Code Online (Sandbox Code Playgroud)

我正在尝试做的事情是不可能的,还是有另一种方法可以做到?

在此处输入图片说明

python apache-spark pyspark spark-dataframe

0
推荐指数
1
解决办法
5965
查看次数

从 Spark BinaryType 中提取字节

我有一个带有 BinaryType 类型的二进制列的表:

>>> df.show(3)
+--------+--------------------+
|       t|               bytes|
+--------+--------------------+
|0.145533|[10 50 04 89 00 3...|
|0.345572|[60 94 05 89 80 9...|
|0.545574|[99 50 68 89 00 7...|
+--------+--------------------+
only showing top 3 rows
>>> df.schema
StructType(List(StructField(t,DoubleType,true),StructField(bytes,BinaryType,true)))
Run Code Online (Sandbox Code Playgroud)

如果我提取二进制文件的第一个字节,我会收到来自 Spark 的异常:

>>> df.select(n["t"], df["bytes"].getItem(0)).show(3)
AnalysisException: u"Can't extract value from bytes#477;"
Run Code Online (Sandbox Code Playgroud)

演员阵容ArrayType(ByteType)也不起作用:

>>> df.select(n["t"], df["bytes"].cast(ArrayType(ByteType())).getItem(0)).show(3)
AnalysisException: u"cannot resolve '`bytes`' due to data type mismatch: cannot cast BinaryType to ArrayType(ByteType,true) ..."
Run Code Online (Sandbox Code Playgroud)

如何提取字节?

pyspark spark-dataframe pyspark-sql

0
推荐指数
1
解决办法
4151
查看次数

如何在Spark SQL中压缩两个数组列

我有一个熊猫数据框。我尝试将包含字符串值的两列首先连接到列表中,然后使用zip将列表中的每个元素都用'_'连接。我的数据集如下:

df['column_1']: 'abc, def, ghi'
df['column_2']: '1.0, 2.0, 3.0'
Run Code Online (Sandbox Code Playgroud)

我想将这两列连接到第三列,如下所示,用于数据框的每一行。

df['column_3']: [abc_1.0, def_2.0, ghi_3.0]
Run Code Online (Sandbox Code Playgroud)

我已经使用下面的代码在python中成功完成了此操作,但该数据框非常大,并且需要花费很长时间才能为整个数据框运行它。我想在PySpark中做同样的事情以提高效率。我已经成功读取了spark数据框中的数据,但是我很难确定如何使用PySpark等效函数复制Pandas函数。如何在PySpark中获得想要的结果?

df['column_3'] = df['column_2']
for index, row in df.iterrows():
  while index < 3:
    if isinstance(row['column_1'], str):      
      row['column_1'] = list(row['column_1'].split(','))
      row['column_2'] = list(row['column_2'].split(','))
      row['column_3'] = ['_'.join(map(str, i)) for i in zip(list(row['column_1']), list(row['column_2']))]
Run Code Online (Sandbox Code Playgroud)

我已使用以下代码将两列转换为PySpark中的数组

from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.sql.functions import col, split

crash.withColumn("column_1",
    split(col("column_1"), ",\s*").cast(ArrayType(StringType())).alias("column_1")
)
crash.withColumn("column_2",
    split(col("column_2"), ",\s*").cast(ArrayType(StringType())).alias("column_2")
)
Run Code Online (Sandbox Code Playgroud)

现在,我需要使用'_'在两列中压缩数组的每个元素。我该如何使用zip?任何帮助表示赞赏。

python pandas apache-spark apache-spark-sql pyspark

0
推荐指数
2
解决办法
3087
查看次数

Pyspark-在火花数据框中为每一行计数非零列

我有数据框,我需要在Pyspark中按行计算非零列的数量。

ID COL1 COL2 COL3
1  0    1    -1 
2  0    0     0 
3 -17   20    15
4  23   1     0
Run Code Online (Sandbox Code Playgroud)

预期产量:

ID COL1 COL2 COL3 Count
1   0    1    -1    2
2   0    0     0    0
3  -17   20    15   3
4   23   1     0    1
Run Code Online (Sandbox Code Playgroud)

pyspark

0
推荐指数
1
解决办法
226
查看次数

为什么在.distinct()之后总是有.collect()?

初学者火花。经常看到.distinct()。collect()结构。什么是在distinct()函数之后立即拥有collect()函数的内在原因是什么?

scala apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
122
查看次数

计算pyspark Dataframe中的列数?

我有一个包含15列的数据框(4个分类,其余为数字).

我为每个分类变量创建了虚拟变量.现在我想在新数据帧中找到变量的数量.

我试过计算长度printSchema(),但是NoneType:

print type(df.printSchema())
Run Code Online (Sandbox Code Playgroud)

machine-learning apache-spark pyspark pyspark-sql

-1
推荐指数
1
解决办法
2万
查看次数

Spark:将 DataFrame 列转换为向量

我有一个df带有一列的 DataFrame ,column我想转换column成一个向量(例如 a DenseVector),以便我可以在向量和矩阵产品中使用它。

当心:我不需要一列向量;我需要一个矢量对象。

这该怎么做?

我找到了vectorAssembler函数(链接),但这对我没有帮助,因为它将一些 DataFrame 列转换为向量列,这仍然是一个 DataFrame 列;我想要的输出应该是一个向量。


关于这个问题的目标:为什么我要尝试将 DF 列转换为向量?假设我有一个带有数字列的 DF,我需要计算矩阵和该列之间的乘积。我怎样才能做到这一点?(同样适用于 DF 数字行。)欢迎使用任何替代方法。

vector apache-spark apache-spark-sql pyspark

-1
推荐指数
1
解决办法
3472
查看次数

如何在 PySpark 中分解嵌套数据框并将其进一步存储到 hive

我面临一个特定的问题,其中我的 JSON 结构如下:-

{A:value,
B:value,
C:Array<Struct<A1:value,B1:value, C1:Array<struct<A2:value,B2:value>>>>
}
Run Code Online (Sandbox Code Playgroud)

我希望它以以下形式爆炸:-

{
A:value,
B:value,
A1:value,
B1:value,
A2:value,
B2:value
}
Run Code Online (Sandbox Code Playgroud)

我为此使用了 pyspark 数据框,但找不到正确爆炸的方法。任何帮助表示赞赏。

json hive dataframe apache-spark pyspark

-1
推荐指数
1
解决办法
5482
查看次数

使用 S3 数据帧调用 .show() 和 .toPandas() 时 Spark org.apache.http.ConnectionClosedException

df在 AWS S3 上用 Parquet 数据创建了一个 PySpark DataFrame 。调用df.count()有效,但df.show()df.toPandas()失败并出现以下错误:

Py4JJavaError: An error occurred while calling o41.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0
 failed 1 times, most recent failure: Lost task 0.0 in stage 8.0 (TID 14, 10.20.202.97, 
executor driver): org.apache.http.ConnectionClosedException: Premature end of Content-
Length delimited message body (expected: 77,826,675; received: 8,192)
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:198)
    at org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:101)
    at org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:166)
    at org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
    at org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:172)
    at …
Run Code Online (Sandbox Code Playgroud)

amazon-s3 apache-spark pyspark

-1
推荐指数
1
解决办法
1158
查看次数