标签: apache-kafka

无法将主题映射到 kafka elasticsearch 连接器中的指定索引

尝试将主题“name:localtopic”映射到索引“name:indexoftopic”,它在弹性搜索“localtopic和indexoftopic”中创建两个新索引,并且主题数据仅在主题名称索引“localtopic”中可见,连接器中没有显示错误(分布式模式)

我的配置是

 "config" : {
  "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max" : "1",
  "topics" : "localtopic", 
  "topic.index.map" : "localtopic:indexoftopic",
  "connection.url" : "aws elasticsearch url",
  "type.name" : "event",
  "key.ignore" : "false",
  "schema.ignore" : "true",
  "schemas.enable" : "false",
  "transforms" : "InsertKey,extractKey",
  "transforms.InsertKey.type" : "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.InsertKey.fields" : "event-id",
  "transforms.extractKey.type" : "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field" : "event-id"
 }
Run Code Online (Sandbox Code Playgroud)

索引名称:indexoftopic是在elasticsearch中创建的,但数据是通过index_name:localtopic kafkaversion:2.3连接器版本:5 elasticsearch版本:3.2.0看到的

即使在日志 INFO --topics.regex = "" 中,我也不知道 ihis 选项,任何人都可以建议。怎么用这个???

elasticsearch apache-kafka apache-kafka-connect

1
推荐指数
1
解决办法
1969
查看次数

如何使用EmbeddedKafkaRule/EmbeddedKafka设置Spring Kafka测试来修复TopicExistsException间歇性错误?

我在测试 Kafka 消费者和生产者时遇到了问题。集成测试间歇性失败,并显示TopicExistsException.

这就是我当前的测试类 -UserEventListenerTest对于其中一位消费者来说是这样的:

@SpringBootTest(properties = ["application.kafka.user-event-topic=user-event-topic-UserEventListenerTest",
    "application.kafka.bootstrap=localhost:2345"])
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class UserEventListenerTest {
    private val logger: Logger = LoggerFactory.getLogger(javaClass)

    @Value("\${application.kafka.user-event-topic}")
    private lateinit var userEventTopic: String

    @Autowired
    private lateinit var kafkaConfigProperties: KafkaConfigProperties

    private lateinit var embeddedKafka: EmbeddedKafkaRule
    private lateinit var sender: KafkaSender<String, UserEvent>
    private lateinit var receiver: KafkaReceiver<String, UserEvent>

    @BeforeAll
    fun setup() {
        embeddedKafka = EmbeddedKafkaRule(1, false, userEventTopic)
        embeddedKafka.kafkaPorts(kafkaConfigProperties.bootstrap.substringAfterLast(":").toInt())
        embeddedKafka.before()

        val producerProps: HashMap<String, Any> = hashMapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "com.project.userservice.config.MockAvroSerializer"
        )
        val …
Run Code Online (Sandbox Code Playgroud)

kotlin apache-kafka spring-boot spring-kafka spring-kafka-test

1
推荐指数
1
解决办法
9909
查看次数

将 StateRestoreListener 与 Spring Cloud Kafka Streams 绑定器结合使用

我将结合使用 StateRestoreListener 和 Spring Cloud Kafka Streams 绑定器。我需要监视应用程序的容错状态存储的恢复进度。汇合https://docs.confluence.io/current/streams/monitoring.html#streams-monitoring-runtime-status中有示例。

为了观察所有状态存储的恢复,您需要为应用程序提供 org.apache.kafka.streams.processor.StateRestoreListener 接口的实例。您可以通过调用 KafkaStreams#setGlobalStateRestoreListener 方法来设置 org.apache.kafka.streams.processor.StateRestoreListener。

第一个问题是从应用程序获取 Kafka Streams。我用以下方法解决了这个问题

StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
Run Code Online (Sandbox Code Playgroud)

第二个问题是将 StateRestoreListener 设置为 KafkaStreams,因为我收到错误

java.lang.IllegalStateException:只能在 CREATED 状态下设置 GlobalStateRestoreListener。当前状态是:正在运行

是否可以在 Spring Cloud Kafka Streams 绑定器中使用 StateRestoreListener?谢谢

java spring apache-kafka spring-cloud-stream apache-kafka-streams

1
推荐指数
1
解决办法
1338
查看次数

使用 kafka-python 检索主题中的消息

我已经使用kafka-python库编写了一个 python 脚本,它将消息写入和读取到kafka. 我写消息没有任何问题;kafka我可以使用控制台工具检索它们。但我无法使用我的 python 脚本读取它们。我的消费者有一个 for ,它冻结在迭代的第一行并且永远不会返回。这是我的代码:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "my-topic",
    bootstrap_servers="localhost:9092"),
    value_deserializer=lambda v: json.dumps(v).encode("utf-8")
)

for msg in consumer:
    print(type(msg))
Run Code Online (Sandbox Code Playgroud)

消费者被创建并完全订阅;我可以看到它my-topic列在其属性的主题列表中_client

任何想法?

python apache-kafka kafka-consumer-api kafka-python

1
推荐指数
1
解决办法
9556
查看次数

从 Rest API 发送数据到 kafka

我刚刚开始学习 Kafka,我正在尝试构建一个原型来拥有一个 REST API 生产者并将数据发送给 Kafka 消费者。我查阅了大量文档来找出一些特定的程序。

我无法理解是否有一个连接器可以像为 Apache Kafka 提供的文件连接器或 JDBC 连接器一样使用。我应该为此编写一个自定义连接器吗?

我很困惑从哪里开始。我特别寻找一些关于如何完成这项工作的结构化文档或想法。

apache-kafka kafka-producer-api apache-kafka-connect

1
推荐指数
1
解决办法
1万
查看次数

Springboot Kafka - 消费者幂等性

我有一个监听 kafka 的 Spring-boot 应用程序。为了避免重复处理,我尝试进行手动提交。为此,我在阅读主题后立即引用了异步提交消息。但我陷入了如何实现消费者幂等性的困境,这样记录就不会被处理两次。

apache-kafka spring-boot spring-kafka

1
推荐指数
1
解决办法
2063
查看次数

IDEA在kafka 2.4源码中找不到org.apache.kafka.common.message包

我尝试使用IDEA读取从github下载的kafka 2.4源代码。但我发现整个 org.apache.kafka.common.message 包丢失了。什么地方出了错?

在此输入图像描述

apache-kafka

1
推荐指数
1
解决办法
861
查看次数

Kafka消费者组再平衡

我正在使用 kafka 消费者组管理来处理我的消息。

我的消息的处理时间各不相同。因此,我将最大轮询间隔设置为 20 分钟,最大记录数为 20。除了上述两个之外,我还使用 5 个分区和 5 个具有默认配置值的消费者实例。

但我仍然间歇性地收到以下错误:

[Consumer clientId=consumer-3, groupId=amc_dashboard_analytics] Attempt to heartbeat failed since group is rebalancing
Run Code Online (Sandbox Code Playgroud)

我们的理解是,除非在达到消费者配置文档中写入的最大轮询间隔之前未调用轮询,否则不会发生重新平衡。但对我来说,重新平衡只发生在 20 分钟之前。

此外,在运行几个小时后,所有分配的消费者只是离开并说“尝试检测信号失败,因为组正在重新平衡”,并且不会再次加入(理想情况下应该再次加入)。

我在这里错过了什么吗?任何线索都会有帮助。

apache-kafka kafka-consumer-api

1
推荐指数
1
解决办法
2万
查看次数

Kafka 事务性读已提交消费者

我在应用程序中有事务性和普通的生产者,它们正在写入主题 kafka-topic ,如下所示。

事务性 Kafka Producer 的配置

@Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        /*The amount of time to wait before attempting to retry a failed request to a given topic partition. 
         * This avoids repeatedly sending requests in a tight loop under some failure scenarios.*/
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3);
        /*"The configuration controls the …
Run Code Online (Sandbox Code Playgroud)

transactions apache-kafka kafka-consumer-api spring-kafka kafka-transactions-api

1
推荐指数
1
解决办法
1433
查看次数

如何在我的 kafka 消费者中使用对象反序列化 json 列表?

我的 Kafka Producer 正在发送 Json 格式的对象列表。

我试图弄清楚如何让我的消费者反序列化列表。我能够接收单个对象并读取它,但是当我将代码更改为类型 List 时,我收到以下错误:

Error:(32, 47) java: incompatible types: cannot infer type arguments for org.springframework.kafka.core.DefaultKafkaConsumerFactory<>
    reason: inference variable V has incompatible equality constraints java.util.List<nl.domain.X>,nl.domain.X
Run Code Online (Sandbox Code Playgroud)

编辑

通过将 TypeReference 添加到 JsonDeserilizer 已解决此错误。

当前问题

消费消息时,它不是我定义的类型(即 List< X > ),而是返回LinkedHashMap

这是消费者配置:

@EnableKafka
@Configuration
public class KafkaConfiguration {

    @Bean
    public ConsumerFactory<String, List<X>> xConsumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
                new JsonDeserializer<>(new TypeReference<List<X>>() …
Run Code Online (Sandbox Code Playgroud)

java spring apache-kafka json-deserialization spring-kafka

1
推荐指数
1
解决办法
1万
查看次数