小编Ach*_*eus的帖子

Apache Avro 作为 Apache Spark 2.4 中的内置数据源

我最近读了这篇文章并尝试了这个例子,但是当我运行时

val usersDF = spark.read.format("avro")
                        .load("examples/src/main/resources/users.avro")
Run Code Online (Sandbox Code Playgroud)

但是当我尝试运行它时,这给了我一个错误。

线程“main”org.apache.spark.sql.AnalysisException 中出现异常:无法找到数据源:avro。从 Spark 2.4 开始,Avro 是内置但外部的数据源模块。请按照“Apache Avro 数据源指南”的部署部分部署应用程序。在 org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)

apache-spark

12
推荐指数
1
解决办法
2万
查看次数

何时关闭生产者或消费者

最近,我们的 Kafka 消费者和生产者遇到了一些性能问题。我们在scala中使用Kafka Java API。打开和关闭消费者和生产者对象的良好实践是什么?我相信这是一个相当开放式的问题,正确的答案始终是,depends但我正在尝试对此进行推理。

消费者可以长期运行连接并保持打开状态吗?

当我们生产完消息后,生产者是否应该关闭?

apache-kafka kafka-consumer-api kafka-producer-api

6
推荐指数
1
解决办法
3420
查看次数

我可以创建一个服务来公开集群 IP 和 NodePort 吗?

我有 2 个 pod 在 minikube 上运行。Pod A 必须expose 2 ports为 Pod B 提供 8081 和 9092,inside the k8 cluster以便outside k8 clusterpod B 可以访问它们,并且在集群之外我的消费者也可以访问它们。所以我的问题是should2 services为每个端口创建了哪些端口?一个作为集群 IP 公开,另一个作为节点端口公开?或者有没有办法创建一个服务来将其公开为集群 IP 和 NodePort?

另外,如果我必须将节点端口公开为特定端口,那么指定会nodePort扰乱集群 IP 吗?

kubernetes

4
推荐指数
1
解决办法
1764
查看次数

max.poll.records 如何影响消费者投票

max.poll.records最近在 kafka 消费者配置中更改为 500,但我想知道这如何影响消费者投票。它只是可以获取的最大记录数的上限还是消费者等待它获得 500 条记录。

apache-kafka kafka-consumer-api

4
推荐指数
1
解决办法
8225
查看次数

访问k8 minikube集群外的kafka经纪人

我在我的mac上的minikube k8群集上的Pod上运行了一个landoop kafka图像.我有2个不同的服务来公开模式注册表的端口8081和代理的9092.我已经在我的NodePort服务中映射了端口8081 - > 30081和9092 - > 30092,以便我可以从群集外部访问它.但是当我尝试运行控制台消费者或我的消费者应用程序时,Kafka从不消费消息.要验证代理9092端口是否可在k8集群外部访问:

nc <exposed-ip> 30092, it says the port is open.
Run Code Online (Sandbox Code Playgroud)

要验证Schema注册表8081是否可访问:

curl -X GET http://192.168.99.100:30081/subjects
Run Code Online (Sandbox Code Playgroud)

它返回可用的模式.

我有几个问题.1)我们不能以k8集群之外的上述方式从k8集群中访问Kafka吗?如果是这样,我在某种程度上做错了吗?2)如果端口是开放的,这是不是意味着经纪人可用?

任何帮助表示赞赏.谢谢

apache-kafka kubernetes

3
推荐指数
1
解决办法
1067
查看次数

尝试运行 DVC pull 时出现奇怪的错误

我刚开始使用 DVC,只是在探索它。我正在尝试从我团队中另一个人推送的 s3 中提取数据。但我收到此错误:

WARNING: Some of the cache files do not exist neither locally nor on remote. Missing cache files:
name: head_test_file.csv, md5: 45db668193ba44228d61115b1d0304fe
WARNING: Cache '45db668193ba44228d61115b1d0304fe' not found. File 'head_test_file.csv' won't be created.
No changes.
ERROR: failed to pull data from the cloud - Checkout failed for following targets:
head_test_file.csv
Did you forget to fetch?
Run Code Online (Sandbox Code Playgroud)

dvc

3
推荐指数
1
解决办法
1万
查看次数

Kafka Avro Schema演变

我正在尝试更多地了解我们用于Kafka主题的Avro架构,我对此相对较新.

我想知道是否有办法在特定情况下发展模式.我们使用不能为null的新字段或任何默认值更新我们的模式,因为这些新字段是标识符.解决此问题的解决方法是创建新主题,但是有更好的方法来发展现有模式吗?

avro apache-kafka

1
推荐指数
1
解决办法
955
查看次数

自动导入第三方转包

我是GO lang的新手,想知道是否有办法自动下载所有的进口.所以我们假设我需要github.com/gorilla/mux在我的代码库中使用和其他几个包.我应该手动转到我的~/go/src运行,go get repo还是有一种更智能的方式来进行依赖关系管理.我正在使用Goland IDE进行开发.任何帮助表示赞赏.

go goland

1
推荐指数
1
解决办法
1198
查看次数

使用 Akka Actor 时异常处理的正确方法

我正在尝试使用主管在我的 Akka 项目中进行正确的异常处理。只是想确定这是否是处理事情的正确方法。我有 1 个 Actor 系统,它创建 4 个独立的 Actor。假设我的 1 个 Actor 正在访问数据库。监管策略是正确的处理方式吗?

object ExceptionHandlingSupervisor extends App{
  val actorSystem = ActorSystem("ExceptionHandlingActorSystem")
  val actor = actorSystem.actorOf(Props[SupervisorActor], "SupervisorActor")
  actor ! Start

  class SupervisorActor extends Actor {
    val dbActor = context.actorOf(Props[ChildActor],"ChildActor")
    override def receive: Receive = {
      case Start => dbActor ! HitDatabase("","")
    }

    override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false){
      case ae:SQLException => println("Found an SQLException") //Add my logging logic here and error notification logic here
        Resume

      case _:Exception => println("Found …
Run Code Online (Sandbox Code Playgroud)

scala akka

1
推荐指数
1
解决办法
1983
查看次数