标签: apache-kafka

结构化流式 Kafka 源偏移存储

我正在使用 Kafka 的 Structured Streaming 源(集成指南),如前所述,它没有提交任何偏移量。

我的目标之一是监控它(检查它是否落后等)。即使它没有提交偏移量,它也会通过不时查询 kafka 并检查下一个要处理的偏移量来处理它们。根据文档,偏移量被写入 HDFS,因此在发生故障时可以恢复,但问题是:

它们存放在哪里?如果不提交偏移量,是否有任何方法可以监视火花流(结构化)的 kafka 消耗(从程序外部;所以 kafka cli 或类似的,每个记录附带的偏移量不适合用例) ?

干杯

offset apache-kafka apache-spark spark-streaming spark-structured-streaming

0
推荐指数
1
解决办法
2527
查看次数

Kafka connect(单机)将数据写入多个分区

我正在尝试使用 Kafka connect 使用独立模式写入数据。我将数据写入的主题是具有多个分区。但是,数据仅写入其中一个分区。当我启动多个消费者控制台时,数据仅打印到其中一个。另一个消费者控制台只有在第一个控制台关闭后才能获取任何数据。我无法弄清楚需要在配置文件中进行哪些更改才能使其写入多个分区。

这是standalone.properties

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=1000
rest.port=8084
Run Code Online (Sandbox Code Playgroud)

连接文件源.properties:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test4.txt
topic=consumer_group
Run Code Online (Sandbox Code Playgroud)

现在我使用以下命令来运行连接器:

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
Run Code Online (Sandbox Code Playgroud)

使用以下命令启动消费者控制台:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic consumer_group --from-beginning --consumer-property group.id=new-consumer-group
Run Code Online (Sandbox Code Playgroud)

它只将数据打印到消费者控制台之一。但是,如果我使用生产者控制台而不是 Kafka 连接来编写消息,那么我可以看到多个消费者上的消息(以循环方式),应该是这样。但是使用Kafka connect,它只是将所有数据写入单个分区,同一组中的其他消费者必须闲置。需要更改什么才能写入循环系统中的所有分区?

apache-kafka apache-kafka-connect

0
推荐指数
1
解决办法
1819
查看次数

kafka 连接转换器与转换

我正在尝试创建以下工作流程

  1. nginx 日志由 kafka 连接器获取并上传到主题
  2. hdfs 同步连接器然后将这些日志放入 hdfs
  3. Hive 用于对 hdfs 数据进行分析(例如,按 IP 地址分组的访问数量等)

虽然我可以按照 hive Metastore 所需的格式排列 nginx 日志(仅限空格或逗号分隔的必填字段),但我想知道这是否可以在不触及 nginx 日志格式的情况下完成

  1. 使用类似于 org.apache.kafka.connect.json.JsonConverter 的转换器
  2. 使用单个消息转换

这两种方法都需要自定义实现,并且关于如何进行相同操作的文档很少。

哪一种是实现这一目标的正确方法?是否有任何示例可用于解析 nginx 日志输出/任何源数据,同时使用 kafka 连接将其写入主题。我正在使用独立的文件连接器。

nginx apache-kafka apache-kafka-connect

0
推荐指数
1
解决办法
1066
查看次数

Kafka:Producer 如何只发送给某个 Consumer/GroupId

自从我开始使用 Kafka,我注意到 Kafka 在每个消费者或组上广播消息。如何让生产者将消息仅发送给某个特定的消费者?我在谈论一个主题系统谢谢

apache-kafka

0
推荐指数
1
解决办法
6830
查看次数

分布式键控/分区/分片Java库

我在我的N fronend机器上收到http请求,并希望由我的K后端机器处理它们,具体取决于数据中的某个键.键控必须稳定和一致.我还想根据负载不间断地扩展前端和后端机器.当缩放时丢失很少的数据时,我很好.

我想我可以用kafka或apache flink实现我的目标.也许也可以使用hazelcast,因为它们看起来都很重,而且对我来说太多了.

是否有一个库只是以分布式方式解决了键控/分区/分片的问题?

rx集成库的加分点.

java sharding distributed-computing hazelcast apache-kafka

-1
推荐指数
1
解决办法
410
查看次数

错误“kafkaconsumer 类型不是通用的,不能用参数参数化”

当我想实例化 KafkaProducer 时,我遇到了一个问题:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
Run Code Online (Sandbox Code Playgroud)

kafkaconsumer 类型不是通用的,它不能用参数参数化

我不明白这个错误,因为我知道我正在关注官方的 kafka javadoc,其中他们做的事情与我的依赖完全相同:

https://kafka.apache.org/11/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

谁能给我解释一下这个笑话?

java apache-kafka

-1
推荐指数
1
解决办法
977
查看次数

错误:无法找到或加载主类文件\kafka_2.12-2.4.0\libs\activation-1.1.1.jar;

错误:无法找到或加载主类文件\kafka_2.12-2.4.0\libs\activation-1.1.1.jar;

apache-kafka

-2
推荐指数
1
解决办法
707
查看次数

Apache Kafka 和 GCP PubSub 之间有什么区别?

Apache Kafka 和 GCP PubSub 之间有什么区别?何时使用kafka,何时使用pubsub。

publish-subscribe apache-kafka google-cloud-pubsub spring-kafka

-2
推荐指数
1
解决办法
4082
查看次数

如何将apache nifi摄取处理器集成/连接到Apache kafka?

我想将数据从Apache nifi的摄取处理器之一推送到Kafka,再推送到HDFS进行存储。

是否可以将Apache nifi的提取处理器与Kafka连接?

apache-kafka apache-nifi

-3
推荐指数
1
解决办法
826
查看次数