标签: pyspark

Spark RDD - 使用额外参数进行映射

是否可以将额外的参数传递给pySpark中的映射函数?具体来说,我有以下代码配方:

raw_data_rdd = sc.textFile("data.json", use_unicode=True)
json_data_rdd = raw_data_rdd.map(lambda line: json.loads(line))
mapped_rdd = json_data_rdd.flatMap(processDataLine)
Run Code Online (Sandbox Code Playgroud)

processDataLine除了JSON对象之外,该函数还需要额外的参数,如下所示:

def processDataLine(dataline, arg1, arg2)
Run Code Online (Sandbox Code Playgroud)

如何传递额外的参数arg1,并arg2flaMap功能?

python apache-spark rdd pyspark

28
推荐指数
1
解决办法
2万
查看次数

Spark窗口函数 - rangeBetween日期

我有一个DataFrame带有数据的Spark SQL ,我想要得到的是给定日期范围内当前行之前的所有行.因此,例如,我希望将7天之前的所有行放在给定行之前.我想我需要使用Window Function像:

Window \
    .partitionBy('id') \
    .orderBy('start')
Run Code Online (Sandbox Code Playgroud)

这就是问题所在.我想要有rangeBetween7天的时间,但是我在这个文件中找不到任何内容.Spark甚至提供这样的选择吗?现在我只是得到前面的所有行:

.rowsBetween(-sys.maxsize, 0)
Run Code Online (Sandbox Code Playgroud)

但想要实现以下目标:

.rangeBetween("7 days", 0)
Run Code Online (Sandbox Code Playgroud)

如果有人能帮助我,我将非常感激.提前致谢!

sql window-functions apache-spark apache-spark-sql pyspark

28
推荐指数
3
解决办法
2万
查看次数

如何在Pyspark中替换数据框的所有Null值

我在pyspark中有一个包含300多列的数据框.在这些列中,有一些值为null的列.

例如:

Column_1 column_2
null     null
null     null
234      null
125      124
365      187
and so on
Run Code Online (Sandbox Code Playgroud)

当我想做一个column_1的总和时,我得到的是Null,而不是724.

现在我想用空格替换数据框的所有列中的null.因此,当我尝试对这些列求和时,我没有得到空值,但我会得到一个数值.

我们怎样才能在pyspark实现这一目标

null dataframe pyspark

28
推荐指数
3
解决办法
4万
查看次数

PySpark group中的中位数/分位数

我想在Spark数据帧上计算组分位数(使用PySpark).近似或精确的结果都可以.我更喜欢在groupBy/ 的上下文中使用的解决方案agg,以便我可以将它与其他PySpark聚合函数混合使用.如果由于某种原因这是不可能的,那么不同的方法也可以.

这个问题是相关的,但没有说明如何approxQuantile用作聚合函数.

我也可以访问percentile_approxHive UDF,但我不知道如何将它用作聚合函数.

为了特异性,假设我有以下数据帧:

from pyspark import SparkContext
import pyspark.sql.functions as f

sc = SparkContext()    

df = sc.parallelize([
    ['A', 1],
    ['A', 2],
    ['A', 3],
    ['B', 4],
    ['B', 5],
    ['B', 6],
]).toDF(('grp', 'val'))

df_grp = df.groupBy('grp').agg(f.magic_percentile('val', 0.5).alias('med_val'))
df_grp.show()
Run Code Online (Sandbox Code Playgroud)

预期结果是:

+----+-------+
| grp|med_val|
+----+-------+
|   A|      2|
|   B|      5|
+----+-------+
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark pyspark-sql

28
推荐指数
5
解决办法
2万
查看次数

在运行时增加PySpark可用的内存

我正在尝试使用Spark构建一个推荐程序,但内存不足:

Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
Run Code Online (Sandbox Code Playgroud)

我想通过spark.executor.memory在运行时修改PySpark中的属性来增加Spark可用的内存.

那可能吗?如果是这样,怎么样?

更新

受@ zero323注释中链接的启发,我试图在PySpark中删除并重新创建上下文:

del sc
from pyspark import SparkConf, SparkContext
conf = (SparkConf().setMaster("http://hadoop01.woolford.io:7077").setAppName("recommender").set("spark.executor.memory", "2g"))
sc = SparkContext(conf = conf)
Run Code Online (Sandbox Code Playgroud)

回:

ValueError: Cannot run multiple SparkContexts at once;
Run Code Online (Sandbox Code Playgroud)

这很奇怪,因为:

>>> sc
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'sc' is not defined
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

27
推荐指数
2
解决办法
3万
查看次数

PySpark序列化EOFError

我正在阅读CSV作为Spark DataFrame并在其上执行机器学习操作.我一直在获取Python序列化EOFError - 任何想法为什么?我认为这可能是一个内存问题 - 即文件超出可用RAM - 但是大幅减小DataFrame的大小并没有阻止EOF错误.

玩具代码和错误如下.

#set spark context
conf = SparkConf().setMaster("local").setAppName("MyApp")
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

#read in 500mb csv as DataFrame
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
     inferschema='true').load('myfile.csv')

#get dataframe into machine learning format
r_formula = RFormula(formula = "outcome ~ .")
mldf = r_formula.fit(df).transform(df)

#fit random forest model
rf = RandomForestClassifier(numTrees = 3, maxDepth = 2)
model = rf.fit(mldf)
result = model.transform(mldf).head()
Run Code Online (Sandbox Code Playgroud)

spark-submit在单个节点上运行上述代码会重复抛出以下错误,即使在拟合模型之前减小了DataFrame的大小(例如tinydf = df.sample(False, 0.00001):

Traceback (most recent call last):
  File …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark apache-spark-1.6

27
推荐指数
1
解决办法
6765
查看次数

在PySpark中的GroupedData上应用UDF(具有正常运行的python示例)

我有这个在pandas数据帧中本地运行的python代码:

df_result = pd.DataFrame(df
                          .groupby('A')
                          .apply(lambda x: myFunction(zip(x.B, x.C), x.name))
Run Code Online (Sandbox Code Playgroud)

我想在PySpark中运行它,但在处理pyspark.sql.group.GroupedData对象时遇到问题.

我尝试过以下方法:

sparkDF
 .groupby('A')
 .agg(myFunction(zip('B', 'C'), 'A')) 
Run Code Online (Sandbox Code Playgroud)

返回

KeyError: 'A'
Run Code Online (Sandbox Code Playgroud)

我推测因为'A'不再是一列而我找不到x.name的等价物.

然后

sparkDF
 .groupby('A')
 .map(lambda row: Row(myFunction(zip('B', 'C'), 'A'))) 
 .toDF()
Run Code Online (Sandbox Code Playgroud)

但是得到以下错误:

AttributeError: 'GroupedData' object has no attribute 'map'
Run Code Online (Sandbox Code Playgroud)

任何建议将非常感谢!

python user-defined-functions apache-spark apache-spark-sql pyspark

27
推荐指数
4
解决办法
2万
查看次数

如何有效地找到PySpark数据帧中每列的Null和Nan值的计数?

import numpy as np

df = spark.createDataFrame(
    [(1, 1, None), (1, 2, float(5)), (1, 3, np.nan), (1, 4, None), (1, 5, float(10)), (1, 6, float('nan')), (1, 6, float('nan'))],
    ('session', "timestamp1", "id2"))
Run Code Online (Sandbox Code Playgroud)

预期产出

每列的数量为nan/null的数据帧

注意: 我在堆栈溢出中发现的先前问题仅检查null而不是nan.这就是为什么我创造了一个新问题.

我知道我可以在spark中使用isnull()函数来查找Spark列中的Null值的数量但是如何在Spark数据帧中找到Nan值?

apache-spark apache-spark-sql pyspark pyspark-sql

27
推荐指数
4
解决办法
6万
查看次数

PySpark:如何在特定列的数据框中填充值?

我有以下示例DataFrame:

a    | b    | c   | 

1    | 2    | 4   |
0    | null | null| 
null | 3    | 4   |
Run Code Online (Sandbox Code Playgroud)

我想只在前2列中替换空值 - 列"a"和"b":

a    | b    | c   | 

1    | 2    | 4   |
0    | 0    | null| 
0    | 3    | 4   |
Run Code Online (Sandbox Code Playgroud)

以下是创建示例数据帧的代码:

rdd = sc.parallelize([(1,2,4), (0,None,None), (None,3,4)])
df2 = sqlContext.createDataFrame(rdd, ["a", "b", "c"])
Run Code Online (Sandbox Code Playgroud)

我知道如何使用以下方法替换所有空值:

df2 = df2.fillna(0)
Run Code Online (Sandbox Code Playgroud)

当我尝试这个时,我失去了第三列:

df2 = df2.select(df2.columns[0:1]).fillna(0)
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark spark-dataframe

27
推荐指数
2
解决办法
4万
查看次数

Apache Spark中的高效字符串匹配

使用OCR工具我从截图中提取文本(每个约1-5个句子).但是,在手动验证提取的文本时,我注意到有时会出现几个错误.

鉴于文本"你好!我真的喜欢Spark❤️!",我注意到:

1)像"I","!"和"l"这样的字母被"|"代替.

2)Emojis未被正确提取并被其他字符替换或被遗漏.

3)不时删除空格.

结果,我可能会得到一个像这样的字符串:"你好7l |真实|喜欢Spark!"

因为我试图将这些字符串与包含正确文本的数据集相匹配(在这种情况下"Hello there!我真的很喜欢Spark❤️!"),我正在寻找一种有效的方法来匹配Spark中的字符串.

任何人都可以建议一个有效的Spark算法,它允许我比较提取文本(〜100.000)与我的数据集(约1亿)?

python fuzzy-search string-matching apache-spark pyspark

26
推荐指数
1
解决办法
5478
查看次数