我正在尝试执行随机森林分类器并使用交叉验证评估模型。我与pySpark合作。输入的CSV文件将以Spark DataFrame格式加载。但是在构建模型时我遇到了一个问题。
下面是代码。
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
sc = SparkContext()
sqlContext = SQLContext(sc)
trainingData =(sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/PATH/CSVFile"))
numFolds = 10
rf = RandomForestClassifier(numTrees=100, maxDepth=5, maxBins=5, labelCol="V5409",featuresCol="features",seed=42)
evaluator = MulticlassClassificationEvaluator().setLabelCol("V5409").setPredictionCol("prediction").setMetricName("accuracy")
paramGrid = ParamGridBuilder().build()
pipeline = Pipeline(stages=[rf])
paramGrid=ParamGridBuilder().build()
crossval = CrossValidator(
estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=evaluator,
numFolds=numFolds)
model = crossval.fit(trainingData)
print accuracy
Run Code Online (Sandbox Code Playgroud)
我低于错误
Traceback (most …Run Code Online (Sandbox Code Playgroud) apache-spark apache-spark-sql pyspark spark-dataframe apache-spark-ml
我有以下结构的数据框:
root
|-- index: long (nullable = true)
|-- text: string (nullable = true)
|-- topicDistribution: struct (nullable = true)
| |-- type: long (nullable = true)
| |-- values: array (nullable = true)
| | |-- element: double (containsNull = true)
|-- wiki_index: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
我需要将其更改为:
root
|-- index: long (nullable = true)
|-- text: string (nullable = true)
|-- topicDistribution: array (nullable = true)
| |-- element: double (containsNull = true)
|-- wiki_index: string (nullable = …Run Code Online (Sandbox Code Playgroud) 我需要在spark中使用Athena,但在使用JDBC驱动程序时spark使用prepareStatement,它给我一个异常“ com.amazonaws.athena.jdbc.NotImplementedException:方法Connection.prepareStatement尚未实现”
你能让我知道如何连接雅典娜吗
我有这样的地图结果
[('成功','',1),('成功','',1),('错误','something_random',1),('错误','something_random',1),('错误','something_random',1)]
是否有一种方法可以通过键减少最终结果:
[('成功',2),('错误',3)]
然后以某种方式在文件上打印所有错误?
我需要经常更改列位置.而不是更改代码我创建了一个临时数据帧Index_df.在这里,我将更新列位置,它应该反映更改应该执行的实际数据帧.
sample_df
F_cDc,F_NHY,F_XUI,F_NMY,P_cDc,P_NHY,P_XUI,P_NMY
415 258 854 245 478 278 874 235
405 197 234 456 567 188 108 267
315 458 054 375 898 978 677 134
Run Code Online (Sandbox Code Playgroud)
Index_df
col position
F_cDc,1
F_NHY,3
F_XUI,5
F_NMY,7
P_cDc,2
P_NHY,4
P_XUI,6
P_NMY,8
Run Code Online (Sandbox Code Playgroud)
在这里index_df,sample_df应该改变.
预期产量:
F_cDc,P_cDc,F_NHY,P_NHY,F_XUI,P_XUI,F_NMY,P_NMY
415 478 258 278 854 874 245 235
405 567 197 188 234 108 456 267
315 898 458 978 054 677 375 134
Run Code Online (Sandbox Code Playgroud)
这里的列位置根据我更新的位置而改变 Index_df
我能做到,sample_df.select("<column order>")但我有超过70列.从技术上讲,这不是最好的交易方式.
我有一个用户定义的函数如下所示,我想用它来导出我的数据帧中的新列:
def to_date_formatted(date_str, format):
if date_str == '' or date_str is None:
return None
try:
dt = datetime.datetime.strptime(date_str, format)
except:
return None
return dt.date()
spark.udf.register("to_date_udf", to_date_formatted, DateType())
Run Code Online (Sandbox Code Playgroud)
我可以通过运行sql来使用它select to_date_udf(my_date, '%d-%b-%y') as date.请注意将自定义格式作为参数传递给函数的功能
但是,我很难使用pyspark列表达式语法而不是sql来使用它
我想写一些类似的东西:
df.with_column("date", to_date_udf('my_date', %d-%b-%y')
Run Code Online (Sandbox Code Playgroud)
但这会导致错误.我怎样才能做到这一点?
[编辑:在此特定示例中,在Spark 2.2+中,您可以使用内置to_date函数提供可选的格式参数.我现在正在使用Spark 2.0,所以这对我来说是不可能的.另外值得注意的是我提供了这个例子,但我对提供UDF参数的一般语法感兴趣,而不是日期转换的细节]
我有三个数据帧,当我加入它时我收到错误.以下是3个数据帧:
名称:r_df第1栏:lab_key第2栏:第2帧
名称:f_df第1栏:lab_key第2栏:光学
名称:m_df第1栏:lab_key第2栏:res
所有三个数据帧都具有相同数量的行250,每个数据帧具有相同的lab_keys.
我的代码看起来像这样:
newDF = r_df.join(f_df, r_df.lab_key == f_df.lab_key).join(m_df, r_df.lab_key == m_df.lab_key).select('r_df.frame', 'f_df.optic', 'm_df.res')
Run Code Online (Sandbox Code Playgroud)
我收到一个错误:
Py4JJavaError:调用o902.join时发生错误.:org.apache.spark.sql.AnalysisException:引用'lab_key'不明确,可以是:lab_key#1648,lab_key#1954.;
对问题可能不是很有帮助.我试图获得一个包含以下列的数据框:
第1栏:lab_key第
2 栏:第
3 栏:第4栏:第4栏
:res
你能帮我加入这三个数据框吗?
我正在尝试从粘合pyspark脚本内的s3存储桶中检索JSON文件。
我在aws胶内的作业中运行此功能:
def run(spark):
s3_bucket_path = 's3://bucket/data/file.gz'
df = spark.read.json(s3_bucket_path)
df.show()
Run Code Online (Sandbox Code Playgroud)
在此之后,我得到:AnalysisException:u'路径不存在:s3://bucket/data/file.gz;'
我搜索了此问题,但没有发现任何类似的东西可以推断出问题出在哪里。我认为访问存储分区可能存在权限问题,但是错误消息应该有所不同。
我有一个pyspark 2.0数据框,我试图根据(相对)较短的列表进行过滤-长度可能为50-100。
filterList = ['A','B','C']
Run Code Online (Sandbox Code Playgroud)
我想将该列表广播到我的每个节点,并使用它删除列表中没有两列之一的记录。
此操作有效:
filter_df= df.where((df['Foo'].isin(filterList )) | (df['Bar'].isin(filterList)))
Run Code Online (Sandbox Code Playgroud)
但是当我广播列表时,我得到一个错误:
filterListB= sc.broadcast(filterList)
filter_df= df.where((df['Foo'].isin(filterListB)) | (df['Bar'].isin(filterListB)))
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-99-1b972cf29148> in <module>()
----> 1 filter_df= df.where((df['Foo'].isin(filterListB)) | (df['Bar'].isin(filterListB)))
/usr/local/spark/python/pyspark/sql/column.pyc in isin(self, *cols)
284 if len(cols) == 1 and isinstance(cols[0], (list, set)):
285 cols = cols[0]
--> 286 cols = [c._jc if isinstance(c, Column) else _create_column_from_literal(c) for c in cols]
287 sc = SparkContext._active_spark_context
288 jc = getattr(self._jc, "isin")(_to_seq(sc, cols))
/usr/local/spark/python/pyspark/sql/column.pyc in _create_column_from_literal(literal)
33 …Run Code Online (Sandbox Code Playgroud) 我已经通过以下方式创建了一个DataFrame:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.getOrCreate()
df = spark.read.csv("train.csv", header=True)
Run Code Online (Sandbox Code Playgroud)
我的DataFrame的架构如下:
root
|-- PassengerId: string (nullable = true)
|-- Survived: string (nullable = true)
|-- Pclass: string (nullable = true)
|-- Name: string (nullable = true)
|-- Sex: string (nullable = true)
|-- Age: string (nullable = true)
|-- SibSp: string (nullable = true)
|-- Parch: string (nullable = true)
|-- Ticket: string (nullable = true)
|-- Fare: …Run Code Online (Sandbox Code Playgroud) pyspark ×10
apache-spark ×6
python ×4
pyspark-sql ×2
amazon-s3 ×1
aws-glue ×1
broadcast ×1
json ×1
mapreduce ×1
tuples ×1