我正在尝试使用SKLearn来运行SVM模型.我现在只是尝试一些样本数据.这是数据和代码:
import numpy as np
from sklearn import svm
import random as random
A = np.array([[random.randint(0, 20) for i in range(2)] for i in range(10)])
lab = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1]
clf = svm.SVC(kernel='linear', C=1.0)
clf.fit(A, lab)
Run Code Online (Sandbox Code Playgroud)
仅供参考,我跑的时候
import sklearn
sklearn.__version__
Run Code Online (Sandbox Code Playgroud)
它输出0.17.
现在,当我跑步时print(clf.predict([1, 1])),我收到以下警告:
C:\Users\me\AppData\Local\Continuum\Anaconda2\lib\site-packages\sklearn\ut
ils\validation.py:386: DeprecationWarning: Passing 1d arrays as data is deprecat
ed in 0.17 and willraise ValueError in 0.19. Reshape your data either using X.re
shape(-1, 1) if your …Run Code Online (Sandbox Code Playgroud) 我正在使用RStudio在远程服务器上工作.此服务器无法访问Internet.我想安装包"stringi".我看过这个s tackoverflow文章,但每当我使用该命令时
install.packages("stringi_0.5-5.tar.gz",
configure.vars="ICUDT_DIR=/my/directory/for/icudt.zip")
Run Code Online (Sandbox Code Playgroud)
它只是试图访问互联网,这是它无法做到的.到目前为止,我一直在使用工具 - >安装包 - >从打包存档文件安装.但是,由于此错误,我无法再使用此方法.
我该如何安装这个包?
我想迭代一个字符串的RDD并为每个字符串"做一些事情".输出应该是double[][].这是一个带有for循环的例子.我知道我需要使用(我认为)foreachJava RDD 的功能.但是,我不知道如何理解语法.文档不是特别有用.我没有Java 8.
这是一个例子,如果我可以使用常规for循环,我想做什么.
public class PCA {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("PCA Example");
SparkContext sc = new SparkContext(conf);
RDD<String> data = sc.textFile("my/directory/my/dataset.txt", 0);
// here is the "type" of code I would like to execute
// 30 because I have 30 variables
double[][] vals = new double[data.count()][30];
double[] temp;
for (int i = 0; i < data.count(); i++) {
temp = splitStringtoDoubles(data[i]);
vals[i] = temp;
} …Run Code Online (Sandbox Code Playgroud) 我正在尝试运行Spark工作.这是我的shell脚本,位于/home/full/path/to/file/shell/my_shell_script.sh:
confLocation=../conf/my_config_file.conf &&
executors=8 &&
memory=2G &&
entry_function=my_function_in_python &&
dos2unix $confLocation &&
spark-submit \
--master yarn-client \
--num-executors $executors \
--executor-memory $memory \
--py-files /home/full/path/to/file/python/my_python_file.py $entry_function $confLocation
Run Code Online (Sandbox Code Playgroud)
当我运行它时,我收到一条错误消息:
错误:无法从JAR文件加载主类:/ home/full/path/to/file/shell/my_function_in_python
我的印象是它在错误的位置(python文件位于python目录,而不是shell目录).
我试图在样本数据上构建Logistic回归模型.
我们可以得到的模型输出是用于构建模型的特征的权重.
我找不到Spark API用于估计的标准误差,Wald-Chi Square统计量,p值等.
我将以下代码粘贴为例子
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, MulticlassMetrics}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
val sc = new SparkContext(new SparkConf().setAppName("SparkTest").setMaster("local[*]"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc);
val data: RDD[String] = sc.textFile("C:/Users/user/Documents/spark-1.5.1-bin-hadoop2.4/data/mllib/credit_approval_2_attr.csv")
val parsedData = data.map { line =>
val parts = line.split(',').map(_.toDouble)
LabeledPoint(parts(0), Vectors.dense(parts.tail))
}
//Splitting the data
val splits: Array[RDD[LabeledPoint]] = parsedData.randomSplit(Array(0.7, 0.3), seed = 11L)
val training: RDD[LabeledPoint] = splits(0).cache()
val test: RDD[LabeledPoint] = splits(1)
// Run training algorithm to build …Run Code Online (Sandbox Code Playgroud) standard-error logistic-regression pyspark apache-spark-mllib
以下是训练DNNRegressor时用于监视事件的代码段。我正在从Jupyter笔记本上运行。
在培训过程中,终端出现以下错误:
tensorflow / core / util / events_writer.cc:162]事件文件/Users/eran/Genie/PNP/TB/events.out.tfevents.1473067505.Eran已消失。E tensorflow / core / util / events_writer.cc:131]无法将2498个事件刷新到/Users/eran/Genie/PNP/TB/events.out.tfevents.1473067505.Eran
def add_monitors():
validation_metrics = {'MeanSquaredError': tf.contrib.metrics.streaming_mean_squared_error}
monitors = learn.monitors.ValidationMonitor(valid_X, valid_y, every_n_steps=50, metrics=validation_metrics)
return [monitors]
regressor = learn.DNNRegressor(model_dir='/Users/eran/Genie/PNP/TB',
hidden_units=[32,16], feature_columns=learn.infer_real_valued_columns_from_input(X),
optimizer=tf.train.ProximalAdagradOptimizer(learning_rate=0.1),
config=learn.RunConfig(save_checkpoints_secs=1))
monitors = add_monitors()
regressor.fit(X, y, steps=10000, batch_size=20, monitors=monitors)
Run Code Online (Sandbox Code Playgroud)
有任何想法吗?打开TensorBoard时我看不到任何事件记录
因此,我一直在 Python 中使用 Fabric 包来运行各种 HDFS 任务的 shell 脚本。
但是,每当我运行任务来检查 HDFS 中是否已存在文件/目录时,它都会退出 shell。这是一个示例(我使用的是 Python 3.5.2 和 Fabric3==1.12.post1)
from fabric.api import local
local('hadoop fs -stat hdfs://some/nonexistent/hdfs/dir/')
Run Code Online (Sandbox Code Playgroud)
如果目录不存在,此代码将产生
[localhost] 本地:hadoop fs -stat hdfs://some/nonexistent/hdfs/dir/ stat: `hdfs://some/nonexistent/hdfs/dir/': 没有这样的文件或目录
致命错误:local() 在执行“hadoop fs -stat hdfs://some/nonexistent/hdfs/dir/”时遇到错误(返回代码 1)
正在流产。
我也尝试过local('hadoop fs -test -e hdfs://some/nonexistent/hdfs/dir/'),但它引起了同样的问题。
如何使用 Fabric 生成一个布尔变量来告诉我 hdfs 中是否存在目录或文件?
我正在通过AWS使用Yarn EMR集群。我正在尝试为我的代码库编写一些测试,以确保我具有有关Yarn连接的测试范围。
现在,我故意弄乱变量使连接失败YARN_CONF_DIR。如果正确设置了变量,则可以毫无问题地连接到Yarn并运行我的作业。但是,如果YARN_CONF_DIR设置时,但未设置为具有预期的xml,cfg和bash文件的路径,则会得到以下输出:
INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
INFO Client: Retrying connect to server: 0.0.0.0/0.0.0.0:8032. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
INFO Client: Retrying connect to …Run Code Online (Sandbox Code Playgroud) 我有一些代码执行了很多步骤,我知道整个过程需要多长时间.但是,我希望能够计算每次转换所需的时间.以下是一些简单的步骤示例:
rdd1 = sc.textFile("my/filepath/*").map(lambda x: x.split(","))
rdd2 = sc.textFile("other/filepath/*").map(lambda x: x.split(","))
to_kv1 = rdd1.map(lambda x: (x[0], x[1])) # x[0] is key, x[1] is numeric
to_kv2 = rdd2.map(lambda x: (x[0], x[1])) # x[0] is key, x[1] is numeric
reduced1 = to_kv1.reduceByKey(lambda a, b: a+b)
reduced2 = to_kv1.reduceByKey(lambda a, b: a+b)
outer_joined = reduced1.fullOuterJoin(reduced2) # let's just assume there is key overlap
outer_joined.saveAsTextFile("my_output")
Run Code Online (Sandbox Code Playgroud)
现在:我如何对此代码的特定部分进行基准测试?我知道端到端运行它会花费一定的时间(saveAsTextFile会强制它执行),但是我如何仅对代码reduceByKey或fullOuterJoin部分代码进行基准测试?我知道我可以count()在每次操作之后运行以强制执行,但这不能正确地对操作进行基准测试,因为它增加了count执行转换所需的时间以及执行转换的时间.
鉴于其懒惰的执行风格,对Spark变换进行基准测试的最佳方法是什么?
请注意,我不是在问如何衡量时间.我知道time模块,start = time.time()等等.我问如何基于Spark转换的延迟执行风格进行基准测试,直到您调用需要将信息返回给驱动程序的操作时才执行.
我有这样的数据集:
a = sc.parallelize([[1,2,3],[0,2,1],[9,8,7]]).toDF(["one", "two", "three"])
Run Code Online (Sandbox Code Playgroud)
我想要一个数据集添加一个新列,该列等于其他三列中的最大值.输出看起来像这样:
+----+----+-----+-------+
|one |two |three|max_col|
+----+----+-----+-------+
| 1| 2| 3| 3|
| 0| 2| 1| 2|
| 9| 8| 7| 9|
+----+----+-----+-------+
Run Code Online (Sandbox Code Playgroud)
我以为我会用withColumn,就像这样:
b = a.withColumn("max_col", max(a["one"], a["two"], a["three"]))
Run Code Online (Sandbox Code Playgroud)
但这会产生错误
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/spark152/python/pyspark/sql/column.py", line 418, in __nonzero__
raise ValueError("Cannot convert column into bool: please use '&' for 'and', '|' for 'or', "
ValueError: Cannot convert column into bool: please use '&' for …Run Code Online (Sandbox Code Playgroud) 我有两个 spark RDD,A 有 301,500,000 行,B 有 1,500,000 行。B 中的那 150 万行也都出现在 A 中。我想要这两个 RDD 之间的设置差异,以便我返回包含 300,000,000 行的 A,而来自 B 的那 1,500,000 行不再存在于 A 中。
我不能使用 Spark 数据帧。
这是我现在使用的系统。这些 RDD 有主键。我在下面做的是创建一个(收集的)出现在 B 中的主键列表,然后遍历 A 的主键以找到那些没有出现在 B 主键列表中的主键。
a = sc.parallelize([[0,"foo",'a'],[1,'bar','b'],[2,'mix','c'],[3,'hem', 'd'],[4,'line','e']])
b = sc.parallelize([[1,'bar','b'],[2,'mix','c']])
b_primary_keys = b.map(lambda x: x[0]).collect() # since first col = primary key
def sep_a_and_b(row):
primary_key = row[0]
if(primary_key not in b_primary_keys):
return(row)
a_minus_b = a.map(lambda x: sep_a_and_b(x)).filter(lambda x: x != None)
Run Code Online (Sandbox Code Playgroud)
现在,这适用于这个示例问题,因为 A 和 B 很小。但是,当我使用真实数据集 …
假设我想编写一个 python 装饰器来对一个函数进行计时,并让用户输入他们希望它运行的次数。我希望这个装饰器位于return返回的函数上,并且如果装饰函数使用yield 语句,我希望它返回一个生成器。
如果我执行以下操作:
import time
from datetime import datetime
import inspect
def time_it(iters=1):
def decorator(func):
def wrapper(*args, **kwargs):
is_gen = inspect.isgeneratorfunction(func)
start = datetime.now()
for _ in range(iters):
ret = yield from func(*args, **kwargs) if is_gen else func(*args, **kwargs)
elapsed = datetime.now() - start
print(f'Elapsed time: {elapsed} over {iters} iterations')
return ret
return wrapper
return decorator
Run Code Online (Sandbox Code Playgroud)
您将看到您装饰的任何函数现在都会返回一个生成器。
@time_it()
def one(ret):
time.sleep(1)
return ret
@time_it()
def two(ret):
time.sleep(1)
yield from ret
x = one(['a', 'b'])
y = …Run Code Online (Sandbox Code Playgroud) 所以,我知道Spark是一个懒惰的执行者.例如,如果我打电话
post = pre.filter(lambda x: some_condition(x)).map(lambda x: do_something(x))
我知道它不会立即执行.
但是当我打电话时,上面的代码会发生什么post.count()?我想象中的滤波将被迫执行,因为pre和post可能不会有相同的行数,因为有一个filter条件存在.但是,map是1对1的关系,因此计数不会受其影响.这个map命令会在这里执行count()吗?
跟进:当我想强制执行map语句时(假设count()不起作用),我可以调用哪些强制执行?我宁愿不必使用saveAsTextFile().
apache-spark ×7
python ×6
pyspark ×5
hadoop ×2
rdd ×2
benchmarking ×1
fabric ×1
generator ×1
hadoop-yarn ×1
java ×1
package ×1
r ×1
scikit-learn ×1
shell ×1
stringi ×1
svm ×1
tensorboard ×1
tensorflow ×1