小编pis*_*all的帖子

Spark Ml评估方法

我有一个火花数据框如下:

predictions.show(5)
+------+----+------+-----------+
|  user|item|rating| prediction|
+------+----+------+-----------+
|379433|  31|     1| 0.08203495|
|  1834|  31|     1|  0.4854447|
|422635|  31|     1|0.017672742|
|   839|  31|     1| 0.39273006|
| 51444|  31|     1| 0.09795039|
+------+----+------+-----------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)

预测是预测的评级,评级是隐含评级(计数).

现在我想检查我的推荐算法的AUC.

我首先尝试了pyspark.ml.BinaryClassificationEvaluator,因为它直接在数据框上工作.

# getting the evaluationa metric 

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
print evaluator.evaluate(predictions)
Run Code Online (Sandbox Code Playgroud)

这给了我以下错误:

---------------------------------------------------------------------------
IllegalArgumentException                  Traceback (most recent call last)
<ipython-input-65-c642ea9c2cf5> in <module>()
      4 
      5 evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
----> 6 print evaluator.evaluate(predictions)
      7 
      8 #print evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})

/Users/i854319/spark/python/pyspark/ml/evaluation.py in …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark apache-spark-ml apache-spark-mllib

9
推荐指数
1
解决办法
2321
查看次数

Pyspark - 加载训练有素的模型word2vec

我想用word2vec和PySpark来处理一些数据.我以前在Python中使用Google训练模型GoogleNews-vectors-negative300.bin和gensim.

有没有办法用Mllib/word2vec加载这个bin文件?或者将数据作为字典从Python {word:[vector]}(或.csv文件)导出然后将其加载到PySpark中是否有意义?

谢谢

python load gensim word2vec pyspark

7
推荐指数
1
解决办法
565
查看次数

无法使用pyspark从json dstream创建数据框

我正在尝试从dstream中的json创建一个数据框,但是下面的代码似乎无法正确显示该数据框-

import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
def getSqlContextInstance(sparkContext):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

if __name__ == "__main__":
    if len(sys.argv) != 3:
        raise IOError("Invalid usage; the correct format is:\nquadrant_count.py <hostname> <port>")

# Initialize a SparkContext with a name
spc = SparkContext(appName="jsonread")
sqlContext = SQLContext(spc)
# Create a StreamingContext with a batch interval of 2 seconds
stc = StreamingContext(spc, 2)
# Checkpointing feature
stc.checkpoint("checkpoint")
# Creating …
Run Code Online (Sandbox Code Playgroud)

python json apache-spark dstream pyspark

5
推荐指数
0
解决办法
901
查看次数

使用Boosting树在sklearn中生成特征

我指的是使用树集合进行特征转换的链接.

具体地,对于代码下面部分,该链接的样品中,(1)使用推进树来产生特征的方法,然后使用LR,性能优于(2)使用推进树本身来训练.问题,

  1. 想知道在一般情况下使用Boosting树生成特征(并使用另一个分类器进行分类)是否真的比使用Boosting树进行分类更好?
  2. 并且还想知道为什么使用Boosting树生成特征,然后使用LR训练,优于使用Boosting树本身?

    grd = GradientBoostingClassifier(n_estimators=n_estimator)
    grd_enc = OneHotEncoder()
    grd_lm = LogisticRegression()
    grd.fit(X_train, y_train)
    grd_enc.fit(grd.apply(X_train)[:, :, 0])
    grd_lm.fit(grd_enc.transform(grd.apply(X_train_lr)[:, :, 0]), y_train_lr)
    
    Run Code Online (Sandbox Code Playgroud)

python machine-learning scikit-learn

5
推荐指数
1
解决办法
154
查看次数

Kafka结构化流KafkaSourceProvider无法实例化

我正在一个流项目中,我有一个ping统计信息的kafka流,如下所示:

64 bytes from vas.fractalanalytics.com (192.168.30.26): icmp_seq=1 ttl=62 time=0.913 ms
64 bytes from vas.fractalanalytics.com (192.168.30.26): icmp_seq=2 ttl=62 time=0.936 ms
64 bytes from vas.fractalanalytics.com (192.168.30.26): icmp_seq=3 ttl=62 time=0.980 ms
64 bytes from vas.fractalanalytics.com (192.168.30.26): icmp_seq=4 ttl=62 time=0.889 ms
Run Code Online (Sandbox Code Playgroud)

我正在尝试将其作为结构化流阅读pyspark。我使用以下命令启动pyspark:

 pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0
Run Code Online (Sandbox Code Playgroud)

Pyspark版本是2.4,python版本是2.7(也尝试过3.6)

发送此段代码后,我会收到一条错误消息(接在《结构化流+ Kafka集成指南》之后):

df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.18.2.21:2181").option("subscribe", "ping-stats").load()
Run Code Online (Sandbox Code Playgroud)

我遇到以下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o37.load.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.kafka010.KafkaSourceProvider could not be instantiated
        at java.util.ServiceLoader.fail(ServiceLoader.java:232)
        at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
        at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
        at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
        at java.util.ServiceLoader$1.next(ServiceLoader.java:480) …
Run Code Online (Sandbox Code Playgroud)

python java apache-kafka apache-spark pyspark

4
推荐指数
1
解决办法
2258
查看次数

根据另一列的元素从 pyspark 数组中删除元素

我想验证数组是否包含 Pyspark 中的字符串(Spark < 2.4)。

示例数据框:

column_1 <Array>           |    column_2 <String>
--------------------------------------------
["2345","98756","8794"]    |       8794
--------------------------------------------
["8756","45678","987563"]  |       1234
--------------------------------------------
["3475","8956","45678"]    |       3475
--------------------------------------------
Run Code Online (Sandbox Code Playgroud)

我想比较两列column_1和column_2。如果column_1包含column_2,我应该从column_1中跳过它的值。我做了一个 udf 从column_1 中提取column_2,但不起作用:

def contains(x, y):
        try:
            sx, sy = set(x), set(y)
            if len(sx) == 0:
                return sx
            elif len(sy) == 0:
                return sx
            else:
                return sx - sy            
        # in exception, for example `x` or `y` is None (not a list)
        except:
            return sx
    udf_contains = udf(contains, 'string')
    new_df = my_df.withColumn('column_1', udf_contains(my_df.column_1, my_df.column_2))  
Run Code Online (Sandbox Code Playgroud)

预期结果: …

apache-spark apache-spark-sql pyspark

4
推荐指数
1
解决办法
5101
查看次数

如何使用分隔符连接 PySpark 中的多列?

我有一个pyspark Dataframe,我想加入 3 列。

id |  column_1   | column_2    | column_3
--------------------------------------------
1  |     12      |   34        |    67
--------------------------------------------
2  |     45      |   78        |    90
--------------------------------------------
3  |     23      |   93        |    56
--------------------------------------------
Run Code Online (Sandbox Code Playgroud)

我想加入 3 列:column_1, column_2, column_3仅在其中添加一个值"-"

期待结果:

id |  column_1   | column_2    | column_3    |   column_join
-------------------------------------------------------------
1  |     12      |     34      |     67      |   12-34-67
-------------------------------------------------------------
2  |     45      |     78      |     90      |   45-78-90
-------------------------------------------------------------
3  |     23 …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark

4
推荐指数
1
解决办法
3343
查看次数

固定长度队列,当在末尾添加元素时删除第一个元素(先进先出)

Python中是否有这样一个队列的实现,它具有固定长度,当满时,弹出最左边的元素,同时在右边追加一个元素?

假设q = Queue([1,2,3,4,5])我的队列的最大长度为 5,我说q.append(6),那么预期输出print(q)应该是Queue([2,3,4,5,6])

这个问题也可以链接到:是否有一个固定大小的队列可以删除过多的元素?

python queue

4
推荐指数
1
解决办法
1699
查看次数

Pyspark的加权移动平均线

我在Pyspark写了一个时间序列的异常检测算法.我想计算(-3,3)或(-4,4)窗口的加权移动平均值.现在我使用滞后和超越窗口函数并将它们乘以一组权重.我的窗口目前是(-2,2).

我想知道是否有另一种计算Pyspark加权移动平均线的方法.

我目前使用的代码是:

data_frame_1 = spark_data_frame.withColumn("weighted_score_predicted", (weights[0] * lag(column_metric, 1).over(w) + weights[1] * lag(column_metric, 2).over(w) + weights[2] * lead(column_metric, 1).over(w) + weights[3] * lead(column_metric, 2).over(w)) / 2).na.drop()
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

2
推荐指数
1
解决办法
1581
查看次数

ShinyManager 身份验证屏幕不会超时

很抱歉再次问这个问题,但我真的需要解决这个问题(即将达到我在shinyapps.io 上的最大数据限制)。这是我上一个问题的链接 上一个堆栈问题 这是我的演示应用程序的链接。演示应用程序托管在 ShinyApps.io您会注意到该应用程序不会超时。例如,这是我今天这个应用程序的日志。在此输入图像描述

我已经尝试了上一个问题中向我推荐的所有内容以及timeOutshinymanager::secure_server()函数中包含参数。

问题似乎在于,shinyapps.io 在 UI 上设置了一个不活动计时器。一旦 UI 处于非活动状态,它就会启动进程超时R。然而,在我们的例子中,UI 在身份验证之前不会启动。这意味着我们的服务器继续运行。

像设置超时 ( setTimeout()) 这样的东西将是一个很好的选择。例如,如果用户在 5 分钟内未进行身份验证,则超时。我最初尝试了 while 循环,但结果并没有按计划进行。

我正在寻找一种在没有活动的情况下使服务器超时的方法。这是我的代码的一个玩具示例。最后,这是 github 上的shinymanager包的链接。闪亮经理

乌鲁木齐

ui <- dashboardPage(
   #My UI page and functions
 )
shinymanager::secure_app(ui)
Run Code Online (Sandbox Code Playgroud)

服务器R

function(input, output, session){
 auth = secure_server(check_credentials = check_credentials(df)) #df is my client database

 observeEvent(auth$user,{
    #server functions. This only gets run once the user authenticates
  }

}
Run Code Online (Sandbox Code Playgroud)

r shiny shinydashboard shinyjs

2
推荐指数
1
解决办法
1098
查看次数