标签: apache-kafka

更改kafka主题的retention.ms属性会删除旧数据

我正在尝试更改现有主题的retention.ms属性,但根据我的分析,它会删除主题内的所有现有数据。

场景:- 最初我的主题名称的"topic1"保留时间为 5 分钟,并且包含一些数据。

用例 1 - 将保留时间更改为更长的时间然后我使用以下命令将retention.ms值更改为8分钟:-

 bin/kafka-topics.sh --alter --zookeeper localhost:2181 
   --topic topic1 --config retention.ms=8
Run Code Online (Sandbox Code Playgroud)

但它会删除我该主题中的所有旧数据。之后我将更多数据推入该主题。

用例 2 - 将保留时间更改为较短的时间然后我使用以下命令将retention.ms值更改为3分钟:-

 bin/kafka-topics.sh --alter --zookeeper localhost:2181 
   --topic topic1 --config retention.ms=3
Run Code Online (Sandbox Code Playgroud)

它还会删除该主题之前的数据。

有人可以告诉我同样的确切行为吗?

任何提示表示赞赏!

apache-kafka

0
推荐指数
1
解决办法
5711
查看次数

kafka流处理器api全局状态存储多个主题

我正在尝试使用处理器 api 创建 kafka 流,因为我有自定义流程。当我的处理器列出具有不同分区数量的多个主题时,我遇到了问题。我意识到我需要创建一个全球状态商店。我需要知道如何在监听多个主题的同时添加处理器并添加全局状态存储?

任何代码示例或链接都会有所帮助。谢谢你!!

apache-kafka apache-kafka-streams

0
推荐指数
1
解决办法
5799
查看次数

KafkaProducer 循环分配不适用于同一密钥

我试图了解卡夫卡是如何工作的。我读到,默认情况下,Kafka 会在分区之间以循环方式分发来自生产者的消息。

但是,如果消息具有相同的密钥,为什么这些消息总是放在同一个分区中?(未配置分区键策略)。

例如,使用下面的代码,消息始终放置在同一分区中:

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
String key = properties.getProperty("dev.id");
producer.send(new ProducerRecord<String, String>(properties.getProperty("kafka.topic"), key, value), new EventGeneratorCallback(key));
Run Code Online (Sandbox Code Playgroud)

使用不同的密钥,消息以循环方式分发:

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
String key = properties.getProperty("dev.id") + UUID.randomUUID().toString();
producer.send(new ProducerRecord<String, String>(properties.getProperty("kafka.topic"), key, value), new EventGeneratorCallback(key));
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-producer-api

0
推荐指数
1
解决办法
4874
查看次数

如何在Kafka中制作子主题

我试图在Kafka中表示主题子主题。示例:主题“体育”子主题“足球”、“手球”

据我所知,卡夫卡不支持这一点。我现在使用的主题是“Sports_Football”“Sports_Handball”这样的主题...这并不是真正有用的,因为当我们需要时,当我们想要主题“Sports”以及所有子项时,我们需要查询它的所有主题。

我们还使用RedisApache Storm。那么请问有更好的方法吗?

redis apache-kafka apache-storm

0
推荐指数
2
解决办法
5241
查看次数

Spark - 使用 Intellij IDEA 时出现错误“必须在您的配置中设置主 URL”

当我尝试使用 Intellij IDEA 访问 Spark 流应用程序时

环境

Spark核心版本2.2.0 Intellij IDEA 2017.3.5版本

附加信息:Spark 正在 Yarn 模式下运行。

出现错误:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Exception in thread "main" java.lang.ExceptionInInitializerError
    at kafka_stream.kafka_stream.main(kafka_stream.scala)
Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:376)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2509)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:909)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:901)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:901)
    at kafka_stream.InitSpark$class.$init$(InitSpark.scala:15)
    at kafka_stream.kafka_stream$.<init>(kafka_stream.scala:6)
    at kafka_stream.kafka_stream$.<clinit>(kafka_stream.scala)
    ... 1 more

Process finished with exit code 1
Run Code Online (Sandbox Code Playgroud)

试过这个

  val spark: SparkSession = SparkSession.builder()
    .appName("SparkStructStream")
    .master("spark://127.0.0.1:7077")
    //.master("local[*]")
    .getOrCreate()
Run Code Online (Sandbox Code Playgroud)

仍然遇到相同的 MASTER …

scala maven apache-kafka apache-spark

0
推荐指数
1
解决办法
6599
查看次数

ksql,在表上选择不显示任何内容

我创建了一个源主题订阅者,其输入消息如下:

\n\n
{\n  "ip_router": "",\n  "ip_lan": "",\n  "isdn": "2046573688",\n  "end_datetime": "",\n  "shop_code": "1000405100",\n  "reg_type_id": "5131615",\n  "contract_id": "",\n  "update_datetime": "20170801171355",\n  "project": "",\n  "telecom_service_id": "2",\n  "local_speed": "",\n  "password": "",\n  "price_plan": "",\n  "vip": "",\n  "local_price_plan": "",\n  "sub_id": "1083168000",\n  "sta_datetime": "20090511152847",\n  "update_number_1": "1",\n  "act_status": "000",\n  "network_class": "",\n  "limit_usage": "",\n  "num_reset_zone": "",\n  "deposit": "",\n  "create_user": "TUDV_POPBGG",\n  "num_of_computer": "",\n  "cust_id": "10922428129",\n  "status": "2",\n  "active_datetime": "20090511152102",\n  "ip_view": "",\n  "channel_type_id": "",\n  "ip_wan": "",\n  "imsi": "452049760887694",\n  "infrastructure_type": "",\n  "product_code": "HPN03",\n  "expire_datetime": "",\n  "speed": "",\n  "private_ip": "",\n  "update_user": "MIGRATE",\n  "ip_static": …
Run Code Online (Sandbox Code Playgroud)

apache-kafka ksqldb

0
推荐指数
1
解决办法
2257
查看次数

无法启动 Zookeeper 服务器。没有jdk目录

我正在尝试在 UNIX 上启动 kafka,但首先我需要运行 Zookpeer。我有来自https://www.apache.org/dyn/closer.cgi?path=/kafka/2.0.0/kafka_2.11-2.0.0.tgz的 kafka 2.12 版本 我运行命令bin/zookeeper-server-start.sh config/zookeeper.properties,但出现错误:

/kafka_2.12-2.0.0/bin/kafka-run-class.sh: line 306: /project/multiar/jdk1.7.0_17_x64/bin/java: No such file or directory
/kafka_2.12-2.0.0/bin/kafka-run-class.sh: line 306: exec: /project/multiar/jdk1.7.0_17_x64/bin/java: cannot execute: No such file or directory.
Run Code Online (Sandbox Code Playgroud)

我的路径中有 Java 8 jdk:

openjdk version "1.8.0_161"
OpenJDK Runtime Environment (build 1.8.0_161-b14)
OpenJDK 64-Bit Server VM (build 25.161-b14, mixed mode)
Run Code Online (Sandbox Code Playgroud)

为什么我无法运行zookeper服务器?我应该安装 jdk 7 还是更改.sh文件中的某些内容?

java unix apache-kafka apache-zookeeper

0
推荐指数
1
解决办法
2万
查看次数

Kafka S3 Connector 一次交付保证如何工作

我已经阅读了他们的博客并理解了他们的例子。 https://www.confluent.io/blog/apache-kafka-to-amazon-s3-exactly-once/

但我正在努力解决我所拥有的这种情况。我目前的配置是:

"flush.size": "50",
"rotate.interval.ms": "-1",
"rotate.schedule.interval.ms": "300000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "YYYY/MM/dd/HH",
"timestamp.extractor": "Wallclock"
Run Code Online (Sandbox Code Playgroud)

根据我对配置的了解。连接器将50300000ms(5 分钟)后提交记录文件或文件,以先到者为准。如果连接器将文件上传到 s3 但未能提交到 Kafka,由于我设置了轮换计划间隔,Kafka 如何重新上传将覆盖 s3 文件的相同记录?这不会导致 s3 重复吗?

amazon-s3 apache-kafka apache-kafka-connect confluent-platform

0
推荐指数
1
解决办法
282
查看次数

向现有 Kafka 主题添加分区对生产者/消费者行为的影响

当一个新的分区被添加到一个现有的主题时,生产者和消费者如何适应这种变化,以及将新分区分配给代理的策略是什么?

apache-kafka

0
推荐指数
1
解决办法
897
查看次数

Spring kafka - 未能构建 kafka 消费者

我在使用 gradle 项目启动并运行我的 Kafka/confluent spring boot 时遇到了问题。我最初在这个测试项目中只有一个制作人,一切都运行良好。然后我添加了一个 Kafka 消费者,现在我在启动时遇到了异常。任何人都可以在这里发现问题:

首先这是堆栈跟踪

2021-01-22 19:56:08.566  WARN 61123 --- [           main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
2021-01-22 19:56:08.573  INFO 61123 --- [           main] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2021-01-22 19:56:08.575  INFO 61123 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'
2021-01-22 19:56:08.576  INFO 61123 --- [           main] com.zaxxer.hikari.HikariDataSource …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spring-boot spring-kafka

0
推荐指数
1
解决办法
1198
查看次数