for (e <- arr02) {
  val df = t04.select("session_id", e)                          // right
  val w = Window.partitionBy($"session_id").orderBy($e.desc)    // error
}
Here e is a String variable. Calling .orderBy($e.desc) with e is wrong, while .orderBy($"column_name".desc) is right.
So how can I refer to a column name in orderBy through a variable?
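For what it's worth, a minimal sketch of one way around this, assuming e holds the column name as a String: build the Column with col(e) (or t04(e)) instead of the $ interpolator, since $e just splices the variable into a plain string.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

for (e <- arr02) {
  val df = t04.select("session_id", e)
  // col(e) turns the column-name string into a Column, so .desc is available
  val w = Window.partitionBy(col("session_id")).orderBy(col(e).desc)
}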
I have a DataFrame in PySpark and I need to select the rows whose id value appears in an array. Can someone help me?
Example:
+---+-----+
| id| col2|
+---+-----+
|123| 2 |
|245| 32 |
| 12| 34 |
|234| 1 |
+---+-----+
Array: [123, 12, 234]
Desired result:
+---+-----+
| id| col2|
+---+-----+
|123| 2 |
| 12| 34 |
|234| 1 |
+---+-----+
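A minimal sketch of one way to get this, assuming the DataFrame is called df and the ids live in a plain Python list: isin does the membership test (a join would scale better for a very large list).

from pyspark.sql.functions import col

id_list = [123, 12, 234]

# Keep only the rows whose id appears in id_list
df.filter(col("id").isin(id_list)).show()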
I have a file, file1.snappy.parquet. It has a complex data structure, such as a map with an array inside it. After processing it I get the final result, but while writing that result to CSV I get this error message:
"Exception in thread "main" java.lang.UnsupportedOperationException: CSV data source does not support map<string,bigint> data type."
The code I have used:
val conf=new SparkConf().setAppName("student-example").setMaster("local")
val sc = new SparkContext(conf)
val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
val datadf = sqlcontext.read.parquet("C:\\file1.snappy.parquet")
def sumaggr=udf((aggr: Map[String, collection.mutable.WrappedArray[Long]]) => if (aggr.keySet.contains("aggr")) aggr("aggr").sum else 0)
datadf.select(col("neid"),sumaggr(col("marks")).as("sum")).filter(col("sum") =!= 0).show(false)
datadf.write.format("com.databricks.spark.csv").option("header", "true").save("C:\\myfile.csv")
I tried converting with datadf.toString(), but I still face the same problem. How can I write this result to CSV?
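For what it's worth, a sketch of one workaround, reusing the imports and definitions above: the error comes from writing the raw datadf, which still carries the map column, so write the computed result instead (or drop/stringify the map column first).

// Write the processed result (no map columns) instead of the raw datadf,
// since the CSV source cannot represent map<string,bigint>.
val result = datadf
  .select(col("neid"), sumaggr(col("marks")).as("sum"))
  .filter(col("sum") =!= 0)

result.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("C:\\myfile.csv")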
I have had a lot of trouble with the Row class in Spark. In my opinion, Row is a really poorly designed class. Extracting a value from a Row should be no harder than extracting a value from a Scala List; but in practice you have to know the exact type of a column to extract it. You cannot even turn a column into a String; how absurd is that for a great framework like Spark? In the real world, in most cases you do not know the exact type of a column, and on top of that you often have dozens or hundreds of columns. Below is an example showing the ClassCastExceptions I get.
Does anyone have a solution for easily extracting values from a Row?
scala> val df = List((1,2),(3,4)).toDF("col1","col2")
df: org.apache.spark.sql.DataFrame = [col1: int, col2: int]
scala> df.first.getAs[String]("col1")
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
... 56 elided
scala> df.first.getAs[Int]("col1")
res12: Int = 1
scala> df.first.getInt(0)
res13: Int = 1
scala> df.first.getLong(0)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
at org.apache.spark.sql.Row$class.getLong(Row.scala:231)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getLong(rows.scala:165)
... 56 elided
scala> df.first.getFloat(0)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Float
…

scala apache-spark apache-spark-sql spark-dataframe apache-spark-dataset
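A sketch of one way to pull values out of a Row without knowing the column type up front (assuming a string rendering is acceptable): use the untyped get together with fieldIndex and convert to String yourself.

import org.apache.spark.sql.Row

// Untyped, null-safe extraction: None for null, the value's string form otherwise
def getAsString(row: Row, column: String): Option[String] = {
  val i = row.fieldIndex(column)
  if (row.isNullAt(i)) None else Some(row.get(i).toString)
}

// Usage with the df above
val first = df.first
getAsString(first, "col1")   // Some("1")
getAsString(first, "col2")   // Some("2")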
How can I dynamically build a boolean filter condition in Spark SQL? Given:
val d = Seq(1, 2, 3, 5, 6).toDF
d.filter(col("value") === 1 or col("value") === 3).show
how can I replicate this dynamically from:
val desiredThings = Seq(1,3)
I tried to build the filter:
val myCondition = desiredThings.map(col("value") === _)
d.filter(myCondition).show
but it fails with:
overloaded method value filter with alternatives:
org.apache.spark.api.java.function.FilterFunction[org.apache.spark.sql.Row]
cannot be applied to (Seq[org.apache.spark.sql.Column])
when executing
d.filter(myCondition).show
Likewise, when trying a foldLeft:
val myCondition = desiredThings.foldLeft()((result, entry) => result && col(c.columnCounterId) === entry)
I get a compile error.
How can I adapt the code to generate the filter predicate dynamically?
dynamic filter apache-spark apache-spark-sql spark-dataframe
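A sketch of two common ways to do this, reusing d and desiredThings from above (assuming the goal is simply "value is one of desiredThings"): either reduce the per-value conditions with ||, or skip building conditions and use isin.

import org.apache.spark.sql.functions.col

// Option 1: one Column per desired value, OR-ed together with reduce
val cond = desiredThings.map(col("value") === _).reduce(_ || _)
d.filter(cond).show

// Option 2: isin expresses the same membership test directly
d.filter(col("value").isin(desiredThings: _*)).show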
When I use df.show() to print the contents of the DataFrame rows, I get this error:
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.8.9
at com.fasterxml.jackson.module.scala.JacksonModule$class.setupModule(JacksonModule.scala:64)
at com.fasterxml.jackson.module.scala.DefaultScalaModule.setupModule(DefaultScalaModule.scala:19)
at com.fasterxml.jackson.databind.ObjectMapper.registerModule(ObjectMapper.java:747)
at org.apache.spark.rdd.RDDOperationScope$.<init>(RDDOperationScope.scala:82)
at org.apache.spark.rdd.RDDOperationScope$.<clinit>(RDDOperationScope.scala)
This is how I create df:
object Test extends App {
  val spark = SparkSession.builder()
    .config("es.nodes", "XXX.XX.XX.XX")
    .config("es.port", "9200")
    .config("es.nodes.wan.only", "false")
    .config("es.resource", "myIndex")
    .appName("Test")
    .master("local[*]")
    .getOrCreate()

  val df_source = spark
    .read.format("org.elasticsearch.spark.sql")
    .option("pushdown", "true")
    .load("myIndex")

  df_source.show(5)
}
I don't include Jackson in my build.sbt.
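For what it's worth, a common workaround for this kind of clash is to pin Jackson explicitly in build.sbt. This is only a sketch: it assumes a transitive dependency (for example the Elasticsearch connector) drags in Jackson 2.8.9 while Spark 2.2.0 expects the 2.6.x line, so the exact version should match whatever your Spark distribution bundles.

// Force one Jackson version across all transitive dependencies (sbt 1.x; use ++= Set(...) on sbt 0.13)
dependencyOverrides ++= Seq(
  "com.fasterxml.jackson.core"    %  "jackson-core"         % "2.6.5",
  "com.fasterxml.jackson.core"    %  "jackson-databind"     % "2.6.5",
  "com.fasterxml.jackson.module" %% "jackson-module-scala"  % "2.6.5"
)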
Update:
import sbtassembly.AssemblyPlugin.autoImport.assemblyOption
name := "test"
lazy val spark = "org.apache.spark"
lazy val typesafe = "com.typesafe.akka"
val sparkVersion = "2.2.0"
val elasticSparkVersion = …

I have a Spark dataframe and I want to calculate a running total from the current row's amount value plus the sum of the previous rows' amount values, based on group and id. Let me set up the df:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
import pandas as pd
sc = spark.sparkContext
data1 = {'date': {0: '2018-04-03', 1: '2018-04-04', 2: '2018-04-05', 3: '2018-04-06', 4: '2018-04-07'},
'id': {0: 'id1', 1: 'id2', 2: 'id1', 3: 'id3', 4: 'id2'},
'group': {0: '1', 1: '1', 2: '1', 3: '2', 4: '1'},
'amount': {0: 50, 1: 40, 2: 50, 3: 55, 4: 20}}
df1_pd = pd.DataFrame(data1, columns=data1.keys())
df1 = spark.createDataFrame(df1_pd) …

I am trying to read a Kafka topic via Apache Spark Streaming, but I cannot figure out how to convert the data in the DStream into a DataFrame and then store it in a temp table. The messages in Kafka are in Avro format and are created from a database by Kafka JDBC Connect. I have the code below, which works fine until it runs spark.read.json to read the JSON into a dataframe.
package consumerTest
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import scala.util.parsing.json.{JSON, JSONObject}
object Consumer {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.master("local")
.appName("my-spark-app")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate();
import spark.implicits._
val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "<kafka-server>:9092",
"key.deserializer" -> …Run Code Online (Sandbox Code Playgroud) apache-spark spark-streaming kafka-consumer-api spark-dataframe
I want to fill nan values in Spark conditionally (to make sure I cover every corner case of my data, rather than simply filling everything with some replacement value).
A sample looks like this:
case class FooBar(foo:String, bar:String)
val myDf = Seq(("a","first"),("b","second"),("c",null), ("third","fooBar"), ("someMore","null"))
.toDF("foo","bar")
.as[FooBar]
+--------+------+
| foo| bar|
+--------+------+
| a| first|
| b|second|
| c| null|
| third|fooBar|
|someMore| null|
+--------+------+
Unfortunately,
myDf
.withColumn(
"bar",
when(
(($"foo" === "c") and ($"bar" isNull)) , "someReplacement"
)
).show
resets all the other, regular values in the column:
+--------+---------------+
| foo| bar|
+--------+---------------+
| a| null|
| b| null|
| c|someReplacement|
| third| null|
|someMore| null|
+--------+---------------+
and
myDf
.withColumn(
"bar",
when(
(($"foo" === "c") and ($"bar" isNull)) or
(($"foo" …Run Code Online (Sandbox Code Playgroud) 如果我有一个名为df的DataFrame看起来像:
+----+----+
|  a1|  a2|
+----+----+
| foo| bar|
| N/A| baz|
|null| etc|
+----+----+
I can selectively replace values like this:
val df2 = df.withColumn("a1", when($"a1" === "N/A", $"a2"))
so that df2 looks like:
+----+----+
|  a1|  a2|
+----+----+
| foo| bar|
| baz| baz|
|null| etc|
+----+----+
But why can't I check whether it is null instead, for example:
val df3 = df2.withColumn("a1", when($"a1" === null, $"a2"))
so that I get:
+----+----+
|  a1|  a2|
+----+----+
| foo| bar|
| baz| baz|
| etc| etc|
+----+----+
Edit: $"a1".isNull does not seem to work. Could that be because of how I constructed the dataframe used for testing, as shown below?
val schema = StructType(
StructField("a1", StringType, false) ::
StructField("a2", StringType, false) :: Nil …Run Code Online (Sandbox Code Playgroud) scala dataframe apache-spark apache-spark-sql spark-dataframe