我想知道如何在Spark(Pyspark)中实现以下功能
初始数据帧:
+--+---+
|id|num|
+--+---+
|4 |9.0|
+--+---+
|3 |7.0|
+--+---+
|2 |3.0|
+--+---+
|1 |5.0|
+--+---+
Run Code Online (Sandbox Code Playgroud)
结果数据帧:
+--+---+-------+
|id|num|new_Col|
+--+---+-------+
|4 |9.0| 7.0 |
+--+---+-------+
|3 |7.0| 3.0 |
+--+---+-------+
|2 |3.0| 5.0 |
+--+---+-------+
Run Code Online (Sandbox Code Playgroud)
我设法通过以下方式将新列"附加"到数据框中:
df.withColumn("new_Col", df.num * 10)
但是我不知道如何为新列实现这种"行的移位",以便新列具有前一行的字段值(如示例所示).我还在API文档中找不到有关如何通过索引访问DF中某一行的任何内容.
任何帮助,将不胜感激.
我正在使用两个不同的窗口运行Spark Streaming(在窗口上用SKLearn训练模型,另一个用于根据该模型预测值)我想知道如何避免一个窗口("慢"训练窗口)到训练模型,而不"阻塞""快速"预测窗口.
我的简化代码如下:
conf = SparkConf()
conf.setMaster("local[4]")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 1)
stream = ssc.socketTextStream("localhost", 7000)
import Custom_ModelContainer
### Window 1 ###
### predict data based on model computed in window 2 ###
def predict(time, rdd):
try:
# ... rdd conversion to df, feature extraction etc...
# regular python code
X = np.array(df.map(lambda lp: lp.features.toArray()).collect())
pred = Custom_ModelContainer.getmodel().predict(X)
# send prediction to GUI
except Exception, e: print e
predictionStream = stream.window(60,60)
predictionStream.foreachRDD(predict)
### Window 2 ###
### fit …Run Code Online (Sandbox Code Playgroud) 我对神经网络和Keras库很新,我想知道如何使用这里描述的嵌入层来掩盖我的输入数据从2D张量到RNN的3D张量.
说我的时间序列数据如下(随着时间的增加):
X_train = [
[1.0,2.0,3.0,4.0],
[2.0,5.0,6.0,7.0],
[3.0,8.0,9.0,10.0],
[4.0,11.0,12.0,13.0],
...
] # with a length of 1000
Run Code Online (Sandbox Code Playgroud)
现在,假设我想给RNN最后2个特征向量,以便预测时间t + 1的特征向量.
目前(不埋层),我创造与形状(nb_samples,时间步长,input_dim)自己所需要的三维张量(如本例这里).
与我的例子相关,最终的3D Tensor看起来如下:
X_train_2 = [
[[1.0,2.0,3.0,4.0],
[2.0,5.0,6.0,7.0]],
[[2.0,5.0,6.0,7.0],
[3.0,8.0,9.0,10.0]],
[[3.0,8.0,9.0,10.0],
[4.0,11.0,12.0,13.0]],
etc...
]
Run Code Online (Sandbox Code Playgroud)
和Y_train:
Y_train = [
[3.0,8.0,9.0,10.0],
[4.0,11.0,12.0,13.0],
etc...
]
Run Code Online (Sandbox Code Playgroud)
我的模型如下(适用于上面的简化示例):
num_of_vectors = 2
vect_dimension = 4
model = Sequential()
model.add(SimpleRNN(hidden_neurons, return_sequences=False, input_shape=(num_of_vectors, vect_dimension)))
model.add(Dense(vect_dimension))
model.add(Activation("linear"))
model.compile(loss="mean_squared_error", optimizer="rmsprop")
model.fit(X_train, Y_train, batch_size=50, nb_epoch=10, validation_split=0.15)
Run Code Online (Sandbox Code Playgroud)
最后,我的问题是,我怎么能避免那些2D张量到3D张量重塑我自己并使用嵌入层呢?我想在model = sequential()之后,我将不得不添加如下内容:
model.add(Embedding(?????))
Run Code Online (Sandbox Code Playgroud)
可能答案很简单,我只是对嵌入层的文档感到困惑.
我的目标是将自定义.py文件导入我的spark应用程序并调用该文件中包含的一些函数
这是我尝试过的:
我有一个名为Test.py的测试文件,如下所示:
def func():
print "Import is working"
Run Code Online (Sandbox Code Playgroud)
在我的Spark应用程序中,我执行以下操作(如文档中所述):
sc = SparkContext(conf=conf, pyFiles=['/[AbsolutePathTo]/Test.py'])
Run Code Online (Sandbox Code Playgroud)
我也尝试了这个(在创建Spark上下文之后):
sc.addFile("/[AbsolutePathTo]/Test.py")
Run Code Online (Sandbox Code Playgroud)
在提交我的spark应用程序时,我甚至尝试过以下方法:
./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M2 --py-files /[AbsolutePath]/Test.py ../Main/Code/app.py
Run Code Online (Sandbox Code Playgroud)
但是,我总是得到一个名称错误:
NameError: name 'func' is not defined
Run Code Online (Sandbox Code Playgroud)
当我在app.py中调用func()时.(如果我尝试调用Test.func(),则与'Test'相同的错误)
最后,我还尝试使用与上面相同的命令在pyspark shell中导入文件:
sc.addFile("/[AbsolutePathTo]/Test.py")
Run Code Online (Sandbox Code Playgroud)
奇怪的是,我没有在导入时出错,但仍然,我不能在没有得到错误的情况下调用func().此外,不确定它是否重要,但我在一台机器上本地使用火花.
我真的尝试了我能想到的一切,但仍然无法让它发挥作用.可能我错过了一些非常简单的事情.任何帮助,将不胜感激.
我拼命想把Cassandra连接到pyspark,但是我无法让它工作.我对Spark和cassandra很新,所以我可能会错过一些相当简单的东西.
我对网上所有不同的解释感到有些困惑,但从我的理解,最简单的方法是使用"Spark包"?(http://spark-packages.org/package/TargetHolding/pyspark-cassandra)
因此,使用以下命令:
./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.11:1.5.0-M2 ../Main/Code/myPysparkFile.py
Run Code Online (Sandbox Code Playgroud)
如果我使用如上所述的spark包,我是否理解我不需要下载任何包?
在myPysparkFile.py中我尝试了以下两个版本,我都没有为我工作:
版本1,我从http://www.slideshare.net/JonHaddad/intro-to-py-spark-and-cassandra的第14页获得:
"SparkCassandraTest.py"
from pyspark import SparkContext, SparkConf
from pyspark_cassandra import CassandraSparkContext,Row
conf = SparkConf()
conf.setMaster("local[4]")
conf.setAppName("Spark Cassandra")
conf.set("spark.cassandra.connection.host","http://127.0.0.1")
sc = CassandraSparkContext(conf=conf)
rdd = sc.cassandraTable("test", "words")
Run Code Online (Sandbox Code Playgroud)
作为一个错误我得到:
ImportError: No module named pyspark_cassandra
Run Code Online (Sandbox Code Playgroud)
版本2(灵感来自:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md):
"SparkCassandraTest.py"
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf()
conf.setMaster("local[4]")
conf.setAppName("Spark Cassandra")
conf.set("spark.cassandra.connection.host","http://127.0.0.1")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
sqlContext.read\
.format("org.apache.spark.sql.cassandra")\
.options(table="kv", keyspace="test")\
.load().show()
Run Code Online (Sandbox Code Playgroud)
这给了我以下错误:
py4j.protocol.Py4JJavaError: An …Run Code Online (Sandbox Code Playgroud) 我试图在两个Viewcontrollers之间使用委托,但不幸的是我的代表没有被解雇.我希望有人可以帮我解决问题.我有一个名为MapBackgroundViewController的ViewContoller和一个名为MapsViewController的ViewContoller.如果MapsBackgroundViewController的SegmentedControl发生更改,则应通知MapsViewController.(我实际上尝试在iPhone上使用地面卷曲实现类似地图应用程序的东西)
这是我的代码的一部分:
MapBackgroundViewController.h
@protocol ChangeMapTyp <NSObject>
@required
- (void)segmentedControllChangedMapType:(MKMapType) type ;
@end
@interface MapBackgroundViewController : UIViewController{
IBOutlet UISegmentedControl *segmentedControl;
MKMapType mapType;
id < ChangeMapTyp> delegate;
}
@property IBOutlet UISegmentedControl *segmentedControl;
@property MKMapType mapType;
@property(strong)id delegate;
- (IBAction)segmentedControllChanged:(id)sender;
Run Code Online (Sandbox Code Playgroud)
MapBackgroundViewController.m
@interface MapBackgroundViewController ()
@end
@implementation MapBackgroundViewController
@synthesize segmentedControl, mapType, delegate;
- (void)viewDidLoad
{
[super viewDidLoad];
// Do any additional setup after loading the view.
[self setDelegate:self];
NSLog(@"%@",self.delegate);
}
- (IBAction)segmentedControllChanged:(id)sender {
if (segmentedControl.selectedSegmentIndex == 0) {
mapType = MKMapTypeStandard;
}else if (segmentedControl.selectedSegmentIndex …Run Code Online (Sandbox Code Playgroud) 我打算在Spark中使用线性回归.首先,我查看了官方文档中的示例(您可以在此处找到)
我也在stackoverflow上发现了这个问题,这与我的问题基本相同.答案建议调整步长,我也尝试过,但结果仍然是随机的,没有调整步长.我正在使用的代码如下所示:
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
# Load and parse the data
def parsePoint(line):
values = [float(x) for x in line.replace(',', ' ').split(' ')]
return LabeledPoint(values[0], values[1:])
data = sc.textFile("data/mllib/ridge-data/lpsa.data")
parsedData = data.map(parsePoint)
# Build the model
model = LinearRegressionWithSGD.train(parsedData,100000,0.01)
# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = …Run Code Online (Sandbox Code Playgroud) python linear-regression apache-spark pyspark apache-spark-mllib
apache-spark ×5
pyspark ×4
python ×4
cassandra ×1
dataframe ×1
delegation ×1
embedding ×1
ios ×1
keras ×1
scikit-learn ×1