标签: pyspark

pyspark.sql.utils.IllegalArgumentException:u'字段“功能”不存在。

我正在尝试执行随机森林分类器并使用交叉验证评估模型。我与pySpark合作。输入的CSV文件将以Spark DataFrame格式加载。但是在构建模型时我遇到了一个问题。

下面是代码。

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
sc = SparkContext()
sqlContext = SQLContext(sc)
trainingData =(sqlContext.read
         .format("com.databricks.spark.csv")
         .option("header", "true")
         .option("inferSchema", "true")
         .load("/PATH/CSVFile"))
numFolds = 10 
rf = RandomForestClassifier(numTrees=100, maxDepth=5, maxBins=5, labelCol="V5409",featuresCol="features",seed=42)
evaluator = MulticlassClassificationEvaluator().setLabelCol("V5409").setPredictionCol("prediction").setMetricName("accuracy")
paramGrid = ParamGridBuilder().build()

pipeline = Pipeline(stages=[rf])
paramGrid=ParamGridBuilder().build()
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=numFolds)
model = crossval.fit(trainingData)
print accuracy
Run Code Online (Sandbox Code Playgroud)

我低于错误

Traceback (most …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark spark-dataframe apache-spark-ml

0
推荐指数
1
解决办法
5985
查看次数

PySpark:DataFrame-将结构转换为数组

我有以下结构的数据框:

root
 |-- index: long (nullable = true)
 |-- text: string (nullable = true)
 |-- topicDistribution: struct (nullable = true)
 |    |-- type: long (nullable = true)
 |    |-- values: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |-- wiki_index: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

我需要将其更改为:

root
 |-- index: long (nullable = true)
 |-- text: string (nullable = true)
 |-- topicDistribution: array (nullable = true)
 |    |--  element: double (containsNull = true)
 |-- wiki_index: string (nullable = …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark spark-dataframe

0
推荐指数
1
解决办法
2729
查看次数

Spark Athena连接器

我需要在spark中使用Athena,但在使用JDBC驱动程序时spark使用prepareStatement,它给我一个异常“ com.amazonaws.athena.jdbc.NotImplementedException:方法Connection.prepareStatement尚未实现”

你能让我知道如何连接雅典娜吗

pyspark amazon-athena

0
推荐指数
2
解决办法
4683
查看次数

Spark(pyspark)如何仅在3元素元组的2个元素上reduceByKey

我有这样的地图结果

[('成功','',1),('成功','',1),('错误','something_random',1),('错误','something_random',1),('错误','something_random',1)]

是否有一种方法可以通过键减少最终结果:

[('成功',2),('错误',3)]

然后以某种方式在文件上打印所有错误?

tuples mapreduce apache-spark pyspark

0
推荐指数
1
解决办法
626
查看次数

Pyspak:根据其他数据帧动态更新数据帧的列位置

我需要经常更改列位置.而不是更改代码我创建了一个临时数据帧Index_df.在这里,我将更新列位置,它应该反映更改应该执行的实际数据帧.

sample_df

F_cDc,F_NHY,F_XUI,F_NMY,P_cDc,P_NHY,P_XUI,P_NMY
415    258   854   245   478   278   874   235
405    197   234   456   567   188   108   267
315    458   054   375   898   978   677   134
Run Code Online (Sandbox Code Playgroud)

Index_df

   col   position
    F_cDc,1 
    F_NHY,3
    F_XUI,5
    F_NMY,7
    P_cDc,2 
    P_NHY,4
    P_XUI,6
    P_NMY,8
Run Code Online (Sandbox Code Playgroud)

在这里index_df,sample_df应该改变.

预期产量:

F_cDc,P_cDc,F_NHY,P_NHY,F_XUI,P_XUI,F_NMY,P_NMY
415    478   258   278   854   874   245   235
405    567   197   188   234   108   456   267
315    898   458   978   054   677   375   134
Run Code Online (Sandbox Code Playgroud)

这里的列位置根据我更新的位置而改变 Index_df

我能做到,sample_df.select("<column order>")但我有超过70列.从技术上讲,这不是最好的交易方式.

python apache-spark pyspark pyspark-sql

0
推荐指数
1
解决办法
147
查看次数

Pyspark:使用带参数的UDF创建新列

我有一个用户定义的函数如下所示,我想用它来导出我的数据帧中的新列:

def to_date_formatted(date_str, format):
    if date_str == '' or date_str is None:
        return None
    try:
        dt = datetime.datetime.strptime(date_str, format)
    except:
        return None
    return dt.date()

spark.udf.register("to_date_udf", to_date_formatted, DateType())
Run Code Online (Sandbox Code Playgroud)

我可以通过运行sql来使用它select to_date_udf(my_date, '%d-%b-%y') as date.请注意将自定义格式作为参数传递给函数的功能

但是,我很难使用pyspark列表达式语法而不是sql来使用它

我想写一些类似的东西:

df.with_column("date", to_date_udf('my_date', %d-%b-%y')
Run Code Online (Sandbox Code Playgroud)

但这会导致错误.我怎样才能做到这一点?

[编辑:在此特定示例中,在Spark 2.2+中,您可以使用内置to_date函数提供可选的格式参数.我现在正在使用Spark 2.0,所以这对我来说是不可能的.另外值得注意的是我提供了这个例子,但我对提供UDF参数的一般语法感兴趣,而不是日期转换的细节]

apache-spark pyspark pyspark-sql

0
推荐指数
1
解决办法
6268
查看次数

加入3个数据帧时遇到麻烦 - pyspark

我有三个数据帧,当我加入它时我收到错误.以下是3个数据帧:

名称:r_df第1栏:lab_key第2栏:第2帧

名称:f_df第1栏:lab_key第2栏:光学

名称:m_df第1栏:lab_key第2栏:res

所有三个数据帧都具有相同数量的行250,每个数据帧具有相同的lab_keys.

我的代码看起来像这样:

newDF = r_df.join(f_df, r_df.lab_key == f_df.lab_key).join(m_df, r_df.lab_key == m_df.lab_key).select('r_df.frame', 'f_df.optic', 'm_df.res')
Run Code Online (Sandbox Code Playgroud)

我收到一个错误:

Py4JJavaError:调用o902.join时发生错误.:org.apache.spark.sql.AnalysisException:引用'lab_key'不明确,可以是:lab_key#1648,lab_key#1954.;

对问题可能不是很有帮助.我试图获得一个包含以下列的数据框:

第1栏:lab_key第
2 栏:第
3 栏:第4栏:第4栏
:res

你能帮我加入这三个数据框吗?

python pyspark

0
推荐指数
1
解决办法
2832
查看次数

在AWS Glus pyspark作业中从s3加载JSON

我正在尝试从粘合pyspark脚本内的s3存储桶中检索JSON文件。

我在aws胶内的作业中运行此功能:

def run(spark):
    s3_bucket_path = 's3://bucket/data/file.gz'

    df = spark.read.json(s3_bucket_path)
    df.show()
Run Code Online (Sandbox Code Playgroud)

在此之后,我得到:AnalysisException:u'路径不存在:s3://bucket/data/file.gz;'

我搜索了此问题,但没有发现任何类似的东西可以推断出问题出在哪里。我认为访问存储分区可能存在权限问题,但是错误消息应该有所不同。

python json amazon-s3 pyspark aws-glue

0
推荐指数
1
解决办法
891
查看次数

基于广播变量的pyspark过滤器数据帧

我有一个pyspark 2.0数据框,我试图根据(相对)较短的列表进行过滤-长度可能为50-100。

filterList = ['A','B','C']
Run Code Online (Sandbox Code Playgroud)

我想将该列表广播到我的每个节点,并使用它删除列表中没有两列之一的记录。

此操作有效:

filter_df= df.where((df['Foo'].isin(filterList )) | (df['Bar'].isin(filterList)))
Run Code Online (Sandbox Code Playgroud)

但是当我广播列表时,我得到一个错误:

filterListB= sc.broadcast(filterList)

filter_df= df.where((df['Foo'].isin(filterListB)) | (df['Bar'].isin(filterListB)))

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-99-1b972cf29148> in <module>()
----> 1 filter_df= df.where((df['Foo'].isin(filterListB)) | (df['Bar'].isin(filterListB)))

/usr/local/spark/python/pyspark/sql/column.pyc in isin(self, *cols)
    284         if len(cols) == 1 and isinstance(cols[0], (list, set)):
    285             cols = cols[0]
--> 286         cols = [c._jc if isinstance(c, Column) else _create_column_from_literal(c) for c in cols]
    287         sc = SparkContext._active_spark_context
    288         jc = getattr(self._jc, "isin")(_to_seq(sc, cols))

/usr/local/spark/python/pyspark/sql/column.pyc in _create_column_from_literal(literal)
     33 …
Run Code Online (Sandbox Code Playgroud)

python broadcast pyspark

0
推荐指数
1
解决办法
1136
查看次数

如何转换Spark DataFrame列?使用pyspark

我已经通过以下方式创建了一个DataFrame:

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()

df = spark.read.csv("train.csv", header=True)
Run Code Online (Sandbox Code Playgroud)

我的DataFrame的架构如下:

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

0
推荐指数
1
解决办法
1750
查看次数