小编Abh*_*hek的帖子

如何使用spark-csv包在HDFS上只读取n行大型CSV文件?

我在HDFS上有一个很大的分布式文件,每次我使用带有spark-csv包的sqlContext时,它首先加载整个文件,这需要相当长的时间.

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path")
Run Code Online (Sandbox Code Playgroud)

现在因为我只想做一些快速检查,所有我需要的是整个文件的少数/任意n行.

df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").take(n)
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").head(n)
Run Code Online (Sandbox Code Playgroud)

但所有这些都在文件加载完成后运行.我不能在读取文件本身时限制行数吗?我指的是spark-csv中n_rows等效的pandas,如:

pd_df = pandas.read_csv("file_path", nrows=20)
Run Code Online (Sandbox Code Playgroud)

或者可能是火花实际上没有加载文件,第一步,但在这种情况下,为什么我的文件加载步骤需要花费太多时间呢?

我想要

df.count()
Run Code Online (Sandbox Code Playgroud)

只给我n而不是所有的行,是否可能?

hdfs apache-spark apache-spark-sql pyspark spark-csv

12
推荐指数
2
解决办法
1万
查看次数

我们应该并行化DataFrame,就像我们在训练之前并行化Seq一样

考虑一下这里给出的代码,

https://spark.apache.org/docs/1.2.0/ml-guide.html

import org.apache.spark.ml.classification.LogisticRegression
val training = sparkContext.parallelize(Seq(
  LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
  LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))

val lr = new LogisticRegression()
lr.setMaxIter(10).setRegParam(0.01)

val model1 = lr.fit(training)
Run Code Online (Sandbox Code Playgroud)

假设我们使用sqlContext.read()将"training"作为数据框读取,我们是否应该做类似的事情

val model1 = lr.fit(sparkContext.parallelize(training)) // or some variation of this
Run Code Online (Sandbox Code Playgroud)

或者,在传递dataFrame时,fit函数将自动处理并行计算/数据

问候,

scala apache-spark apache-spark-sql pyspark apache-spark-ml

9
推荐指数
2
解决办法
2万
查看次数

使用scikit训练逻辑回归学习多类分类

根据scikit多类分类, Logistic回归可以通过在构造函数中设置multi_class = multinomial来用于多类分类.但这样做会给出错误:

码:

text_clf = Pipeline([('vect', TfidfVectorizer()),('clf', LogisticRegression(multi_class = 'multinomial')),])
text_clf = text_clf.fit(X_train, Y_train)
Run Code Online (Sandbox Code Playgroud)

错误:

ValueError:求解器liblinear不支持多项后端.

你能告诉我这里有什么问题吗?

注意:保持multi_class为空,即"ovr"工作正常,但它适合每个分类器的二进制模型,我也想尝试mutlinomial功能.

python classification scikit-learn

8
推荐指数
1
解决办法
7411
查看次数

Theano教程中的澄清

我正在阅读Theano文档主页上提供的本教程

我不确定梯度下降部分给出的代码.

在此输入图像描述

我对for循环有疑问.

如果将' param_update '变量初始化为零.

param_update = theano.shared(param.get_value()*0., broadcastable=param.broadcastable)
Run Code Online (Sandbox Code Playgroud)

然后在剩下的两行中更新其值.

updates.append((param, param - learning_rate*param_update))
updates.append((param_update, momentum*param_update + (1. - momentum)*T.grad(cost, param)))
Run Code Online (Sandbox Code Playgroud)

我们为什么需要它?

我想我在这里弄错了.你们能帮助我吗!

python numpy gradient-descent theano deep-learning

7
推荐指数
1
解决办法
3472
查看次数

如何将L1正则化精确地添加到张量流误差函数中

嘿,我是张力流的新手,即使经过很多努力都无法在误差项中添加L1正则化项

x = tf.placeholder("float", [None, n_input])
# Weights and biases to hidden layer
ae_Wh1 = tf.Variable(tf.random_uniform((n_input, n_hidden1), -1.0 / math.sqrt(n_input), 1.0 / math.sqrt(n_input)))
ae_bh1 = tf.Variable(tf.zeros([n_hidden1]))
ae_h1 = tf.nn.tanh(tf.matmul(x,ae_Wh1) + ae_bh1)

ae_Wh2 = tf.Variable(tf.random_uniform((n_hidden1, n_hidden2), -1.0 / math.sqrt(n_hidden1), 1.0 / math.sqrt(n_hidden1)))
ae_bh2 = tf.Variable(tf.zeros([n_hidden2]))
ae_h2 = tf.nn.tanh(tf.matmul(ae_h1,ae_Wh2) + ae_bh2)

ae_Wh3 = tf.transpose(ae_Wh2)
ae_bh3 = tf.Variable(tf.zeros([n_hidden1]))
ae_h1_O = tf.nn.tanh(tf.matmul(ae_h2,ae_Wh3) + ae_bh3)

ae_Wh4 = tf.transpose(ae_Wh1)
ae_bh4 = tf.Variable(tf.zeros([n_input]))
ae_y_pred = tf.nn.tanh(tf.matmul(ae_h1_O,ae_Wh4) + ae_bh4)



ae_y_actual = tf.placeholder("float", [None,n_input])
meansq = tf.reduce_mean(tf.square(ae_y_actual - ae_y_pred)) …
Run Code Online (Sandbox Code Playgroud)

python neural-network deep-learning tensorflow

7
推荐指数
1
解决办法
2万
查看次数

pyspark randomForest功能重要性:如何从列号中获取列名

我在spark中使用标准(字符串索引器+一个热编码器+ randomForest)管道,如下所示

labelIndexer = StringIndexer(inputCol = class_label_name, outputCol="indexedLabel").fit(data)

string_feature_indexers = [
   StringIndexer(inputCol=x, outputCol="int_{0}".format(x)).fit(data)
   for x in char_col_toUse_names
]

onehot_encoder = [
   OneHotEncoder(inputCol="int_"+x, outputCol="onehot_{0}".format(x))
   for x in char_col_toUse_names
]
all_columns = num_col_toUse_names + bool_col_toUse_names + ["onehot_"+x for x in char_col_toUse_names]
assembler = VectorAssembler(inputCols=[col for col in all_columns], outputCol="features")
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features", numTrees=100)
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)
pipeline = Pipeline(stages=[labelIndexer] + string_feature_indexers + onehot_encoder + [assembler, rf, labelConverter])

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)
cvModel = crossval.fit(trainingData)
Run Code Online (Sandbox Code Playgroud)

现在,在拟合之后,我可以使用随机林和特征重要性cvModel.bestModel.stages[-2].featureImportances,但这不会给我功能/列名称,而只是功能号码. …

random-forest pyspark apache-spark-ml apache-spark-mllib

7
推荐指数
2
解决办法
3491
查看次数

在theano中'no_inplace'是什么意思?

这是代码:

>>> a = T.dscalar("a")
>>> b = a+2
>>> b
Run Code Online (Sandbox Code Playgroud)

而输出是

Elemwise{add,no_inplace}.0
Run Code Online (Sandbox Code Playgroud)

add表明apply节点具有add作为操作.

但是no_inplace意味着什么?为什么我们在输出结尾有一个".0"?

theano

5
推荐指数
1
解决办法
1655
查看次数

为什么NLTK库中有不同的Lemmatizers?

>> from nltk.stem import WordNetLemmatizer as lm1
>> from nltk import WordNetLemmatizer as lm2
>> from nltk.stem.wordnet import WordNetLemmatizer as lm3
Run Code Online (Sandbox Code Playgroud)

对我来说,这三个作品都是以同样的方式,但只是为了确认,它们是否提供了不同的东西?

python nlp nltk lemmatization

5
推荐指数
1
解决办法
487
查看次数

.std() 和 .skew() 使用 .rolling 给出错误答案

我正在使用 pandas 版本:'0.23.4'

在调试我的代码时,我意识到,std & skew 并没有通过滚动窗口给出正确的结果。检查下面的代码:

import pandas as pd
import numpy as np
import scipy.stats as sp

df = pd.DataFrame(np.random.randint(1,10,(5)))
df_w = df.rolling(window=3, min_periods=1)

m1 = df_w.apply(lambda x: np.mean(x))
m2 = df_w.mean()

s1 = df_w.apply(lambda x: np.std(x))
s2 = df_w.std()

sk1 = df_w.apply(lambda x: sp.skew(x))
sk2 = df_w.skew()
Run Code Online (Sandbox Code Playgroud)

虽然平均值的结果是相同的,但标准差和偏斜的结果却不同?这是预期的行为还是我错过了什么?

python numpy scipy pandas

4
推荐指数
1
解决办法
1338
查看次数

在著名的卷积神经网络示例中进行合并和二次采样后无法计算尺寸

Yann LeCun的感知和推理层次模型

上图来自Yann LeCun的pdf文件,标题为“感知和推理的层次模型”

我无法理解第2层是14X14要素地图的方式?带有10X10池化和5X5子采样的75X75矩阵如何提供14X14矩阵?

pooling object-recognition neural-network subsampling deep-learning

1
推荐指数
1
解决办法
2649
查看次数

将数据帧写入 HDFS 时出现 NumberFormatException 错误

我正在写信dataframeHDFS,使用以下代码

final_df.write.format("com.databricks.spark.csv").option("header", "true").save("path_to_hdfs")
Run Code Online (Sandbox Code Playgroud)

它给了我以下错误:

Caused by: java.lang.NumberFormatException: For input string: "124085346080"
Run Code Online (Sandbox Code Playgroud)

完整堆栈如下:

at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: For input string: "124085346080"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:583)
    at java.lang.Integer.parseInt(Integer.java:615)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
    at org.apache.spark.sql.execution.datasources.csv.CSVTypeCast$.castTo(CSVInferSchema.scala:241)
    at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:116)
    at org.apache.spark.sql.execution.datasources.csv.CSVRelation$$anonfun$csvParser$3.apply(CSVRelation.scala:85)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:128)
    at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anonfun$buildReader$1$$anonfun$apply$2.apply(CSVFileFormat.scala:127)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252) …
Run Code Online (Sandbox Code Playgroud)

hadoop scala hdfs apache-spark pyspark

0
推荐指数
1
解决办法
1640
查看次数