我正在尝试更改现有主题的retention.ms属性,但根据我的分析,它会删除主题内的所有现有数据。
场景:- 最初我的主题名称的"topic1"保留时间为 5 分钟,并且包含一些数据。
用例 1 - 将保留时间更改为更长的时间然后我使用以下命令将retention.ms值更改为8分钟:-
bin/kafka-topics.sh --alter --zookeeper localhost:2181
--topic topic1 --config retention.ms=8
Run Code Online (Sandbox Code Playgroud)
但它会删除我该主题中的所有旧数据。之后我将更多数据推入该主题。
用例 2 - 将保留时间更改为较短的时间然后我使用以下命令将retention.ms值更改为3分钟:-
bin/kafka-topics.sh --alter --zookeeper localhost:2181
--topic topic1 --config retention.ms=3
Run Code Online (Sandbox Code Playgroud)
它还会删除该主题之前的数据。
有人可以告诉我同样的确切行为吗?
任何提示表示赞赏!
我正在尝试使用处理器 api 创建 kafka 流,因为我有自定义流程。当我的处理器列出具有不同分区数量的多个主题时,我遇到了问题。我意识到我需要创建一个全球状态商店。我需要知道如何在监听多个主题的同时添加处理器并添加全局状态存储?
任何代码示例或链接都会有所帮助。谢谢你!!
我试图了解卡夫卡是如何工作的。我读到,默认情况下,Kafka 会在分区之间以循环方式分发来自生产者的消息。
但是,如果消息具有相同的密钥,为什么这些消息总是放在同一个分区中?(未配置分区键策略)。
例如,使用下面的代码,消息始终放置在同一分区中:
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
String key = properties.getProperty("dev.id");
producer.send(new ProducerRecord<String, String>(properties.getProperty("kafka.topic"), key, value), new EventGeneratorCallback(key));
Run Code Online (Sandbox Code Playgroud)
使用不同的密钥,消息以循环方式分发:
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
String key = properties.getProperty("dev.id") + UUID.randomUUID().toString();
producer.send(new ProducerRecord<String, String>(properties.getProperty("kafka.topic"), key, value), new EventGeneratorCallback(key));
Run Code Online (Sandbox Code Playgroud) 我试图在Kafka中表示主题和子主题。示例:主题“体育”子主题“足球”、“手球”
据我所知,卡夫卡不支持这一点。我现在使用的主题是“Sports_Football”、“Sports_Handball”这样的主题...这并不是真正有用的,因为当我们需要时,当我们想要主题“Sports”以及所有子项时,我们需要查询它的所有主题。
我们还使用Redis和Apache Storm。那么请问有更好的方法吗?
当我尝试使用 Intellij IDEA 访问 Spark 流应用程序时
环境
Spark核心版本2.2.0 Intellij IDEA 2017.3.5版本
附加信息:Spark 正在 Yarn 模式下运行。
出现错误:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Exception in thread "main" java.lang.ExceptionInInitializerError
at kafka_stream.kafka_stream.main(kafka_stream.scala)
Caused by: org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:376)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2509)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:909)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:901)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:901)
at kafka_stream.InitSpark$class.$init$(InitSpark.scala:15)
at kafka_stream.kafka_stream$.<init>(kafka_stream.scala:6)
at kafka_stream.kafka_stream$.<clinit>(kafka_stream.scala)
... 1 more
Process finished with exit code 1
Run Code Online (Sandbox Code Playgroud)
试过这个
val spark: SparkSession = SparkSession.builder()
.appName("SparkStructStream")
.master("spark://127.0.0.1:7077")
//.master("local[*]")
.getOrCreate()
Run Code Online (Sandbox Code Playgroud)
仍然遇到相同的 MASTER …
我创建了一个源主题订阅者,其输入消息如下:
\n\n{\n "ip_router": "",\n "ip_lan": "",\n "isdn": "2046573688",\n "end_datetime": "",\n "shop_code": "1000405100",\n "reg_type_id": "5131615",\n "contract_id": "",\n "update_datetime": "20170801171355",\n "project": "",\n "telecom_service_id": "2",\n "local_speed": "",\n "password": "",\n "price_plan": "",\n "vip": "",\n "local_price_plan": "",\n "sub_id": "1083168000",\n "sta_datetime": "20090511152847",\n "update_number_1": "1",\n "act_status": "000",\n "network_class": "",\n "limit_usage": "",\n "num_reset_zone": "",\n "deposit": "",\n "create_user": "TUDV_POPBGG",\n "num_of_computer": "",\n "cust_id": "10922428129",\n "status": "2",\n "active_datetime": "20090511152102",\n "ip_view": "",\n "channel_type_id": "",\n "ip_wan": "",\n "imsi": "452049760887694",\n "infrastructure_type": "",\n "product_code": "HPN03",\n "expire_datetime": "",\n "speed": "",\n "private_ip": "",\n "update_user": "MIGRATE",\n "ip_static": …Run Code Online (Sandbox Code Playgroud) 我正在尝试在 UNIX 上启动 kafka,但首先我需要运行 Zookpeer。我有来自https://www.apache.org/dyn/closer.cgi?path=/kafka/2.0.0/kafka_2.11-2.0.0.tgz的 kafka 2.12 版本
我运行命令bin/zookeeper-server-start.sh config/zookeeper.properties,但出现错误:
/kafka_2.12-2.0.0/bin/kafka-run-class.sh: line 306: /project/multiar/jdk1.7.0_17_x64/bin/java: No such file or directory
/kafka_2.12-2.0.0/bin/kafka-run-class.sh: line 306: exec: /project/multiar/jdk1.7.0_17_x64/bin/java: cannot execute: No such file or directory.
Run Code Online (Sandbox Code Playgroud)
我的路径中有 Java 8 jdk:
openjdk version "1.8.0_161"
OpenJDK Runtime Environment (build 1.8.0_161-b14)
OpenJDK 64-Bit Server VM (build 25.161-b14, mixed mode)
Run Code Online (Sandbox Code Playgroud)
为什么我无法运行zookeper服务器?我应该安装 jdk 7 还是更改.sh文件中的某些内容?
我已经阅读了他们的博客并理解了他们的例子。 https://www.confluent.io/blog/apache-kafka-to-amazon-s3-exactly-once/
但我正在努力解决我所拥有的这种情况。我目前的配置是:
"flush.size": "50",
"rotate.interval.ms": "-1",
"rotate.schedule.interval.ms": "300000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "YYYY/MM/dd/HH",
"timestamp.extractor": "Wallclock"
Run Code Online (Sandbox Code Playgroud)
根据我对配置的了解。连接器将50在300000ms(5 分钟)后提交记录文件或文件,以先到者为准。如果连接器将文件上传到 s3 但未能提交到 Kafka,由于我设置了轮换计划间隔,Kafka 如何重新上传将覆盖 s3 文件的相同记录?这不会导致 s3 重复吗?
amazon-s3 apache-kafka apache-kafka-connect confluent-platform
当一个新的分区被添加到一个现有的主题时,生产者和消费者如何适应这种变化,以及将新分区分配给代理的策略是什么?
我在使用 gradle 项目启动并运行我的 Kafka/confluent spring boot 时遇到了问题。我最初在这个测试项目中只有一个制作人,一切都运行良好。然后我添加了一个 Kafka 消费者,现在我在启动时遇到了异常。任何人都可以在这里发现问题:
首先这是堆栈跟踪
2021-01-22 19:56:08.566 WARN 61123 --- [ main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
2021-01-22 19:56:08.573 INFO 61123 --- [ main] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2021-01-22 19:56:08.575 INFO 61123 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
2021-01-22 19:56:08.576 INFO 61123 --- [ main] com.zaxxer.hikari.HikariDataSource …Run Code Online (Sandbox Code Playgroud) apache-kafka ×10
java ×2
amazon-s3 ×1
apache-spark ×1
apache-storm ×1
ksqldb ×1
maven ×1
redis ×1
scala ×1
spring-boot ×1
spring-kafka ×1
unix ×1