小编him*_*ian的帖子

Spark 将 DataFrame API 中的所有 NaN 替换为 null

我有一个包含许多双(和/或浮点)列的数据框,其中确实包含 NaN。我想用空值替换所有 NaN(即 Float.NaN 和 Double.NaN)。

我可以用例如单列来做到这一点x

val newDf = df.withColumn("x", when($"x".isNaN,lit(null)).otherwise($"x"))
Run Code Online (Sandbox Code Playgroud)

这有效,但我想一次对所有列执行此操作。我最近发现DataFrameNAFunctions( df.na)fill听起来正是我需要的。不幸的是我没有做到以上几点。 fill应该用给定的值替换所有 NaN 和空值,所以我这样做:

df.na.fill(null.asInstanceOf[java.lang.Double]).show
Run Code Online (Sandbox Code Playgroud)

这给了我一个 NullpointerException

还有一个很有前途的replace方法,但我什至无法编译代码:

df.na.replace("x", Map(java.lang.Double.NaN -> null.asInstanceOf[java.lang.Double])).show
Run Code Online (Sandbox Code Playgroud)

奇怪的是,这给了我

Error:(57, 34) type mismatch;
 found   : scala.collection.immutable.Map[scala.Double,java.lang.Double]
 required: Map[Any,Any]
Note: Double <: Any, but trait Map is invariant in type A.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
    df.na.replace("x", Map(java.lang.Double.NaN -> null.asInstanceOf[java.lang.Double])).show
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-dataframe

2
推荐指数
1
解决办法
6891
查看次数

df.select() 和 df.agg() 有什么区别?

我有一个数据框,我想从中提取最大值、最小值并计算其中的记录数。

数据框是:

scala> val df = spark.range(10000)
df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
Run Code Online (Sandbox Code Playgroud)

为了获取我正在使用的所需值df.select(),如下所示:

scala> df.select(min("id"), max("id"), count("id")).show
+-------+-------+---------+
|min(id)|max(id)|count(id)|
+-------+-------+---------+
|      0|   9999|    10000|
+-------+-------+---------+
Run Code Online (Sandbox Code Playgroud)

这给了我正确的结果,但是当我尝试时df.agg()它也给了我相同的答案。

scala> df.agg(min("id"), max("id"), count("id")).show
+-------+-------+---------+
|min(id)|max(id)|count(id)|
+-------+-------+---------+
|      0|   9999|    10000|
+-------+-------+---------+
Run Code Online (Sandbox Code Playgroud)

所以,我的问题是它们之间有什么区别df.select()df.agg()如果它们提供相同的结果,我应该使用哪一个以获得更好的性能?

scala aggregate-functions apache-spark apache-spark-sql

2
推荐指数
1
解决办法
3179
查看次数

为什么spark.read.parquet()运行2个作业?

我有一个镶木地板文件,名为test.parquet. 它包含一些整数。当我使用以下代码阅读它时:

val df = spark.read.parquet("test.parquet")

df.show(false)

+---+
|id |
+---+
|11 |
|12 |
|13 |
|14 |
|15 |
|16 |
|17 |
|18 |
|19 |
+---+
Run Code Online (Sandbox Code Playgroud)

在日志中,它显示已执行的 2 个作业。它们如下:

在此输入图像描述

一个是parquet工作,另一个是show工作。然而,当我使用以下代码读取镶木地板文件时:

val df = spark.read.schema(StructType(List(StructField("id",LongType,false)))).parquet("test.parquet")

df.show(false)

+---+
|id |
+---+
|11 |
|12 |
|13 |
|14 |
|15 |
|16 |
|17 |
|18 |
|19 |
+---+
Run Code Online (Sandbox Code Playgroud)

仅执行一项作业,即show

在此输入图像描述

所以,我的问题是:

  1. 为什么第一种方法执行 2 个作业,而第二种方法只执行一个?
  2. 而且,为什么第二种方法比第一种方法更快?

scala apache-spark parquet apache-spark-sql

2
推荐指数
1
解决办法
1447
查看次数

Scala:拆分双引号("")vs单引号('')

我使用双引号将逗号中的字符串拆分为Scala,如下所示:

scala> val a = "a,b,c"
a: String = a,b,c

scala> a.split(",")
res0: Array[String] = Array(a, b, c)
Run Code Online (Sandbox Code Playgroud)

它工作正常.此外,它使用单引号时工作正常:

scala> a.split(',')
res1: Array[String] = Array(a, b, c)
Run Code Online (Sandbox Code Playgroud)

但是,当我使用Double Quotes拆分String with Pipe时,它没有给出正确的结果:

scala> val a = "a|b|c"
a: String = a|b|c

scala> a.split("|")
res3: Array[String] = Array(a, |, b, |, c)
Run Code Online (Sandbox Code Playgroud)

然而,使用单引号可以得到正确的结果:

scala> a.split('|')
res2: Array[String] = Array(a, b, c)
Run Code Online (Sandbox Code Playgroud)

任何人都可以帮助我理解这种行为吗?

scala

2
推荐指数
1
解决办法
1326
查看次数

Apache Flink中保存的默认检查点在哪里?

我是Apache Flink的新手,并且正在研究Apache Flink的示例。我发现,如果发生故障,Flink可以从检查点恢复流处理。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000L);
Run Code Online (Sandbox Code Playgroud)

现在,我的问题是Flink在默认情况下将检查点保留在哪里?

任何帮助表示赞赏!

java apache-flink checkpointing flink-streaming

2
推荐指数
1
解决办法
699
查看次数

如何测试 Kafka 消费者

我有一个 Kafka Consumer(内置于 Scala),它从 Kafka 中提取最新记录。消费者看起来像这样:

val consumerProperties = new Properties()
consumerProperties.put("bootstrap.servers", "localhost:9092")
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.put("group.id", "something")
consumerProperties.put("auto.offset.reset", "latest")

val consumer = new KafkaConsumer[String, String](consumerProperties)
consumer.subscribe(java.util.Collections.singletonList("topic"))
Run Code Online (Sandbox Code Playgroud)

现在,我想为它编写一个集成测试。有没有测试 Kafka 消费者的方法或最佳实践?

scala apache-kafka kafka-consumer-api

1
推荐指数
1
解决办法
7381
查看次数

结构化流 - 无法使用FileContext API管理AWS S3上的元数据日志文件

我在Spark中有一个StreamingQuery(v2.2.0),即

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()

val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "s3n://bucket/checkpoint/test")
  .option("path", "s3n://bucket/test")
  .start()
Run Code Online (Sandbox Code Playgroud)

当我运行时,query数据确实在AWS S3上保存并且创建了检查点s3n://bucket/checkpoint/test.但是,我也在日志中收到以下警告:

WARN [oassestreaming.OffsetSeqLog]无法使用FileContext API在路径s3n:// bucket/checpoint/test/offsets管理元数据日志文件.使用FileSystem API代替管理日志文件.失败时日志可能不一致.

我无法理解为什么会出现这种警告.此外,如果发生任何故障,我的检查点会不一致吗?

任何人都可以帮我解决它吗?

scala amazon-s3 apache-spark spark-structured-streaming

1
推荐指数
1
解决办法
738
查看次数

Apache Flink:IDE 执行中的作业恢复未按预期工作

我有一个WordCount用 Flink (Scala) 编写的示例流示例。在其中,我想使用外部化检查点来在发生故障时进行恢复。但它没有按预期工作。

我的代码如下:

object WordCount {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = StreamExecutionEnvironment
      .getExecutionEnvironment
      .setStateBackend(new RocksDBStateBackend("file:///path/to/checkpoint", true))

    // start a checkpoint every 1000 ms
    env.enableCheckpointing(1000)

    // set mode to exactly-once (this is the default)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // make sure 500 ms of progress happen between checkpoints
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

    // checkpoints have to complete within one minute, or are discarded
    env.getCheckpointConfig.setCheckpointTimeout(60000)

    // prevent the tasks from failing if an error happens in …
Run Code Online (Sandbox Code Playgroud)

apache-flink checkpointing flink-streaming

1
推荐指数
1
解决办法
649
查看次数

为什么在 Scala 中不能将 Int 初始化为 null?

我想初始化类型的变量Int作为null在Scala中,但我不能这样做-

scala> val a: Int = null
<console>:11: error: an expression of type Null is ineligible for implicit conversion
   val a: Int = null
Run Code Online (Sandbox Code Playgroud)

但是我可以Stringnull在 Scala 中一样初始化一个类型的变量:

scala> val a: String = null
a: String = null
Run Code Online (Sandbox Code Playgroud)

有谁知道它的原因吗?

注意- 我使用的是 Scala 2.11.8

scala

0
推荐指数
1
解决办法
1011
查看次数

通过CURL进行Solr查询-[globbing]列中的错误范围

我在Solr中有一个名为的核心test。我已经摄取了2个嵌套的JSON文档。它们如下:

{"id":1, "path":"1.parent", "_childDocuments_":{"path":"2.parent.child"}}
Run Code Online (Sandbox Code Playgroud)

{"id":2, "path":"1.parent", "_childDocuments_":{"path":"2.parent.child"}}
Run Code Online (Sandbox Code Playgroud)

当我test通过浏览器查询核心以寻求父子关系的响应时,我更正了回复:

http://localhost:8983/solr/test/select?fl=*,[child%20parentFilter=path:1.parent%20childFilter=path:2.parent.child]&indent=on&q={!parent%20which=%22path:1.parent%22}&wt=json

{
  "responseHeader":{
    "status":0,
    "QTime":2,
    "params":{
      "q":"{!parent which=\"path:1.parent\"}",
      "indent":"on",
      "fl":"*,[child parentFilter=path:1.parent childFilter=path:2.parent.child]",
      "wt":"json"}},
  "response":{"numFound":2,"start":0,"docs":[
      {
        "id":"1",
        "path":["1.parent"],
        "_childDocuments_.path":["2.parent.child"],
        "_version_":1565913168462479360},
      {
        "id":"2",
        "path":["1.parent"],
        "_childDocuments_.path":["2.parent.child"],
        "_version_":1565913171789611008}]
  }}
Run Code Online (Sandbox Code Playgroud)

但是当我尝试通过curl它运行相同的查询时显示错误:

$ curl 'http://localhost:8983/solr/test/select?fl=*,[child%20parentFilter=path:1.parent%20childFilter=path:2.parent.child]&indent=on&q={!parent%20which=%22path:1.parent%22}&wt=json'
curl: (3) [globbing] bad range in column 46
Run Code Online (Sandbox Code Playgroud)

我无法理解。虽然,我可以理解,curl请求给错误上column 46是一个特殊字符[http请求。但是,然后它如何在浏览器中工作,我们如何通过发出请求curl

curl solr

0
推荐指数
1
解决办法
2272
查看次数

如何从Cassandra表中加载行作为Spark中的Dataframe?

我可以将整个Cassandra表加载为如下数据帧

val tableDf = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map( "table" -> table, "keyspace" -> keyspace))
      .load()
Run Code Online (Sandbox Code Playgroud)

但我找不到通过主键获取行的方法,比如

select * from table where key = ''
Run Code Online (Sandbox Code Playgroud)

有没有办法做到这一点?

scala cassandra apache-spark spark-cassandra-connector

0
推荐指数
1
解决办法
3490
查看次数

使用下划线作为标识符时,隐式模式定义不绑定任何变量

我有如下代码:

implicit val _ = new MyClass()
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

Implicit pattern definition binds no variables
Run Code Online (Sandbox Code Playgroud)

为什么?

我正在使用 Scala -> 2.13.3、SBT -> 1.3.13 和 Java -> OpenJDK v14.0.2

scala implicit

0
推荐指数
1
解决办法
112
查看次数