我有一个火花数据框如下:
predictions.show(5)
+------+----+------+-----------+
| user|item|rating| prediction|
+------+----+------+-----------+
|379433| 31| 1| 0.08203495|
| 1834| 31| 1| 0.4854447|
|422635| 31| 1|0.017672742|
| 839| 31| 1| 0.39273006|
| 51444| 31| 1| 0.09795039|
+------+----+------+-----------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)
预测是预测的评级,评级是隐含评级(计数).
现在我想检查我的推荐算法的AUC.
我首先尝试了pyspark.ml.BinaryClassificationEvaluator,因为它直接在数据框上工作.
# getting the evaluationa metric
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
print evaluator.evaluate(predictions)
Run Code Online (Sandbox Code Playgroud)
这给了我以下错误:
---------------------------------------------------------------------------
IllegalArgumentException Traceback (most recent call last)
<ipython-input-65-c642ea9c2cf5> in <module>()
4
5 evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")
----> 6 print evaluator.evaluate(predictions)
7
8 #print evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
/Users/i854319/spark/python/pyspark/ml/evaluation.py in …Run Code Online (Sandbox Code Playgroud) python apache-spark pyspark apache-spark-ml apache-spark-mllib
我想用word2vec和PySpark来处理一些数据.我以前在Python中使用Google训练模型GoogleNews-vectors-negative300.bin和gensim.
有没有办法用Mllib/word2vec加载这个bin文件?或者将数据作为字典从Python {word:[vector]}(或.csv文件)导出然后将其加载到PySpark中是否有意义?
谢谢
我正在尝试从dstream中的json创建一个数据框,但是下面的代码似乎无法正确显示该数据框-
import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
def getSqlContextInstance(sparkContext):
if ('sqlContextSingletonInstance' not in globals()):
globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
return globals()['sqlContextSingletonInstance']
if __name__ == "__main__":
if len(sys.argv) != 3:
raise IOError("Invalid usage; the correct format is:\nquadrant_count.py <hostname> <port>")
# Initialize a SparkContext with a name
spc = SparkContext(appName="jsonread")
sqlContext = SQLContext(spc)
# Create a StreamingContext with a batch interval of 2 seconds
stc = StreamingContext(spc, 2)
# Checkpointing feature
stc.checkpoint("checkpoint")
# Creating …Run Code Online (Sandbox Code Playgroud) 具体地,对于代码下面部分,该链接的样品中,(1)使用推进树来产生特征的方法,然后使用LR,性能优于(2)使用推进树本身来训练.问题,
并且还想知道为什么使用Boosting树生成特征,然后使用LR训练,优于使用Boosting树本身?
grd = GradientBoostingClassifier(n_estimators=n_estimator)
grd_enc = OneHotEncoder()
grd_lm = LogisticRegression()
grd.fit(X_train, y_train)
grd_enc.fit(grd.apply(X_train)[:, :, 0])
grd_lm.fit(grd_enc.transform(grd.apply(X_train_lr)[:, :, 0]), y_train_lr)
Run Code Online (Sandbox Code Playgroud)我正在一个流项目中,我有一个ping统计信息的kafka流,如下所示:
64 bytes from vas.fractalanalytics.com (192.168.30.26): icmp_seq=1 ttl=62 time=0.913 ms
64 bytes from vas.fractalanalytics.com (192.168.30.26): icmp_seq=2 ttl=62 time=0.936 ms
64 bytes from vas.fractalanalytics.com (192.168.30.26): icmp_seq=3 ttl=62 time=0.980 ms
64 bytes from vas.fractalanalytics.com (192.168.30.26): icmp_seq=4 ttl=62 time=0.889 ms
Run Code Online (Sandbox Code Playgroud)
我正在尝试将其作为结构化流阅读pyspark。我使用以下命令启动pyspark:
pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0
Run Code Online (Sandbox Code Playgroud)
Pyspark版本是2.4,python版本是2.7(也尝试过3.6)
发送此段代码后,我会收到一条错误消息(接在《结构化流+ Kafka集成指南》之后):
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "172.18.2.21:2181").option("subscribe", "ping-stats").load()
Run Code Online (Sandbox Code Playgroud)
我遇到以下错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o37.load.
: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.kafka010.KafkaSourceProvider could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232)
at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480) …Run Code Online (Sandbox Code Playgroud) 我想验证数组是否包含 Pyspark 中的字符串(Spark < 2.4)。
示例数据框:
column_1 <Array> | column_2 <String>
--------------------------------------------
["2345","98756","8794"] | 8794
--------------------------------------------
["8756","45678","987563"] | 1234
--------------------------------------------
["3475","8956","45678"] | 3475
--------------------------------------------
Run Code Online (Sandbox Code Playgroud)
我想比较两列column_1和column_2。如果column_1包含column_2,我应该从column_1中跳过它的值。我做了一个 udf 从column_1 中提取column_2,但不起作用:
def contains(x, y):
try:
sx, sy = set(x), set(y)
if len(sx) == 0:
return sx
elif len(sy) == 0:
return sx
else:
return sx - sy
# in exception, for example `x` or `y` is None (not a list)
except:
return sx
udf_contains = udf(contains, 'string')
new_df = my_df.withColumn('column_1', udf_contains(my_df.column_1, my_df.column_2))
Run Code Online (Sandbox Code Playgroud)
预期结果: …
我有一个pyspark Dataframe,我想加入 3 列。
id | column_1 | column_2 | column_3
--------------------------------------------
1 | 12 | 34 | 67
--------------------------------------------
2 | 45 | 78 | 90
--------------------------------------------
3 | 23 | 93 | 56
--------------------------------------------
Run Code Online (Sandbox Code Playgroud)
我想加入 3 列:column_1, column_2, column_3仅在其中添加一个值"-"
期待结果:
id | column_1 | column_2 | column_3 | column_join
-------------------------------------------------------------
1 | 12 | 34 | 67 | 12-34-67
-------------------------------------------------------------
2 | 45 | 78 | 90 | 45-78-90
-------------------------------------------------------------
3 | 23 …Run Code Online (Sandbox Code Playgroud) Python中是否有这样一个队列的实现,它具有固定长度,当满时,弹出最左边的元素,同时在右边追加一个元素?
假设q = Queue([1,2,3,4,5])我的队列的最大长度为 5,我说q.append(6),那么预期输出print(q)应该是Queue([2,3,4,5,6])
这个问题也可以链接到:是否有一个固定大小的队列可以删除过多的元素?
我在Pyspark写了一个时间序列的异常检测算法.我想计算(-3,3)或(-4,4)窗口的加权移动平均值.现在我使用滞后和超越窗口函数并将它们乘以一组权重.我的窗口目前是(-2,2).
我想知道是否有另一种计算Pyspark加权移动平均线的方法.
我目前使用的代码是:
data_frame_1 = spark_data_frame.withColumn("weighted_score_predicted", (weights[0] * lag(column_metric, 1).over(w) + weights[1] * lag(column_metric, 2).over(w) + weights[2] * lead(column_metric, 1).over(w) + weights[3] * lead(column_metric, 2).over(w)) / 2).na.drop()
Run Code Online (Sandbox Code Playgroud) 很抱歉再次问这个问题,但我真的需要解决这个问题(即将达到我在shinyapps.io 上的最大数据限制)。这是我上一个问题的链接 上一个堆栈问题
这是我的演示应用程序的链接。演示应用程序托管在 ShinyApps.io您会注意到该应用程序不会超时。例如,这是我今天这个应用程序的日志。
我已经尝试了上一个问题中向我推荐的所有内容以及timeOut在shinymanager::secure_server()函数中包含参数。
问题似乎在于,shinyapps.io 在 UI 上设置了一个不活动计时器。一旦 UI 处于非活动状态,它就会启动进程超时R。然而,在我们的例子中,UI 在身份验证之前不会启动。这意味着我们的服务器继续运行。
像设置超时 ( setTimeout()) 这样的东西将是一个很好的选择。例如,如果用户在 5 分钟内未进行身份验证,则超时。我最初尝试了 while 循环,但结果并没有按计划进行。
我正在寻找一种在没有活动的情况下使服务器超时的方法。这是我的代码的一个玩具示例。最后,这是 github 上的shinymanager包的链接。闪亮经理
乌鲁木齐
ui <- dashboardPage(
#My UI page and functions
)
shinymanager::secure_app(ui)
Run Code Online (Sandbox Code Playgroud)
服务器R
function(input, output, session){
auth = secure_server(check_credentials = check_credentials(df)) #df is my client database
observeEvent(auth$user,{
#server functions. This only gets run once the user authenticates
}
}
Run Code Online (Sandbox Code Playgroud) pyspark ×7
python ×7
apache-spark ×6
apache-kafka ×1
dstream ×1
gensim ×1
java ×1
json ×1
load ×1
queue ×1
r ×1
scikit-learn ×1
shiny ×1
shinyjs ×1
word2vec ×1