小编Kit*_*ito的帖子

Spark使用上一行的值向数据框添加新列

我想知道如何在Spark(Pyspark)中实现以下功能

初始数据帧:

+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+
Run Code Online (Sandbox Code Playgroud)

结果数据帧:

+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0|  7.0  |
+--+---+-------+
|3 |7.0|  3.0  |
+--+---+-------+
|2 |3.0|  5.0  |
+--+---+-------+
Run Code Online (Sandbox Code Playgroud)

我设法通过以下方式将新列"附加"到数据框中: df.withColumn("new_Col", df.num * 10)

但是我不知道如何为新列实现这种"行的移位",以便新列具有前一行的字段值(如示例所示).我还在API文档中找不到有关如何通过索引访问DF中某一行的任何内容.

任何帮助,将不胜感激.

python dataframe apache-spark apache-spark-sql pyspark

33
推荐指数
1
解决办法
2万
查看次数

如何避免一个Spark Streaming窗口阻止另一个窗口同时运行一些本机Python代码

我正在使用两个不同的窗口运行Spark Streaming(在窗口上用SKLearn训练模型,另一个用于根据该模型预测值)我想知道如何避免一个窗口("慢"训练窗口)到训练模型,而不"阻塞""快速"预测窗口.
我的简化代码如下:

conf = SparkConf()
conf.setMaster("local[4]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)

stream = ssc.socketTextStream("localhost", 7000)


import Custom_ModelContainer

### Window 1 ###
### predict data based on model computed in window 2 ###

def predict(time, rdd):
    try:
       # ... rdd conversion to df, feature extraction etc...

       # regular python code 
       X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
       pred = Custom_ModelContainer.getmodel().predict(X)

       # send prediction to GUI

    except Exception, e: print e

predictionStream = stream.window(60,60)
predictionStream.foreachRDD(predict)


### Window 2 ###
### fit …
Run Code Online (Sandbox Code Playgroud)

python scikit-learn apache-spark spark-streaming

10
推荐指数
1
解决办法
427
查看次数

如何在Keras中使用嵌入层用于递归神经网络(RNN)

我对神经网络和Keras库很新,我想知道如何使用这里描述的嵌入层来掩盖我的输入数据从2D张量到RNN的3D张量.

说我的时间序列数据如下(随着时间的增加):

X_train = [
   [1.0,2.0,3.0,4.0],
   [2.0,5.0,6.0,7.0],
   [3.0,8.0,9.0,10.0],
   [4.0,11.0,12.0,13.0],
   ...
] # with a length of 1000
Run Code Online (Sandbox Code Playgroud)

现在,假设我想给RNN最后2个特征向量,以便预测时间t + 1的特征向量.

目前(不埋层),我创造与形状(nb_samples,时间步长,input_dim)自己所需要的三维张量(如本例这里).

与我的例子相关,最终的3D Tensor看起来如下:

X_train_2 = [
  [[1.0,2.0,3.0,4.0],
   [2.0,5.0,6.0,7.0]],
  [[2.0,5.0,6.0,7.0],
   [3.0,8.0,9.0,10.0]],
  [[3.0,8.0,9.0,10.0],
   [4.0,11.0,12.0,13.0]],
  etc...
]
Run Code Online (Sandbox Code Playgroud)

和Y_train:

Y_train = [
   [3.0,8.0,9.0,10.0],
   [4.0,11.0,12.0,13.0],
   etc...
]
Run Code Online (Sandbox Code Playgroud)

我的模型如下(适用于上面的简化示例):

num_of_vectors = 2
vect_dimension = 4

model = Sequential()
model.add(SimpleRNN(hidden_neurons, return_sequences=False, input_shape=(num_of_vectors, vect_dimension))) 
model.add(Dense(vect_dimension))
model.add(Activation("linear"))
model.compile(loss="mean_squared_error", optimizer="rmsprop")
model.fit(X_train, Y_train, batch_size=50, nb_epoch=10, validation_split=0.15)
Run Code Online (Sandbox Code Playgroud)

最后,我的问题是,我怎么能避免那些2D张量到3D张量重塑我自己并使用嵌入层呢?我想在model = sequential()之后,我将不得不添加如下内容:

model.add(Embedding(?????))
Run Code Online (Sandbox Code Playgroud)

可能答案很简单,我只是对嵌入层的文档感到困惑.

embedding neural-network keras recurrent-neural-network

9
推荐指数
1
解决办法
8604
查看次数

Pyspark导入.py文件无法正常工作

我的目标是将自定义.py文件导入我的spark应用程序并调用该文件中包含的一些函数

这是我尝试过的:

我有一个名为Test.py的测试文件,如下所示:

def func():
    print "Import is working"
Run Code Online (Sandbox Code Playgroud)

在我的Spark应用程序中,我执行以下操作(如文档中所述):

sc = SparkContext(conf=conf, pyFiles=['/[AbsolutePathTo]/Test.py'])
Run Code Online (Sandbox Code Playgroud)

我也尝试了这个(在创建Spark上下文之后):

sc.addFile("/[AbsolutePathTo]/Test.py")
Run Code Online (Sandbox Code Playgroud)

在提交我的spark应用程序时,我甚至尝试过以下方法:

./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M2 --py-files /[AbsolutePath]/Test.py ../Main/Code/app.py
Run Code Online (Sandbox Code Playgroud)

但是,我总是得到一个名称错误:

NameError: name 'func' is not defined
Run Code Online (Sandbox Code Playgroud)

当我在app.py中调用func()时.(如果我尝试调用Test.func(),则与'Test'相同的错误)

最后,我还尝试使用与上面相同的命令在pyspark shell中导入文件:

sc.addFile("/[AbsolutePathTo]/Test.py")
Run Code Online (Sandbox Code Playgroud)

奇怪的是,我没有在导入时出错,但仍然,我不能在没有得到错误的情况下调用func().此外,不确定它是否重要,但我在一台机器上本地使用火花.

我真的尝试了我能想到的一切,但仍然无法让它发挥作用.可能我错过了一些非常简单的事情.任何帮助,将不胜感激.

python python-import apache-spark pyspark

8
推荐指数
1
解决办法
7562
查看次数

连接/集成Cassandra和Spark(pyspark)

我拼命想把Cassandra连接到pyspark,但是我无法让它工作.我对Spark和cassandra很新,所以我可能会错过一些相当简单的东西.

我对网上所有不同的解释感到有些困惑,但从我的理解,最简单的方法是使用"Spark包"?(http://spark-packages.org/package/TargetHolding/pyspark-cassandra)

因此,使用以下命令:

./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.11:1.5.0-M2 ../Main/Code/myPysparkFile.py
Run Code Online (Sandbox Code Playgroud)

如果我使用如上所述的spark包,我是否理解我不需要下载任何包?

在myPysparkFile.py中我尝试了以下两个版本,我都没有为我工作:

版本1,我从http://www.slideshare.net/JonHaddad/intro-to-py-spark-and-cassandra的第14页获得:

"SparkCassandraTest.py"
from pyspark import SparkContext, SparkConf
from pyspark_cassandra import CassandraSparkContext,Row

conf = SparkConf()
conf.setMaster("local[4]")
conf.setAppName("Spark Cassandra")
conf.set("spark.cassandra.connection.host","http://127.0.0.1")

sc = CassandraSparkContext(conf=conf)

rdd = sc.cassandraTable("test", "words")
Run Code Online (Sandbox Code Playgroud)

作为一个错误我得到:

ImportError: No module named pyspark_cassandra
Run Code Online (Sandbox Code Playgroud)

版本2(灵感来自:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md):

"SparkCassandraTest.py"
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

conf = SparkConf()
conf.setMaster("local[4]")
conf.setAppName("Spark Cassandra")
conf.set("spark.cassandra.connection.host","http://127.0.0.1")

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

sqlContext.read\
    .format("org.apache.spark.sql.cassandra")\
    .options(table="kv", keyspace="test")\
    .load().show()
Run Code Online (Sandbox Code Playgroud)

这给了我以下错误:

    py4j.protocol.Py4JJavaError: An …
Run Code Online (Sandbox Code Playgroud)

cassandra apache-spark pyspark

7
推荐指数
1
解决办法
4450
查看次数

代表没有回应

我试图在两个Viewcontrollers之间使用委托,但不幸的是我的代表没有被解雇.我希望有人可以帮我解决问题.我有一个名为MapBackgroundViewController的ViewContoller和一个名为MapsViewController的ViewContoller.如果MapsBackgroundViewController的SegmentedControl发生更改,则应通知MapsViewController.(我实际上尝试在iPhone上使用地面卷曲实现类似地图应用程序的东西)

这是我的代码的一部分:

MapBackgroundViewController.h

@protocol ChangeMapTyp <NSObject>
@required
- (void)segmentedControllChangedMapType:(MKMapType) type ;
@end


@interface MapBackgroundViewController : UIViewController{
IBOutlet UISegmentedControl *segmentedControl;
MKMapType mapType;
id < ChangeMapTyp> delegate;

}


@property IBOutlet UISegmentedControl *segmentedControl;

@property MKMapType mapType;

@property(strong)id delegate;

- (IBAction)segmentedControllChanged:(id)sender;
Run Code Online (Sandbox Code Playgroud)

MapBackgroundViewController.m

@interface MapBackgroundViewController ()

@end

@implementation MapBackgroundViewController
@synthesize segmentedControl, mapType, delegate;


- (void)viewDidLoad
{
[super viewDidLoad];
// Do any additional setup after loading the view.

[self setDelegate:self];

NSLog(@"%@",self.delegate);

}


- (IBAction)segmentedControllChanged:(id)sender {

if (segmentedControl.selectedSegmentIndex == 0) {
    mapType = MKMapTypeStandard;
}else if (segmentedControl.selectedSegmentIndex …
Run Code Online (Sandbox Code Playgroud)

delegation ios respondstoselector

1
推荐指数
1
解决办法
2370
查看次数

pyspark来自官方文档的线性回归示例 - 结果不佳?

我打算在Spark中使用线性回归.首先,我查看了官方文档中的示例(您可以在此处找到)

我也在stackoverflow上发现了这个问题,这与我的问题基本相同.答案建议调整步长,我也尝试过,但结果仍然是随机的,没有调整步长.我正在使用的代码如下所示:

from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.replace(',', ' ').split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/ridge-data/lpsa.data")
parsedData = data.map(parsePoint)

# Build the model
model = LinearRegressionWithSGD.train(parsedData,100000,0.01)

# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = …
Run Code Online (Sandbox Code Playgroud)

python linear-regression apache-spark pyspark apache-spark-mllib

1
推荐指数
1
解决办法
7965
查看次数