我有一个结果RDD labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions).这有以这种格式输出:
[(0.0, 0.08482142857142858), (0.0, 0.11442786069651742),.....]
Run Code Online (Sandbox Code Playgroud)
我想要的是创建一个CSV文件,其中一列labels(上面输出中的元组的第一部分)和一列predictions(元组输出的第二部分).但我不知道如何使用Python在Spark中写入CSV文件.
如何使用上述输出创建CSV文件?
我需要在下面一行中得到的数据框,在groupBy之后的max('diff')列中有一个别名"maxDiff".但是,下面的行不做任何改变,也不会抛出错误.
grpdf = joined_df.groupBy(temp1.datestamp).max('diff').alias("maxDiff")
Run Code Online (Sandbox Code Playgroud) 我只是得到了Spark的悬念,我有需要映射到的函数rdd,但是使用了一个全局字典:
from pyspark import SparkContext
sc = SparkContext('local[*]', 'pyspark')
my_dict = {"a": 1, "b": 2, "c": 3, "d": 4} # at no point will be modified
my_list = ["a", "d", "c", "b"]
def my_func(letter):
return my_dict[letter]
my_list_rdd = sc.parallelize(my_list)
result = my_list_rdd.map(lambda x: my_func(x)).collect()
print result
Run Code Online (Sandbox Code Playgroud)
以上给出了预期的结果; 但是,我真的不确定我对全局变量的使用my_dict.似乎每个分区都会创建一个字典副本.它只是感觉不对..
看起来广播是我正在寻找的.但是,当我尝试使用它时:
my_dict_bc = sc.broadcast(my_dict)
def my_func(letter):
return my_dict_bc[letter]
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
TypeError: 'Broadcast' object has no attribute '__getitem__
Run Code Online (Sandbox Code Playgroud)
这似乎意味着我不能播放字典.
我的问题:如果我有一个使用全局字典的函数,需要将其映射到rdd,那么正确的方法是什么?
我的例子很简单,但在现实中my_dict,并my_list要大得多,而且my_func …
我试图在PySpark中使用以下选项覆盖Spark数据帧,但我没有成功
spark_df.write.format('com.databricks.spark.csv').option("header", "true",mode='overwrite').save(self.output_file_path)
Run Code Online (Sandbox Code Playgroud)
mode = overwrite命令不成功
我正在修补PySpark文档中的一些交叉验证代码,并尝试让PySpark告诉我选择了哪个模型:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.linalg import Vectors
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
dataset = sqlContext.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01, 0.001, 0.0001]).build()
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(dataset)
Run Code Online (Sandbox Code Playgroud)
在PySpark shell中运行它,我可以得到线性回归模型的系数,但我似乎无法找到lr.regParam交叉验证程序选择的值.有任何想法吗?
In [3]: cvModel.bestModel.coefficients
Out[3]: DenseVector([3.1573])
In [4]: cvModel.bestModel.explainParams()
Out[4]: ''
In [5]: cvModel.bestModel.extractParamMap()
Out[5]: {}
In [15]: cvModel.params
Out[15]: [] …Run Code Online (Sandbox Code Playgroud) modeling cross-validation pyspark apache-spark-ml apache-spark-mllib
在pandas数据框中,我使用以下代码绘制列的直方图:
my_df.hist(column = 'field_1')
Run Code Online (Sandbox Code Playgroud)
在pyspark数据框架中是否有可以实现相同目标的东西?(我在Jupyter笔记本中)谢谢!
当您连接具有相似列名称的两个DF时:
df = df1.join(df2, df1['id'] == df2['id'])
Run Code Online (Sandbox Code Playgroud)
加入工作正常,但你不能调用id列,因为它是不明确的,你会得到以下异常:
pyspark.sql.utils.AnalysisException: "Reference 'id' is ambiguous, could be: id#5691, id#5918.;"
Run Code Online (Sandbox Code Playgroud)
这使得id不再可用......
以下函数解决了该问题:
def join(df1, df2, cond, how='left'):
df = df1.join(df2, cond, how=how)
repeated_columns = [c for c in df1.columns if c in df2.columns]
for col in repeated_columns:
df = df.drop(df2[col])
return df
Run Code Online (Sandbox Code Playgroud)
我不喜欢它的是我必须迭代列名称并删除它们为什么由一个.这看起来很笨重......
您是否知道任何其他解决方案将更优雅地加入和删除重复项或删除多个列而不迭代它们?
我正在尝试根据某些数据手动创建一个 pyspark 数据框:
row_in=[(1566429545575348),(40.353977),(-111.701859)]
rdd=sc.parallelize(row_in)
schema = StructType([StructField("time_epocs", DecimalType(), True),StructField("lat", DecimalType(),True),StructField("long", DecimalType(),True)])
df_in_test=spark.createDataFrame(rdd,schema)
Run Code Online (Sandbox Code Playgroud)
当我尝试显示数据框时,这会出错,因此我不确定如何执行此操作。
但是,Spark 文档在这里对我来说似乎有点复杂,当我尝试按照这些说明进行操作时,我遇到了类似的错误。
有谁知道如何做到这一点?
在 PYCHARM 上运行 pyspark 程序时出现以下错误,错误:
java.io.IOException: 无法运行程序“python3”: CreateProcess error=2, 系统找不到指定的文件......
解释器正在识别 python.exe 文件,并且我已在项目结构中添加了内容根目录。
我之前在 Windows 命令提示符下运行相同的程序时遇到了类似的问题,并使用What is the right way to edit spark-env.sh before running Spark-shell?解决了它。