标签: apache-spark-sql

在Spark中组合Row()

看似简单的问题,却找不到答案.

问题:我创建了一个函数,我将传递给map(),它接受一个字段并从中创建三个字段.我希望map()的输出给我一个新的RDD,包括输入RDD和新/输出RDD的字段.我该怎么做呢?

我是否需要将我的数据键添加到函数的输出中,以便我可以将更多输出RDD加入到我原来的RDD中?这是正确的/最佳做法吗?

def extract_fund_code_from_iv_id(holding):
    # Must include key of data for later joining
    iv_id = Row(iv_id_fund_code=holding.iv_id[:2], iv_id_last_code=holding.iv_id[-2:])
    return iv_id
Run Code Online (Sandbox Code Playgroud)

更基本的,我似乎无法结合两个Row.

row1 = Row(name="joe", age="35")
row2 = Row(state="MA")
print row1, row2
Run Code Online (Sandbox Code Playgroud)

这不会像我想要的那样返回一个新的Row().

谢谢

apache-spark-sql pyspark

0
推荐指数
1
解决办法
2694
查看次数

scala.MatchError:在Dataframes中

我有一个Spark (version 1.3.1)申请.其中,我试图将一个转换Java bean RDD JavaRDD<Message>为Dataframe,它有许多具有不同数据类型的字段(整数,字符串,列表,映射,双精度).

但是,当我执行我的代码时.

messages.foreachRDD(new Function2<JavaRDD<Message>,Time,Void>(){
            @Override
            public Void call(JavaRDD<Message> arg0, Time arg1) throws Exception {
                SQLContext sqlContext = SparkConnection.getSqlContext();
                DataFrame df = sqlContext.createDataFrame(arg0, Message.class);
                df.registerTempTable("messages");
Run Code Online (Sandbox Code Playgroud)

我收到了这个错误

/06/12 17:27:40 INFO JobScheduler: Starting job streaming job 1434110260000 ms.0 from job set of time 1434110260000 ms
15/06/12 17:27:40 ERROR JobScheduler: Error running job streaming job 1434110260000 ms.1
scala.MatchError: interface java.util.List (of class java.lang.Class)
    at org.apache.spark.sql.SQLContext$$anonfun$getSchema$1.apply(SQLContext.scala:1193)
    at org.apache.spark.sql.SQLContext$$anonfun$getSchema$1.apply(SQLContext.scala:1192)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) …
Run Code Online (Sandbox Code Playgroud)

java scala apache-spark spark-streaming apache-spark-sql

0
推荐指数
1
解决办法
3673
查看次数

如何在Java中的javaPairRDD上使用gregationByKey?

我进行了很多搜索,但没有找到在Java代码中执行gregationByKey的示例。

我想查找按键减少的JavaPairRDD中的行数。

我读到aggregateByKey是最好的方法,但是我使用的是Java而不是scala,因此我无法在Java中使用它。

请帮忙!!!

例如:

input: [(key1,[name:abc,email:def,address:ghi]),(key1,[name:abc,email:def,address:ghi]),(key2,[name:abc,email:def,address:ghi])]

output: [(key1,[name:abc,email:def,address:ghi, count:2]),(key2,[name:abc,email:def,address:ghi, count:1])]
Run Code Online (Sandbox Code Playgroud)

我想与示例完全相同,我想在输出行中增加一列,减少行数。

谢谢!!!

java apache-spark rdd apache-spark-sql

0
推荐指数
1
解决办法
5813
查看次数

使用Spark以CSV格式提取部分字符串

Spark新手在这里,希望你们能给我一些帮助.谢谢!

我正在尝试从CSV文件中提取URL,并且URL位于第16列.问题是URL是以一种奇怪的格式编写的,正如您从下面的代码中打印出来的那样.获取正确格式的URL的最佳方法是什么?

case class log(time_stamp: String, url: String )

val logText = sc.textFile("hdfs://...").map(s => s.split(",")).map( s => log(s(0).replaceAll("\"", ""),s(15).replaceAll("\"", ""))).toDF()


logText.registerTempTable("log")

val results = sqlContext.sql("SELECT * FROM log")
results.map(s => "URL: " + s(1)).collect().foreach(println)

URL: /XXX/YYY/ZZZ/http/www.domain.com/xyz/xyz
URL: /XX/YYY/ZZZ/https/sub.domain.com/xyz/xyz/xyz/xyz
URL: /XXX/YYY/ZZZ/http/www.domain.com/
URL: /VV/XXXX/YYY/ZZZ/https/sub.domain.com/xyz/xyz/xyz
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
3144
查看次数

使用多列作为密钥存储在Apache Spark中的数组中,连接两个Dataframe

如何使用多列作为键来计算两个Dataframe的连接?例如DF1,DF2是两个dataFrame.

这是我们计算连接的方式,

JoinDF = DF1.join(DF2, DF1("column1") === DF2("column11") && DF1("column2") === DF2("column22"), "outer") 
Run Code Online (Sandbox Code Playgroud)

但我的问题是如果它们存储在如下数组中,如何访问多列:

DF1KeyArray=Array{column1,column2}
DF2KeyArray=Array{column11,column22}
Run Code Online (Sandbox Code Playgroud)

那么用这种方法计算连接是不可能的

JoinDF = DF1.join(DF2, DF1(DF1KeyArray)=== DF2(DF2KeyArray), "outer")
Run Code Online (Sandbox Code Playgroud)

在这种情况下错误是:

<console>:128: error: type mismatch;
found   : Array[String]
required: String
Run Code Online (Sandbox Code Playgroud)

有没有办法访问多个列作为存储在数组中的键来计算连接?

scala join dataframe apache-spark apache-spark-sql

0
推荐指数
1
解决办法
5684
查看次数

组合VectorAssembler和HashingTF变换器的Spark管道

让我们定义一个Spark管道,它将几列组合在一起,然后应用特征哈希:

val df = sqlContext.createDataFrame(Seq((0.0, 1.0, 2.0), (3.0, 4.0, 5.0))).toDF("colx", "coly", "colz")
val va = new VectorAssembler().setInputCols(Array("colx", "coly", "colz")).setOutputCol("ft")
val hashIt = new HashingTF().setInputCol("ft").setOutputCol("ft2")
val pipeline = new Pipeline().setStages(Array(va, hashIt))
Run Code Online (Sandbox Code Playgroud)

使用pipeline.fit(df)throws 安装管道:

java.lang.IllegalArgumentException:要求失败:输入列必须是ArrayType,但是得到了org.apache.spark.mllib.linalg.VectorUDT@f71b0bce

是否有允许VectorAssemblerHashingTF能够一起工作的变压器?

apache-spark apache-spark-sql apache-spark-ml

0
推荐指数
1
解决办法
551
查看次数

如何在没有StringIndexer的Spark ML中进行二进制分类

我尝试在没有StringIndexer的Pipeline中使用Spark ML DecisionTreeClassifier,因为我的功能已被索引为(0.0; 1.0).DecisionTreeClassifier作为标签需要double值,因此此代码应该有效:

def trainDecisionTreeModel(training: RDD[LabeledPoint], sqlc: SQLContext): Unit = {
  import sqlc.implicits._
  val trainingDF = training.toDF()
  //format of this dataframe: [label: double, features: vector]

  val featureIndexer = new VectorIndexer()
    .setInputCol("features")
    .setOutputCol("indexedFeatures")
    .setMaxCategories(4)
    .fit(trainingDF)

  val dt = new DecisionTreeClassifier()
    .setLabelCol("label")
    .setFeaturesCol("indexedFeatures")


  val pipeline = new Pipeline()
    .setStages(Array(featureIndexer, dt))
  pipeline.fit(trainingDF)
}
Run Code Online (Sandbox Code Playgroud)

但实际上我得到了

java.lang.IllegalArgumentException:
DecisionTreeClassifier was given input with invalid label column label,
without the number of classes specified. See StringIndexer.
Run Code Online (Sandbox Code Playgroud)

当然我可以放置StringIndexer并让它为我的双"标签"字段工作,但我想使用DecisionTreeClassifier的输出rawPrediction列来获得每行0.0和1.0的概率,如...

val predictions = model.transform(singletonDF) 
val zeroProbability = predictions.select("rawPrediction").asInstanceOf[Vector](0)
val …
Run Code Online (Sandbox Code Playgroud)

scala classification apache-spark apache-spark-sql apache-spark-ml

0
推荐指数
1
解决办法
804
查看次数

是否可以在SQL连接中引用数据框?

我想知道如何利用spark SQL利用内存处理.如果有的话,目前支持使用内存中对象运行spark SQL的方法是什么?

  • 是否可以将CREATE TABLE AS语句的目标作为内存中的表?

  • 是否可以在FROM子句中引用内存中的对象,如数据帧?

我目前对火花的理解是有限的,所以如果看起来太简单,请原谅我的问题.我将不胜感激任何建议或指导.

hive apache-spark apache-spark-sql spark-dataframe

0
推荐指数
1
解决办法
322
查看次数

如何使用(Py)Spark汇总数据集中数据点之间的距离?

我有一个时间段内用户的Lat/Lon格式的位置数据集.我想计算这些用户旅行的距离.样本数据集:

| 时间戳| 用户| 纬度|经度| | 1462838468 | 49B4361512443A4DA ... | 39.777982 | -7.054599 | | 1462838512 | 49B4361512443A4DA ... | 39.777982 | -7.054599 | | 1462838389 | 49B4361512443A4DA ... | 39.777982 | -7.054599 | | 1462838497 | 49B4361512443A4DA ... | 39.777982 | -7.054599 | | 1465975885 | 6E9E0581E2A032FD8 ... | 37.118362 | -8.205041 | | 1457723815 | 405C238E25FE0B9E7 ... | 37.177322 | -7.426781 | | 1457897289 | 405C238E25FE0B9E7 ... | 37.177922 | -7.447443 | | 1457899229 | 405C238E25FE0B9E7 ... …

apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
1926
查看次数

如何在范围内划分数字列并为Apache Spark中的每个范围分配标签?

我有以下sparkdataframe:

id weekly_sale
1    40000
2    120000
3    135000
4    211000
5    215000
6    331000
7    337000
Run Code Online (Sandbox Code Playgroud)

我需要查看weekly_sale列中以下哪些间隔项属于:

under 100000
between 100000 and 200000
between 200000 and 300000
more than 300000
Run Code Online (Sandbox Code Playgroud)

所以我想要的输出将是:

id weekly_sale  label
1    40000       under 100000    
2    120000      between 100000 and 200000
3    135000      between 100000 and 200000
4    211000      between 200000 and 300000
5    215000      between 200000 and 300000
6    331000      more than 300000
7    337000      more than 300000
Run Code Online (Sandbox Code Playgroud)

任何pyspark,spark.sql和Hive上下文实现都将对我有所帮助。

dataframe apache-spark apache-spark-sql pyspark hivecontext

0
推荐指数
1
解决办法
1154
查看次数