我有一个包含许多双(和/或浮点)列的数据框,其中确实包含 NaN。我想用空值替换所有 NaN(即 Float.NaN 和 Double.NaN)。
我可以用例如单列来做到这一点x:
val newDf = df.withColumn("x", when($"x".isNaN,lit(null)).otherwise($"x"))
Run Code Online (Sandbox Code Playgroud)
这有效,但我想一次对所有列执行此操作。我最近发现DataFrameNAFunctions( df.na)fill听起来正是我需要的。不幸的是我没有做到以上几点。 fill应该用给定的值替换所有 NaN 和空值,所以我这样做:
df.na.fill(null.asInstanceOf[java.lang.Double]).show
Run Code Online (Sandbox Code Playgroud)
这给了我一个 NullpointerException
还有一个很有前途的replace方法,但我什至无法编译代码:
df.na.replace("x", Map(java.lang.Double.NaN -> null.asInstanceOf[java.lang.Double])).show
Run Code Online (Sandbox Code Playgroud)
奇怪的是,这给了我
Error:(57, 34) type mismatch;
found : scala.collection.immutable.Map[scala.Double,java.lang.Double]
required: Map[Any,Any]
Note: Double <: Any, but trait Map is invariant in type A.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
df.na.replace("x", Map(java.lang.Double.NaN -> null.asInstanceOf[java.lang.Double])).show
Run Code Online (Sandbox Code Playgroud) 我有一个数据框,我想从中提取最大值、最小值并计算其中的记录数。
数据框是:
scala> val df = spark.range(10000)
df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
Run Code Online (Sandbox Code Playgroud)
为了获取我正在使用的所需值df.select(),如下所示:
scala> df.select(min("id"), max("id"), count("id")).show
+-------+-------+---------+
|min(id)|max(id)|count(id)|
+-------+-------+---------+
| 0| 9999| 10000|
+-------+-------+---------+
Run Code Online (Sandbox Code Playgroud)
这给了我正确的结果,但是当我尝试时df.agg()它也给了我相同的答案。
scala> df.agg(min("id"), max("id"), count("id")).show
+-------+-------+---------+
|min(id)|max(id)|count(id)|
+-------+-------+---------+
| 0| 9999| 10000|
+-------+-------+---------+
Run Code Online (Sandbox Code Playgroud)
所以,我的问题是它们之间有什么区别df.select(),df.agg()如果它们提供相同的结果,我应该使用哪一个以获得更好的性能?
我有一个镶木地板文件,名为test.parquet. 它包含一些整数。当我使用以下代码阅读它时:
val df = spark.read.parquet("test.parquet")
df.show(false)
+---+
|id |
+---+
|11 |
|12 |
|13 |
|14 |
|15 |
|16 |
|17 |
|18 |
|19 |
+---+
Run Code Online (Sandbox Code Playgroud)
在日志中,它显示已执行的 2 个作业。它们如下:
一个是parquet工作,另一个是show工作。然而,当我使用以下代码读取镶木地板文件时:
val df = spark.read.schema(StructType(List(StructField("id",LongType,false)))).parquet("test.parquet")
df.show(false)
+---+
|id |
+---+
|11 |
|12 |
|13 |
|14 |
|15 |
|16 |
|17 |
|18 |
|19 |
+---+
Run Code Online (Sandbox Code Playgroud)
仅执行一项作业,即show:
所以,我的问题是:
我使用双引号将逗号中的字符串拆分为Scala,如下所示:
scala> val a = "a,b,c"
a: String = a,b,c
scala> a.split(",")
res0: Array[String] = Array(a, b, c)
Run Code Online (Sandbox Code Playgroud)
它工作正常.此外,它使用单引号时工作正常:
scala> a.split(',')
res1: Array[String] = Array(a, b, c)
Run Code Online (Sandbox Code Playgroud)
但是,当我使用Double Quotes拆分String with Pipe时,它没有给出正确的结果:
scala> val a = "a|b|c"
a: String = a|b|c
scala> a.split("|")
res3: Array[String] = Array(a, |, b, |, c)
Run Code Online (Sandbox Code Playgroud)
然而,使用单引号可以得到正确的结果:
scala> a.split('|')
res2: Array[String] = Array(a, b, c)
Run Code Online (Sandbox Code Playgroud)
任何人都可以帮助我理解这种行为吗?
我是Apache Flink的新手,并且正在研究Apache Flink的示例。我发现,如果发生故障,Flink可以从检查点恢复流处理。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000L);
Run Code Online (Sandbox Code Playgroud)
现在,我的问题是Flink在默认情况下将检查点保留在哪里?
任何帮助表示赞赏!
我有一个 Kafka Consumer(内置于 Scala),它从 Kafka 中提取最新记录。消费者看起来像这样:
val consumerProperties = new Properties()
consumerProperties.put("bootstrap.servers", "localhost:9092")
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.put("group.id", "something")
consumerProperties.put("auto.offset.reset", "latest")
val consumer = new KafkaConsumer[String, String](consumerProperties)
consumer.subscribe(java.util.Collections.singletonList("topic"))
Run Code Online (Sandbox Code Playgroud)
现在,我想为它编写一个集成测试。有没有测试 Kafka 消费者的方法或最佳实践?
我在Spark中有一个StreamingQuery(v2.2.0),即
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.load()
val query = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("parquet")
.option("checkpointLocation", "s3n://bucket/checkpoint/test")
.option("path", "s3n://bucket/test")
.start()
Run Code Online (Sandbox Code Playgroud)
当我运行时,query数据确实在AWS S3上保存并且创建了检查点s3n://bucket/checkpoint/test.但是,我也在日志中收到以下警告:
WARN [oassestreaming.OffsetSeqLog]无法使用FileContext API在路径s3n:// bucket/checpoint/test/offsets管理元数据日志文件.使用FileSystem API代替管理日志文件.失败时日志可能不一致.
我无法理解为什么会出现这种警告.此外,如果发生任何故障,我的检查点会不一致吗?
任何人都可以帮我解决它吗?
我有一个WordCount用 Flink (Scala) 编写的示例流示例。在其中,我想使用外部化检查点来在发生故障时进行恢复。但它没有按预期工作。
我的代码如下:
object WordCount {
def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment
.getExecutionEnvironment
.setStateBackend(new RocksDBStateBackend("file:///path/to/checkpoint", true))
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)
// set mode to exactly-once (this is the default)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig.setCheckpointTimeout(60000)
// prevent the tasks from failing if an error happens in …Run Code Online (Sandbox Code Playgroud) 我想初始化类型的变量Int作为null在Scala中,但我不能这样做-
scala> val a: Int = null
<console>:11: error: an expression of type Null is ineligible for implicit conversion
val a: Int = null
Run Code Online (Sandbox Code Playgroud)
但是我可以String像null在 Scala 中一样初始化一个类型的变量:
scala> val a: String = null
a: String = null
Run Code Online (Sandbox Code Playgroud)
有谁知道它的原因吗?
注意- 我使用的是 Scala 2.11.8
我在Solr中有一个名为的核心test。我已经摄取了2个嵌套的JSON文档。它们如下:
{"id":1, "path":"1.parent", "_childDocuments_":{"path":"2.parent.child"}}
Run Code Online (Sandbox Code Playgroud)
和
{"id":2, "path":"1.parent", "_childDocuments_":{"path":"2.parent.child"}}
Run Code Online (Sandbox Code Playgroud)
当我test通过浏览器查询核心以寻求父子关系的响应时,我更正了回复:
http://localhost:8983/solr/test/select?fl=*,[child%20parentFilter=path:1.parent%20childFilter=path:2.parent.child]&indent=on&q={!parent%20which=%22path:1.parent%22}&wt=json
{
"responseHeader":{
"status":0,
"QTime":2,
"params":{
"q":"{!parent which=\"path:1.parent\"}",
"indent":"on",
"fl":"*,[child parentFilter=path:1.parent childFilter=path:2.parent.child]",
"wt":"json"}},
"response":{"numFound":2,"start":0,"docs":[
{
"id":"1",
"path":["1.parent"],
"_childDocuments_.path":["2.parent.child"],
"_version_":1565913168462479360},
{
"id":"2",
"path":["1.parent"],
"_childDocuments_.path":["2.parent.child"],
"_version_":1565913171789611008}]
}}
Run Code Online (Sandbox Code Playgroud)
但是当我尝试通过curl它运行相同的查询时显示错误:
$ curl 'http://localhost:8983/solr/test/select?fl=*,[child%20parentFilter=path:1.parent%20childFilter=path:2.parent.child]&indent=on&q={!parent%20which=%22path:1.parent%22}&wt=json'
curl: (3) [globbing] bad range in column 46
Run Code Online (Sandbox Code Playgroud)
我无法理解。虽然,我可以理解,curl请求给错误上column 46是一个特殊字符[的http请求。但是,然后它如何在浏览器中工作,我们如何通过发出请求curl?
我可以将整个Cassandra表加载为如下数据帧
val tableDf = sparkSession.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> table, "keyspace" -> keyspace))
.load()
Run Code Online (Sandbox Code Playgroud)
但我找不到通过主键获取行的方法,比如
select * from table where key = ''
Run Code Online (Sandbox Code Playgroud)
有没有办法做到这一点?
我有如下代码:
implicit val _ = new MyClass()
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
Implicit pattern definition binds no variables
Run Code Online (Sandbox Code Playgroud)
为什么?
我正在使用 Scala -> 2.13.3、SBT -> 1.3.13 和 Java -> OpenJDK v14.0.2
scala ×9
apache-spark ×5
apache-flink ×2
amazon-s3 ×1
apache-kafka ×1
cassandra ×1
curl ×1
implicit ×1
java ×1
parquet ×1
solr ×1