看似简单的问题,却找不到答案.
问题:我创建了一个函数,我将传递给map(),它接受一个字段并从中创建三个字段.我希望map()的输出给我一个新的RDD,包括输入RDD和新/输出RDD的字段.我该怎么做呢?
我是否需要将我的数据键添加到函数的输出中,以便我可以将更多输出RDD加入到我原来的RDD中?这是正确的/最佳做法吗?
def extract_fund_code_from_iv_id(holding):
# Must include key of data for later joining
iv_id = Row(iv_id_fund_code=holding.iv_id[:2], iv_id_last_code=holding.iv_id[-2:])
return iv_id
Run Code Online (Sandbox Code Playgroud)
更基本的,我似乎无法结合两个Row.
row1 = Row(name="joe", age="35")
row2 = Row(state="MA")
print row1, row2
Run Code Online (Sandbox Code Playgroud)
这不会像我想要的那样返回一个新的Row().
谢谢
我有一个Spark (version 1.3.1)申请.其中,我试图将一个转换Java bean RDD JavaRDD<Message>为Dataframe,它有许多具有不同数据类型的字段(整数,字符串,列表,映射,双精度).
但是,当我执行我的代码时.
messages.foreachRDD(new Function2<JavaRDD<Message>,Time,Void>(){
@Override
public Void call(JavaRDD<Message> arg0, Time arg1) throws Exception {
SQLContext sqlContext = SparkConnection.getSqlContext();
DataFrame df = sqlContext.createDataFrame(arg0, Message.class);
df.registerTempTable("messages");
Run Code Online (Sandbox Code Playgroud)
我收到了这个错误
/06/12 17:27:40 INFO JobScheduler: Starting job streaming job 1434110260000 ms.0 from job set of time 1434110260000 ms
15/06/12 17:27:40 ERROR JobScheduler: Error running job streaming job 1434110260000 ms.1
scala.MatchError: interface java.util.List (of class java.lang.Class)
at org.apache.spark.sql.SQLContext$$anonfun$getSchema$1.apply(SQLContext.scala:1193)
at org.apache.spark.sql.SQLContext$$anonfun$getSchema$1.apply(SQLContext.scala:1192)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) …Run Code Online (Sandbox Code Playgroud) 我进行了很多搜索,但没有找到在Java代码中执行gregationByKey的示例。
我想查找按键减少的JavaPairRDD中的行数。
我读到aggregateByKey是最好的方法,但是我使用的是Java而不是scala,因此我无法在Java中使用它。
请帮忙!!!
例如:
input: [(key1,[name:abc,email:def,address:ghi]),(key1,[name:abc,email:def,address:ghi]),(key2,[name:abc,email:def,address:ghi])]
output: [(key1,[name:abc,email:def,address:ghi, count:2]),(key2,[name:abc,email:def,address:ghi, count:1])]
Run Code Online (Sandbox Code Playgroud)
我想与示例完全相同,我想在输出行中增加一列,减少行数。
谢谢!!!
Spark新手在这里,希望你们能给我一些帮助.谢谢!
我正在尝试从CSV文件中提取URL,并且URL位于第16列.问题是URL是以一种奇怪的格式编写的,正如您从下面的代码中打印出来的那样.获取正确格式的URL的最佳方法是什么?
case class log(time_stamp: String, url: String )
val logText = sc.textFile("hdfs://...").map(s => s.split(",")).map( s => log(s(0).replaceAll("\"", ""),s(15).replaceAll("\"", ""))).toDF()
logText.registerTempTable("log")
val results = sqlContext.sql("SELECT * FROM log")
results.map(s => "URL: " + s(1)).collect().foreach(println)
URL: /XXX/YYY/ZZZ/http/www.domain.com/xyz/xyz
URL: /XX/YYY/ZZZ/https/sub.domain.com/xyz/xyz/xyz/xyz
URL: /XXX/YYY/ZZZ/http/www.domain.com/
URL: /VV/XXXX/YYY/ZZZ/https/sub.domain.com/xyz/xyz/xyz
Run Code Online (Sandbox Code Playgroud) 如何使用多列作为键来计算两个Dataframe的连接?例如DF1,DF2是两个dataFrame.
这是我们计算连接的方式,
JoinDF = DF1.join(DF2, DF1("column1") === DF2("column11") && DF1("column2") === DF2("column22"), "outer")
Run Code Online (Sandbox Code Playgroud)
但我的问题是如果它们存储在如下数组中,如何访问多列:
DF1KeyArray=Array{column1,column2}
DF2KeyArray=Array{column11,column22}
Run Code Online (Sandbox Code Playgroud)
那么用这种方法计算连接是不可能的
JoinDF = DF1.join(DF2, DF1(DF1KeyArray)=== DF2(DF2KeyArray), "outer")
Run Code Online (Sandbox Code Playgroud)
在这种情况下错误是:
<console>:128: error: type mismatch;
found : Array[String]
required: String
Run Code Online (Sandbox Code Playgroud)
有没有办法访问多个列作为存储在数组中的键来计算连接?
让我们定义一个Spark管道,它将几列组合在一起,然后应用特征哈希:
val df = sqlContext.createDataFrame(Seq((0.0, 1.0, 2.0), (3.0, 4.0, 5.0))).toDF("colx", "coly", "colz")
val va = new VectorAssembler().setInputCols(Array("colx", "coly", "colz")).setOutputCol("ft")
val hashIt = new HashingTF().setInputCol("ft").setOutputCol("ft2")
val pipeline = new Pipeline().setStages(Array(va, hashIt))
Run Code Online (Sandbox Code Playgroud)
使用pipeline.fit(df)throws 安装管道:
java.lang.IllegalArgumentException:要求失败:输入列必须是ArrayType,但是得到了org.apache.spark.mllib.linalg.VectorUDT@f71b0bce
是否有允许VectorAssembler并HashingTF能够一起工作的变压器?
我尝试在没有StringIndexer的Pipeline中使用Spark ML DecisionTreeClassifier,因为我的功能已被索引为(0.0; 1.0).DecisionTreeClassifier作为标签需要double值,因此此代码应该有效:
def trainDecisionTreeModel(training: RDD[LabeledPoint], sqlc: SQLContext): Unit = {
import sqlc.implicits._
val trainingDF = training.toDF()
//format of this dataframe: [label: double, features: vector]
val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(4)
.fit(trainingDF)
val dt = new DecisionTreeClassifier()
.setLabelCol("label")
.setFeaturesCol("indexedFeatures")
val pipeline = new Pipeline()
.setStages(Array(featureIndexer, dt))
pipeline.fit(trainingDF)
}
Run Code Online (Sandbox Code Playgroud)
但实际上我得到了
java.lang.IllegalArgumentException:
DecisionTreeClassifier was given input with invalid label column label,
without the number of classes specified. See StringIndexer.
Run Code Online (Sandbox Code Playgroud)
当然我可以放置StringIndexer并让它为我的双"标签"字段工作,但我想使用DecisionTreeClassifier的输出rawPrediction列来获得每行0.0和1.0的概率,如...
val predictions = model.transform(singletonDF)
val zeroProbability = predictions.select("rawPrediction").asInstanceOf[Vector](0)
val …Run Code Online (Sandbox Code Playgroud) scala classification apache-spark apache-spark-sql apache-spark-ml
我想知道如何利用spark SQL利用内存处理.如果有的话,目前支持使用内存中对象运行spark SQL的方法是什么?
是否可以将CREATE TABLE AS语句的目标作为内存中的表?
是否可以在FROM子句中引用内存中的对象,如数据帧?
我目前对火花的理解是有限的,所以如果看起来太简单,请原谅我的问题.我将不胜感激任何建议或指导.
我有一个时间段内用户的Lat/Lon格式的位置数据集.我想计算这些用户旅行的距离.样本数据集:
| 时间戳| 用户| 纬度|经度| | 1462838468 | 49B4361512443A4DA ... | 39.777982 | -7.054599 | | 1462838512 | 49B4361512443A4DA ... | 39.777982 | -7.054599 | | 1462838389 | 49B4361512443A4DA ... | 39.777982 | -7.054599 | | 1462838497 | 49B4361512443A4DA ... | 39.777982 | -7.054599 | | 1465975885 | 6E9E0581E2A032FD8 ... | 37.118362 | -8.205041 | | 1457723815 | 405C238E25FE0B9E7 ... | 37.177322 | -7.426781 | | 1457897289 | 405C238E25FE0B9E7 ... | 37.177922 | -7.447443 | | 1457899229 | 405C238E25FE0B9E7 ... …
我有以下sparkdataframe:
id weekly_sale
1 40000
2 120000
3 135000
4 211000
5 215000
6 331000
7 337000
Run Code Online (Sandbox Code Playgroud)
我需要查看weekly_sale列中以下哪些间隔项属于:
under 100000
between 100000 and 200000
between 200000 and 300000
more than 300000
Run Code Online (Sandbox Code Playgroud)
所以我想要的输出将是:
id weekly_sale label
1 40000 under 100000
2 120000 between 100000 and 200000
3 135000 between 100000 and 200000
4 211000 between 200000 and 300000
5 215000 between 200000 and 300000
6 331000 more than 300000
7 337000 more than 300000
Run Code Online (Sandbox Code Playgroud)
任何pyspark,spark.sql和Hive上下文实现都将对我有所帮助。
apache-spark-sql ×10
apache-spark ×9
scala ×4
pyspark ×3
dataframe ×2
java ×2
hive ×1
hivecontext ×1
join ×1
rdd ×1