尝试将主题“name:localtopic”映射到索引“name:indexoftopic”,它在弹性搜索“localtopic和indexoftopic”中创建两个新索引,并且主题数据仅在主题名称索引“localtopic”中可见,连接器中没有显示错误(分布式模式)
我的配置是
"config" : {
"connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max" : "1",
"topics" : "localtopic",
"topic.index.map" : "localtopic:indexoftopic",
"connection.url" : "aws elasticsearch url",
"type.name" : "event",
"key.ignore" : "false",
"schema.ignore" : "true",
"schemas.enable" : "false",
"transforms" : "InsertKey,extractKey",
"transforms.InsertKey.type" : "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.InsertKey.fields" : "event-id",
"transforms.extractKey.type" : "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field" : "event-id"
}
Run Code Online (Sandbox Code Playgroud)
索引名称:indexoftopic是在elasticsearch中创建的,但数据是通过index_name:localtopic kafkaversion:2.3连接器版本:5 elasticsearch版本:3.2.0看到的
即使在日志 INFO --topics.regex = "" 中,我也不知道 ihis 选项,任何人都可以建议。怎么用这个???
我在测试 Kafka 消费者和生产者时遇到了问题。集成测试间歇性失败,并显示TopicExistsException
.
这就是我当前的测试类 -UserEventListenerTest
对于其中一位消费者来说是这样的:
@SpringBootTest(properties = ["application.kafka.user-event-topic=user-event-topic-UserEventListenerTest",
"application.kafka.bootstrap=localhost:2345"])
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class UserEventListenerTest {
private val logger: Logger = LoggerFactory.getLogger(javaClass)
@Value("\${application.kafka.user-event-topic}")
private lateinit var userEventTopic: String
@Autowired
private lateinit var kafkaConfigProperties: KafkaConfigProperties
private lateinit var embeddedKafka: EmbeddedKafkaRule
private lateinit var sender: KafkaSender<String, UserEvent>
private lateinit var receiver: KafkaReceiver<String, UserEvent>
@BeforeAll
fun setup() {
embeddedKafka = EmbeddedKafkaRule(1, false, userEventTopic)
embeddedKafka.kafkaPorts(kafkaConfigProperties.bootstrap.substringAfterLast(":").toInt())
embeddedKafka.before()
val producerProps: HashMap<String, Any> = hashMapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "com.project.userservice.config.MockAvroSerializer"
)
val …
Run Code Online (Sandbox Code Playgroud) kotlin apache-kafka spring-boot spring-kafka spring-kafka-test
我将结合使用 StateRestoreListener 和 Spring Cloud Kafka Streams 绑定器。我需要监视应用程序的容错状态存储的恢复进度。汇合https://docs.confluence.io/current/streams/monitoring.html#streams-monitoring-runtime-status中有示例。
为了观察所有状态存储的恢复,您需要为应用程序提供 org.apache.kafka.streams.processor.StateRestoreListener 接口的实例。您可以通过调用 KafkaStreams#setGlobalStateRestoreListener 方法来设置 org.apache.kafka.streams.processor.StateRestoreListener。
第一个问题是从应用程序获取 Kafka Streams。我用以下方法解决了这个问题
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
Run Code Online (Sandbox Code Playgroud)
第二个问题是将 StateRestoreListener 设置为 KafkaStreams,因为我收到错误
java.lang.IllegalStateException:只能在 CREATED 状态下设置 GlobalStateRestoreListener。当前状态是:正在运行
是否可以在 Spring Cloud Kafka Streams 绑定器中使用 StateRestoreListener?谢谢
java spring apache-kafka spring-cloud-stream apache-kafka-streams
我已经使用kafka-python
库编写了一个 python 脚本,它将消息写入和读取到kafka
. 我写消息没有任何问题;kafka
我可以使用控制台工具检索它们。但我无法使用我的 python 脚本读取它们。我的消费者有一个 for ,它冻结在迭代的第一行并且永远不会返回。这是我的代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"my-topic",
bootstrap_servers="localhost:9092"),
value_deserializer=lambda v: json.dumps(v).encode("utf-8")
)
for msg in consumer:
print(type(msg))
Run Code Online (Sandbox Code Playgroud)
消费者被创建并完全订阅;我可以看到它my-topic
列在其属性的主题列表中_client
。
任何想法?
我刚刚开始学习 Kafka,我正在尝试构建一个原型来拥有一个 REST API 生产者并将数据发送给 Kafka 消费者。我查阅了大量文档来找出一些特定的程序。
我无法理解是否有一个连接器可以像为 Apache Kafka 提供的文件连接器或 JDBC 连接器一样使用。我应该为此编写一个自定义连接器吗?
我很困惑从哪里开始。我特别寻找一些关于如何完成这项工作的结构化文档或想法。
我有一个监听 kafka 的 Spring-boot 应用程序。为了避免重复处理,我尝试进行手动提交。为此,我在阅读主题后立即引用了异步提交消息。但我陷入了如何实现消费者幂等性的困境,这样记录就不会被处理两次。
我正在使用 kafka 消费者组管理来处理我的消息。
我的消息的处理时间各不相同。因此,我将最大轮询间隔设置为 20 分钟,最大记录数为 20。除了上述两个之外,我还使用 5 个分区和 5 个具有默认配置值的消费者实例。
但我仍然间歇性地收到以下错误:
[Consumer clientId=consumer-3, groupId=amc_dashboard_analytics] Attempt to heartbeat failed since group is rebalancing
Run Code Online (Sandbox Code Playgroud)
我们的理解是,除非在达到消费者配置文档中写入的最大轮询间隔之前未调用轮询,否则不会发生重新平衡。但对我来说,重新平衡只发生在 20 分钟之前。
此外,在运行几个小时后,所有分配的消费者只是离开并说“尝试检测信号失败,因为组正在重新平衡”,并且不会再次加入(理想情况下应该再次加入)。
我在这里错过了什么吗?任何线索都会有帮助。
我在应用程序中有事务性和普通的生产者,它们正在写入主题 kafka-topic ,如下所示。
事务性 Kafka Producer 的配置
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.RETRIES_CONFIG, 5);
/*The amount of time to wait before attempting to retry a failed request to a given topic partition.
* This avoids repeatedly sending requests in a tight loop under some failure scenarios.*/
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3);
/*"The configuration controls the …
Run Code Online (Sandbox Code Playgroud) transactions apache-kafka kafka-consumer-api spring-kafka kafka-transactions-api
我的 Kafka Producer 正在发送 Json 格式的对象列表。
我试图弄清楚如何让我的消费者反序列化列表。我能够接收单个对象并读取它,但是当我将代码更改为类型 List 时,我收到以下错误:
Error:(32, 47) java: incompatible types: cannot infer type arguments for org.springframework.kafka.core.DefaultKafkaConsumerFactory<>
reason: inference variable V has incompatible equality constraints java.util.List<nl.domain.X>,nl.domain.X
Run Code Online (Sandbox Code Playgroud)
编辑
通过将 TypeReference 添加到 JsonDeserilizer 已解决此错误。
当前问题:
消费消息时,它不是我定义的类型(即 List< X > ),而是返回LinkedHashMap
这是消费者配置:
@EnableKafka
@Configuration
public class KafkaConfiguration {
@Bean
public ConsumerFactory<String, List<X>> xConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(new TypeReference<List<X>>() …
Run Code Online (Sandbox Code Playgroud) apache-kafka ×10
spring-kafka ×4
java ×2
spring ×2
spring-boot ×2
kafka-python ×1
kotlin ×1
python ×1
transactions ×1