标签: spark-dataframe

如何用scala数据帧的orderBy中的变量表示列名?

 for (e <- arr02) {
      val df = t04.select("session_id", e)  // right
      val w = Window.partitionBy($"session_id").orderBy($e.desc)  //error
}
Run Code Online (Sandbox Code Playgroud)

e是字符串变量,方法.orderBy($e.desc) e不对,.orderBy($"column_name".desc)是对的。

那么我怎样才能用一个变量来表示一个列名orderBy呢?

scala apache-spark spark-dataframe

1
推荐指数
1
解决办法
4091
查看次数

Pyspark - Sql 过滤器 - 通过检查 id 值是否出现在数组中来选择所有行

我在 Pyspark 中有一个 DataFrame,我需要选择其中 id 值出现在数组中的行。有人可以帮我吗?

例子:

 +---+-----+
 | id| col2|
 +---+-----+
 |123|  2  |
 |245| 32  |
 | 12| 34  |
 |234|  1  |
 +---+-----+
Run Code Online (Sandbox Code Playgroud)

数组:[123, 12, 234]

想要的结果:

+---+-----+
| id| col2|
+---+-----+
|123|  2  |
| 12| 34  |
|234|  1  |
+---+-----+
Run Code Online (Sandbox Code Playgroud)

pyspark spark-dataframe pyspark-sql

1
推荐指数
1
解决办法
5140
查看次数

在 Spark 中使用数据类型 map&lt;string,bigint&gt; 将数据帧写入 csv

我有一个文件是 file1snappy.parquet。它有一个复杂的数据结构,比如地图,里面的数组。处理后我得到了最终结果。在将结果写入 csv 时,我收到一些错误消息

"Exception in thread "main" java.lang.UnsupportedOperationException: CSV data source does not support map<string,bigint> data type."
Run Code Online (Sandbox Code Playgroud)

我使用过的代码:

val conf=new SparkConf().setAppName("student-example").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
    val datadf = sqlcontext.read.parquet("C:\\file1.snappy.parquet")
    def sumaggr=udf((aggr: Map[String, collection.mutable.WrappedArray[Long]]) => if (aggr.keySet.contains("aggr")) aggr("aggr").sum else 0)
datadf.select(col("neid"),sumaggr(col("marks")).as("sum")).filter(col("sum") =!= 0).show(false)
    datadf.write.format("com.databricks.spark.csv").option("header", "true").save("C:\\myfile.csv")
Run Code Online (Sandbox Code Playgroud)

我尝试转换 datadf.toString() 但我仍然面临同样的问题。如何将该结果写入 CSV。

apache-spark rdd spark-dataframe

1
推荐指数
1
解决办法
5444
查看次数

Apache Spark:从行中提取值的问题

我在 Spark 中的 Row 类遇到了很多问题。在我看来 Row 类是一个真正设计糟糕的类。从 Row 中提取一个值应该并不比从 Scala 列表中提取一个值更困难;但实际上,您必须知道列的确切类型才能提取它。你甚至不能把列变成字符串;对于像 Spark 这样的伟大框架来说,这有多荒谬?在现实世界中,在大多数情况下,您不知道列的确切类型,而且在许多情况下,最重要的是,您有数十个或数百个列。下面是一个示例,向您展示我得到的 ClassCastExceptions。

有没有人有任何解决方案可以轻松地从 Row 中提取值?

scala> val df = List((1,2),(3,4)).toDF("col1","col2")
df: org.apache.spark.sql.DataFrame = [col1: int, col2: int]


scala> df.first.getAs[String]("col1")
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
  ... 56 elided

scala> df.first.getAs[Int]("col1")
res12: Int = 1

scala> df.first.getInt(0)
res13: Int = 1

scala> df.first.getLong(0)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
  at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
  at org.apache.spark.sql.Row$class.getLong(Row.scala:231)
  at org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:165)
  ... 56 elided

scala> df.first.getFloat(0)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Float …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql spark-dataframe apache-spark-dataset

1
推荐指数
1
解决办法
2680
查看次数

spark sql动态过滤条件

如何在 spark sql 中动态构建布尔过滤条件?拥有:

val d = Seq(1, 2, 3, 5, 6).toDF
d.filter(col("value") === 1 or col("value") === 3).show
Run Code Online (Sandbox Code Playgroud)

我怎样才能动态复制这个:

val desiredThings = Seq(1,3)
Run Code Online (Sandbox Code Playgroud)

我尝试构建过滤器:

val myCondition = desiredThings.map(col("value") === _)
d.filter(myCondition).show
Run Code Online (Sandbox Code Playgroud)

但失败:

overloaded method value filter with alternatives:
org.apache.spark.api.java.function.FilterFunction[org.apache.spark.sql.Row]
 cannot be applied to (Seq[org.apache.spark.sql.Column])
Run Code Online (Sandbox Code Playgroud)

执行时

d.filter(myCondition).show
Run Code Online (Sandbox Code Playgroud)

同样在尝试向左折叠时:

val myCondition = desiredThings.foldLeft()((result, entry) => result && col(c.columnCounterId) === entry)
Run Code Online (Sandbox Code Playgroud)

我有编译错误。

如何调整代码以动态生成过滤谓词?

dynamic filter apache-spark apache-spark-sql spark-dataframe

1
推荐指数
1
解决办法
3782
查看次数

引起:com.fasterxml.jackson.databind.JsonMappingException:不兼容的杰克逊版本:2.8.9

当我df.show()打印 DataFrame 行的内容时,出现此错误:

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.8.9
    at com.fasterxml.jackson.module.scala.JacksonModule$class.setupModule(JacksonModule.scala:64)
    at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:19)
    at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:747)
    at org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:82)
    at org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
Run Code Online (Sandbox Code Playgroud)

这就是我创建的方式df

object Test extends App {

   val spark = SparkSession.builder()
      .config("es.nodes", "XXX.XX.XX.XX")
      .config("es.port", "9200")
      .config("es.nodes.wan.only", "false")
      .config("es.resource","myIndex")
      .appName("Test")
      .master("local[*]")
      .getOrCreate()

   val df_source = spark
                   .read.format("org.elasticsearch.spark.sql")
                   .option("pushdown", "true")
                   .load("myIndex")

   df_source.show(5)

}
Run Code Online (Sandbox Code Playgroud)

我不在我的build.sbt.

更新:

import sbtassembly.AssemblyPlugin.autoImport.assemblyOption
name := "test"
lazy val spark = "org.apache.spark"
lazy val typesafe = "com.typesafe.akka"
val sparkVersion = "2.2.0"
val elasticSparkVersion = …
Run Code Online (Sandbox Code Playgroud)

scala elasticsearch apache-spark spark-dataframe

1
推荐指数
1
解决办法
6446
查看次数

使用Window函数将前一行与当前行相加

我有一个 spark 数据框,我想根据当前行的金额值和基于 groupid 和 id 的金额值的上一行总和来计算运行总计。让我把 df

import findspark
findspark.init()
import pyspark 
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
import pandas as pd


 sc = spark.sparkContext
data1 = {'date': {0: '2018-04-03', 1: '2018-04-04', 2: '2018-04-05', 3: '2018-04-06', 4: '2018-04-07'},
         'id': {0: 'id1', 1: 'id2', 2: 'id1', 3: 'id3', 4: 'id2'},
         'group': {0: '1', 1: '1', 2: '1', 3: '2', 4: '1'},
         'amount': {0: 50, 1: 40, 2: 50, 3: 55, 4: 20}}
df1_pd = pd.DataFrame(data1, columns=data1.keys())

df1 = spark.createDataFrame(df1_pd) …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark spark-dataframe

1
推荐指数
1
解决办法
3322
查看次数

Spark Streaming - 将 json 格式的消息 Dstream 到 DataFrame

我正在尝试通过 Apache Spark Streaming 读取 Kafka 主题,但无法弄清楚如何将 DStream 中的数据转换为 DataFrame,然后存储在临时表中。Kafka 中的消息采用 Avro 格式,由 Kafka JDBC Connect 从数据库创建。我有下面的代码,它工作正常,直到它执行spark.read.json读取 json 到数据帧。

package consumerTest


import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._

import scala.util.parsing.json.{JSON, JSONObject}

object Consumer {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder
      .master("local")
      .appName("my-spark-app")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate();

    import spark.implicits._


    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "<kafka-server>:9092",
      "key.deserializer" -> …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-streaming kafka-consumer-api spark-dataframe

1
推荐指数
1
解决办法
6397
查看次数

火花有条件的替代,但保持归档价值

我想有条件地在spark中填充nan值(以确保我考虑了我的数据的每个角落情况,而不是简单地用任何替换值填充任何东西).

样本看起来像

case class FooBar(foo:String, bar:String)
val myDf = Seq(("a","first"),("b","second"),("c",null), ("third","fooBar"), ("someMore","null"))
         .toDF("foo","bar")
         .as[FooBar]

+--------+------+
|     foo|   bar|
+--------+------+
|       a| first|
|       b|second|
|       c|  null|
|   third|fooBar|
|someMore|  null|
+--------+------+
Run Code Online (Sandbox Code Playgroud)

不幸

    myDf
        .withColumn(
          "bar",
          when(
            (($"foo" === "c") and ($"bar" isNull)) , "someReplacement" 
          )
        ).show
Run Code Online (Sandbox Code Playgroud)

重置列中的所有常规其他值

+--------+---------------+
|     foo|            bar|
+--------+---------------+
|       a|           null|
|       b|           null|
|       c|someReplacement|
|   third|           null|
|someMore|           null|
+--------+---------------+
Run Code Online (Sandbox Code Playgroud)

myDf
    .withColumn(
      "bar",
      when(
        (($"foo" === "c") and ($"bar" isNull)) or
        (($"foo" …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql spark-dataframe

0
推荐指数
1
解决办法
314
查看次数

在DataFrame.withColumn中,如何检查列的值是否为空作为第二个参数的条件?

如果我有一个名为df的DataFrame看起来像:

+----+----+
|  a1+  a2|
+----+----+
| foo| bar|
| N/A| baz|
|null| etc|
+----+----+
Run Code Online (Sandbox Code Playgroud)

我可以像这样有选择地替换值:

val df2 = df.withColumn("a1", when($"a1" === "N/A", $"a2"))
Run Code Online (Sandbox Code Playgroud)

所以df2看起来像:

+----+----+
|  a1+  a2|
+----+----+
| foo| bar|
| baz| baz|
|null| etc|
+----+----+
Run Code Online (Sandbox Code Playgroud)

但是为什么我不能检查它是否为null,例如:

val df3 = df2.withColumn("a1", when($"a1" === null, $"a2"))
Run Code Online (Sandbox Code Playgroud)

这样我得到:

+----+----+
|  a1+  a2|
+----+----+
| foo| bar|
| baz| baz|
| etc| etc|
+----+----+
Run Code Online (Sandbox Code Playgroud)

编辑:$“ a1” .isNull似乎不起作用。可能是因为我如何构造用于测试的数据框,如下所示?

val schema = StructType(
                StructField("a1", StringType, false) ::
                StructField("a2", StringType, false) :: Nil …
Run Code Online (Sandbox Code Playgroud)

scala dataframe apache-spark apache-spark-sql spark-dataframe

0
推荐指数
1
解决办法
3450
查看次数