小编Tho*_*mas的帖子

Scala:从嵌套案例类到展平案例类

问题是:

如何构建一个通用函数,该函数可以采用由其他案例类组成的任何案例类,并将其展平为一个案例类,其中包含组合案例类中每个案例类的所有值?

例如,我想像这样转换一个嵌套的 case 类

case class A(first: String, second: String)
case class B(value: String)

case class Nested(a: A, b: B)
Run Code Online (Sandbox Code Playgroud)

像这样的扁平案例类

case class Flatten(aFirst: String, aSecond: String, bValue: String)
Run Code Online (Sandbox Code Playgroud)

但我想避免像这样构建自己的构造函数(或手动创建函数):

object Flatten {

  def apply(nested: Nested): Flatten = {
    Flatten(nested.a.first, nested.a.second, nested.b.value)
  }
}
Run Code Online (Sandbox Code Playgroud)

注意:在实际用例中,案例类更复杂,我想在不同的案例类上多次使用该方法。

scala

5
推荐指数
1
解决办法
1010
查看次数

测试kafka和flink集成流程

我想测试卡夫卡/弗林克集成FlinkKafkaConsumer011FlinkKafkaProducer011例如。

该过程将是:

  1. 使用 Flink 从 kafka 主题中读取
  2. 使用 Flink 进行一些操作
  3. 用 Flink 写入另一个 kafka topic

使用字符串示例,从输入主题中读取字符串,转换为大写,写入新主题。

问题是如何测试流量?

当我说测试时,这是单元/集成测试。

谢谢!

integration-testing scala apache-kafka apache-flink

3
推荐指数
1
解决办法
2530
查看次数

使用 scalatest-embedded-kafka 集成测试 Flink 和 Kafka

我想用Flink 和 Kafka 运行集成测试。该过程是从 Kafka 读取数据,使用 Flink 进行一些操作,然后将数据流放入 Kafka 中。

我想从头到尾测试这个过程。现在我使用scalatest-embedded-kafka

我在这里举了一个例子,我试图尽可能简单:

import java.util.Properties

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.scalatest.{Matchers, WordSpec}

import scala.collection.mutable.ListBuffer

object SimpleFlinkKafkaTest {

  class CollectSink extends SinkFunction[String] {
    override def invoke(string: String): Unit = {
      synchronized {
        CollectSink.values += string
      }
    }
  }

  object CollectSink {
    val values: ListBuffer[String] = ListBuffer.empty[String]
  }

  val kafkaPort = 9092
  val zooKeeperPort = 2181

  val props = …
Run Code Online (Sandbox Code Playgroud)

integration-testing scala apache-kafka apache-flink embedded-kafka

1
推荐指数
1
解决办法
1852
查看次数

Scala检查对象是否为选项

我想检查一个对象是否是一个选项.

例如:

val foo: Option[String] = Some("foo")
val bar: String = "bar"
Run Code Online (Sandbox Code Playgroud)

我想要一个功能isOption:

def isOption(value: Any): Boolean = {
     ???
}
Run Code Online (Sandbox Code Playgroud)

结果将是:

isOption(foo) // true
isOption(bar) // false
Run Code Online (Sandbox Code Playgroud)

monads scala

0
推荐指数
2
解决办法
217
查看次数