标签: apache-kafka

如何管理 Kafka 组的过期

我试图通过删除消费者配置并让我的脚本稍后重新创建它来重置消费者配置,但我遇到了关于新消费者不可删除的错误。

kafka@kafka-0:~$ ./bin/kafka-consumer-groups.sh --bootstrap kafka-0:9092 --delete --group etl
Option '[delete]' is only valid with '[zookeeper]'. Note that there's no need to delete group metadata for the new consumer as the group is deleted when the last committed offset for that group expires.
Run Code Online (Sandbox Code Playgroud)

现在我想知道,控制此错误消息过期的消费者配置选项的名称是什么?

stream offset apache-kafka

1
推荐指数
1
解决办法
4488
查看次数

KafkaProducer sendOffsetsToTransaction 需要 offset+1 才能成功提交当前偏移量

我正在尝试在 Kafka 中实现事务,Processor以确保不会重复处理同一条消息。给定消息 (A),我需要创建将在事务中的另一个主题上生成的消息列表,并且我想在同一事务中提交原始消息 (A)。从文档中我发现该Producer方法sendOffsetsToTransaction似乎只有在成功时才能在事务中提交偏移量。process()这是我的方法中的代码Processor

    producer.beginTransaction()
    val topicPartition    = new TopicPartition(this.context().topic(), this.context().partition())
    val offsetAndMetadata = new OffsetAndMetadata(this.context().offset())
    val map               = Map(topicPartition -> offsetAndMetadata).asJava
    producer.sendOffsetsToTransaction(map, "consumer-group-id")
    items.foreach(x => producer.send(new ProducerRecord("items_topic", x.key, x.value)))
    producer.commitTransaction()
    throw new RuntimeException("expected exception")
Run Code Online (Sandbox Code Playgroud)

不幸的是,使用这段代码(显然每次执行都会失败),每次我在异常后重新启动应用程序时,都会重新处理已处理的消息 (A)。

我设法使其工作,将 a 添加+1到返回的偏移量this.context().offset()val offsetAndMetadata以这种方式重新定义:

val offsetAndMetadata = new OffsetAndMetadata(this.context().offset() + 1)
Run Code Online (Sandbox Code Playgroud)

这是正常行为还是我做错了什么?

谢谢 :)

scala apache-kafka kafka-producer-api apache-kafka-streams

1
推荐指数
1
解决办法
2922
查看次数

kafka代理是否保存复制集而不是分区?

从 Kafka 文档和我阅读的其他一些博客中,我得出的结论是,一个 kafka-broker 由一个主题分区组成。这里它说一个 Kafka-broker 只持有一个分区。我的系统中只有一个代理,但我可以创建一个具有 3 个分区和 1 个复制因子的主题。我还尝试创建一个具有 3 个分区和 3 个复制因子且只有一个代理的主题。它抛出以下错误

Error while executing topic command : replication factor: 3 larger than available brokers: 1 [2017-10-21 15:35:25,928] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: replication factor: 3 larger than available brokers: 1 (kafka.admin.TopicCommand$)

所以我有一个疑问。

  1. Kafka-broker是否持有复制而不是分区?
  2. 如果我使用一个代理创建 3 个分区,会发生什么?
  3. 在1个broker、1个副本和3个分区的情况下,kafka-broker可以容纳多少个单个主题的分区?

有人请解释一下这里发生了什么。

apache-kafka

1
推荐指数
1
解决办法
918
查看次数

基于客户端证书 DN 或其部分的 Kafka ACL 到主题

我正在阅读 Kafka 文档(版本 0.11.0),我希望根据其客户端证书对消费者和发布者的主题进行身份验证和授权。

它的工作方式应该是根据 DN 或仅其一部分(如 CN、电子邮件或其他内容)授予授权。

我发现 Kafka 代理可以配置为通过 TLS 使用安全通信,并根据客户端证书对传入连接进行身份验证。但从我的角度来看,就是这样。这就是 Kafka 代理可以使用客户端证书执行的所有操作。主题的 ACL 应使用 SASL,其中我们可以使用 PLAIN、Kerberos 或 SCRAM。

那么是否可以根据 SSL 证书中的 CN 名称来验证客户端的连接,并将基于它的主题的 ACL 设置放入 Kafka (kafka_2.11-0.11.0.1) 中?

authentication ssl authorization ssl-certificate apache-kafka

1
推荐指数
1
解决办法
4440
查看次数

Kafka Kstream 和 Spring @KafkaListener 有什么不同?

我对卡夫卡有点陌生,正在阅读文档。Kafka 办公网站有一个关于 KStream 的示例。应用程序绑定到某个主题,消息一到达就会立即进行处理。结果将发布回主题或数据库。

Spring Kafka 注释 @KafkaListener 具有相同的功能。例如,我尝试了KafaListner 应用程序。同样在这里,我们收听一个主题并在发布内容时对其进行处理。

所以我很想知道 1. 这两个有什么不同?2. 在什么场景下选择哪一个?

apache-kafka spring-kafka

1
推荐指数
1
解决办法
3126
查看次数

Kafka Streams API 中的任务有什么用途

我试图了解 Kafka Streams API 的架构,并在文档中遇到了这一点:

应用程序的处理器拓扑通过将其分解为多个任务来扩展

将处理器拓扑分解为任务的所有标准是什么?仅仅是流/主题中的分区数量还是更多。

然后,任务可以根据分配的分区实例化自己的处理器拓扑

有人可以用例子解释上面的意思吗?如果创建任务只是为了扩展,那么它们不应该具有相同的拓扑吗?

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
977
查看次数

直接写入 kafka 状态存储

我们已经开始试验 Kafka,看看它是否可以用来聚合我们的应用程序数据。我认为我们的用例与 Kafka 流匹配,但我们不确定我们是否正确使用该工具。我们构建的概念验证似乎按设计工作,我不确定我们是否正确使用了 API。

我们的概念证明是使用 kafka 流来保存有关输出主题中程序的信息的运行记录,例如

{ 
  "numberActive": 0, 
  "numberInactive": 0, 
  "lastLogin": "01-01-1970T00:00:00Z"  
}
Run Code Online (Sandbox Code Playgroud)

计算计数很容易,它本质上是根据输入主题和输出字段执行比较和交换(CAS)操作。

本地状态包含给定密钥的最新程序。我们针对状态存储加入输入流,并使用 TransformSupplier 运行 CAS 操作,该操作使用 TransformSupplier 将数据显式写入状态存储

context.put(...)
context.commit();
Run Code Online (Sandbox Code Playgroud)

这是对当地国营商店的适当使用吗?是否还有另一种方法可以在主题中保持状态运行记录?

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
1433
查看次数

使用 scalatest-embedded-kafka 集成测试 Flink 和 Kafka

我想用Flink 和 Kafka 运行集成测试。该过程是从 Kafka 读取数据,使用 Flink 进行一些操作,然后将数据流放入 Kafka 中。

我想从头到尾测试这个过程。现在我使用scalatest-embedded-kafka

我在这里举了一个例子,我试图尽可能简单:

import java.util.Properties

import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
import org.scalatest.{Matchers, WordSpec}

import scala.collection.mutable.ListBuffer

object SimpleFlinkKafkaTest {

  class CollectSink extends SinkFunction[String] {
    override def invoke(string: String): Unit = {
      synchronized {
        CollectSink.values += string
      }
    }
  }

  object CollectSink {
    val values: ListBuffer[String] = ListBuffer.empty[String]
  }

  val kafkaPort = 9092
  val zooKeeperPort = 2181

  val props = …
Run Code Online (Sandbox Code Playgroud)

integration-testing scala apache-kafka apache-flink embedded-kafka

1
推荐指数
1
解决办法
1852
查看次数

序列化 Avro 消息时出错

我在我的项目中使用 Kafka Streams 和 Spring Boot。在我的用例中,我通过使用 KStream API 进行序列化和消费来发送 Order 对象SpecificAvroSerializer。当我使用 KafkaProducer 发送对象时,出现以下异常

nested exception is org.apache.kafka.common.errors.SerializationException: Error serializing Avro message] with root cause. java.lang.NullPointerException: null

基于Confluence 示例开发了该项目。不知道我在哪里犯了错误。我真的很感激任何帮助。代码已上传至Github以供参考。

例外:

   2018-04-17 16:19:39.170 ERROR 6161 --- [nio-8090-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.kafka.common.errors.SerializationException: Error serializing Avro message] with root cause

java.lang.NullPointerException: null
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:57) ~[kafka-schema-registry-client-3.0.0.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:89) ~[kafka-schema-registry-client-3.0.0.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72) ~[kafka-avro-serializer-3.0.0.jar:na]
    at …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
2万
查看次数

使用kafka接收器重命名elasticsearch中的索引

我正在使用以下水槽。问题是它将elasticsearch索引名称设置为与主题相同。我想要一个不同的 elasticseach 索引名称。我怎样才能做到这一点。我正在使用汇合4

{
  "name": "es-sink-mysql-foobar-02",
  "config": {
    "_comment": "-- standard converter stuff -- this can actually go in the worker config globally --",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schema.registry.url": "http://localhost:8081",


    "_comment": "--- Elasticsearch-specific config ---",
    "_comment": "Elasticsearch server address",
    "connection.url": "http://localhost:9200",

    "_comment": "Elasticsearch mapping name. Gets created automatically if doesn't exist  ",
    "type.name": "type.name=kafka-connect",
    "index.name": "asimtest",
    "_comment": "Which topic to stream data from into Elasticsearch",
    "topics": "mysql-foobar",

    "_comment": "If the Kafka message doesn't have a key …
Run Code Online (Sandbox Code Playgroud)

elasticsearch apache-kafka kafka-consumer-api apache-kafka-connect confluent-platform

1
推荐指数
1
解决办法
3385
查看次数