是否可以将额外的参数传递给pySpark中的映射函数?具体来说,我有以下代码配方:
raw_data_rdd = sc.textFile("data.json", use_unicode=True)
json_data_rdd = raw_data_rdd.map(lambda line: json.loads(line))
mapped_rdd = json_data_rdd.flatMap(processDataLine)
Run Code Online (Sandbox Code Playgroud)
processDataLine除了JSON对象之外,该函数还需要额外的参数,如下所示:
def processDataLine(dataline, arg1, arg2)
Run Code Online (Sandbox Code Playgroud)
如何传递额外的参数arg1,并arg2在flaMap功能?
我有一个DataFrame带有数据的Spark SQL ,我想要得到的是给定日期范围内当前行之前的所有行.因此,例如,我希望将7天之前的所有行放在给定行之前.我想我需要使用Window Function像:
Window \
.partitionBy('id') \
.orderBy('start')
Run Code Online (Sandbox Code Playgroud)
这就是问题所在.我想要有rangeBetween7天的时间,但是我在这个文件中找不到任何内容.Spark甚至提供这样的选择吗?现在我只是得到前面的所有行:
.rowsBetween(-sys.maxsize, 0)
Run Code Online (Sandbox Code Playgroud)
但想要实现以下目标:
.rangeBetween("7 days", 0)
Run Code Online (Sandbox Code Playgroud)
如果有人能帮助我,我将非常感激.提前致谢!
我在pyspark中有一个包含300多列的数据框.在这些列中,有一些值为null的列.
例如:
Column_1 column_2
null null
null null
234 null
125 124
365 187
and so on
Run Code Online (Sandbox Code Playgroud)
当我想做一个column_1的总和时,我得到的是Null,而不是724.
现在我想用空格替换数据框的所有列中的null.因此,当我尝试对这些列求和时,我没有得到空值,但我会得到一个数值.
我们怎样才能在pyspark实现这一目标
我想在Spark数据帧上计算组分位数(使用PySpark).近似或精确的结果都可以.我更喜欢在groupBy/ 的上下文中使用的解决方案agg,以便我可以将它与其他PySpark聚合函数混合使用.如果由于某种原因这是不可能的,那么不同的方法也可以.
这个问题是相关的,但没有说明如何approxQuantile用作聚合函数.
我也可以访问percentile_approxHive UDF,但我不知道如何将它用作聚合函数.
为了特异性,假设我有以下数据帧:
from pyspark import SparkContext
import pyspark.sql.functions as f
sc = SparkContext()
df = sc.parallelize([
['A', 1],
['A', 2],
['A', 3],
['B', 4],
['B', 5],
['B', 6],
]).toDF(('grp', 'val'))
df_grp = df.groupBy('grp').agg(f.magic_percentile('val', 0.5).alias('med_val'))
df_grp.show()
Run Code Online (Sandbox Code Playgroud)
预期结果是:
+----+-------+
| grp|med_val|
+----+-------+
| A| 2|
| B| 5|
+----+-------+
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用Spark构建一个推荐程序,但内存不足:
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
Run Code Online (Sandbox Code Playgroud)
我想通过spark.executor.memory在运行时修改PySpark中的属性来增加Spark可用的内存.
那可能吗?如果是这样,怎么样?
更新
受@ zero323注释中链接的启发,我试图在PySpark中删除并重新创建上下文:
del sc
from pyspark import SparkConf, SparkContext
conf = (SparkConf().setMaster("http://hadoop01.woolford.io:7077").setAppName("recommender").set("spark.executor.memory", "2g"))
sc = SparkContext(conf = conf)
Run Code Online (Sandbox Code Playgroud)
回:
ValueError: Cannot run multiple SparkContexts at once;
Run Code Online (Sandbox Code Playgroud)
这很奇怪,因为:
>>> sc
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
NameError: name 'sc' is not defined
Run Code Online (Sandbox Code Playgroud) 我正在阅读CSV作为Spark DataFrame并在其上执行机器学习操作.我一直在获取Python序列化EOFError - 任何想法为什么?我认为这可能是一个内存问题 - 即文件超出可用RAM - 但是大幅减小DataFrame的大小并没有阻止EOF错误.
玩具代码和错误如下.
#set spark context
conf = SparkConf().setMaster("local").setAppName("MyApp")
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)
#read in 500mb csv as DataFrame
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
inferschema='true').load('myfile.csv')
#get dataframe into machine learning format
r_formula = RFormula(formula = "outcome ~ .")
mldf = r_formula.fit(df).transform(df)
#fit random forest model
rf = RandomForestClassifier(numTrees = 3, maxDepth = 2)
model = rf.fit(mldf)
result = model.transform(mldf).head()
Run Code Online (Sandbox Code Playgroud)
spark-submit在单个节点上运行上述代码会重复抛出以下错误,即使在拟合模型之前减小了DataFrame的大小(例如tinydf = df.sample(False, 0.00001):
Traceback (most recent call last):
File …Run Code Online (Sandbox Code Playgroud) 我有这个在pandas数据帧中本地运行的python代码:
df_result = pd.DataFrame(df
.groupby('A')
.apply(lambda x: myFunction(zip(x.B, x.C), x.name))
Run Code Online (Sandbox Code Playgroud)
我想在PySpark中运行它,但在处理pyspark.sql.group.GroupedData对象时遇到问题.
我尝试过以下方法:
sparkDF
.groupby('A')
.agg(myFunction(zip('B', 'C'), 'A'))
Run Code Online (Sandbox Code Playgroud)
返回
KeyError: 'A'
Run Code Online (Sandbox Code Playgroud)
我推测因为'A'不再是一列而我找不到x.name的等价物.
然后
sparkDF
.groupby('A')
.map(lambda row: Row(myFunction(zip('B', 'C'), 'A')))
.toDF()
Run Code Online (Sandbox Code Playgroud)
但是得到以下错误:
AttributeError: 'GroupedData' object has no attribute 'map'
Run Code Online (Sandbox Code Playgroud)
任何建议将非常感谢!
python user-defined-functions apache-spark apache-spark-sql pyspark
import numpy as np
df = spark.createDataFrame(
[(1, 1, None), (1, 2, float(5)), (1, 3, np.nan), (1, 4, None), (1, 5, float(10)), (1, 6, float('nan')), (1, 6, float('nan'))],
('session', "timestamp1", "id2"))
Run Code Online (Sandbox Code Playgroud)
预期产出
每列的数量为nan/null的数据帧
注意: 我在堆栈溢出中发现的先前问题仅检查null而不是nan.这就是为什么我创造了一个新问题.
我知道我可以在spark中使用isnull()函数来查找Spark列中的Null值的数量但是如何在Spark数据帧中找到Nan值?
我有以下示例DataFrame:
a | b | c |
1 | 2 | 4 |
0 | null | null|
null | 3 | 4 |
Run Code Online (Sandbox Code Playgroud)
我想只在前2列中替换空值 - 列"a"和"b":
a | b | c |
1 | 2 | 4 |
0 | 0 | null|
0 | 3 | 4 |
Run Code Online (Sandbox Code Playgroud)
以下是创建示例数据帧的代码:
rdd = sc.parallelize([(1,2,4), (0,None,None), (None,3,4)])
df2 = sqlContext.createDataFrame(rdd, ["a", "b", "c"])
Run Code Online (Sandbox Code Playgroud)
我知道如何使用以下方法替换所有空值:
df2 = df2.fillna(0)
Run Code Online (Sandbox Code Playgroud)
当我尝试这个时,我失去了第三列:
df2 = df2.select(df2.columns[0:1]).fillna(0)
Run Code Online (Sandbox Code Playgroud) 使用OCR工具我从截图中提取文本(每个约1-5个句子).但是,在手动验证提取的文本时,我注意到有时会出现几个错误.
鉴于文本"你好!我真的喜欢Spark❤️!",我注意到:
1)像"I","!"和"l"这样的字母被"|"代替.
2)Emojis未被正确提取并被其他字符替换或被遗漏.
3)不时删除空格.
结果,我可能会得到一个像这样的字符串:"你好7l |真实|喜欢Spark!"
因为我试图将这些字符串与包含正确文本的数据集相匹配(在这种情况下"Hello there!我真的很喜欢Spark❤️!"),我正在寻找一种有效的方法来匹配Spark中的字符串.
任何人都可以建议一个有效的Spark算法,它允许我比较提取文本(〜100.000)与我的数据集(约1亿)?
pyspark ×10
apache-spark ×9
python ×4
pyspark-sql ×2
dataframe ×1
fuzzy-search ×1
null ×1
rdd ×1
sql ×1