我是Kafka的新用户,现在已经试用了大约2-3周.我相信目前我已经很好地理解了Kafka在大多数情况下的工作原理,但是在尝试为我自己的Kafka消费者设计API之后(这是模糊不清的但是我遵循了新的KafkaConsumer应该遵循的准则可用于v 0.9,它出现在'trunk'repo atm上)如果我有多个具有相同groupID的消费者,我就会从主题中消耗延迟问题.
在此设置中,我的控制台始终记录有关"重新平衡触发"的问题.当我向消费者群体添加新的消费者时,是否会发生重新平衡,并且为了找出同一个群组ID中的哪个消费者实例将获得哪些分区或完全用于其他内容的重新平衡而触发它们?
我也从https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design中看到了这段话,我似乎无法理解它,所以如果有人能帮助我做感觉非常感激:
重新平衡是一组消费者实例(属于同一组)协调以拥有该组订阅的互斥主题分区集的过程.在成功完成消费者组的重新平衡操作结束时,所有订阅主题的每个分区都将由该组中的单个消费者实例拥有.重新平衡的工作方式如下.每个经纪人都被选为消费者群体子集的协调者.组的协调代理负责协调有关订阅主题的使用者组成员身份更改或分区更改的重新平衡操作.它还负责将生成的分区所有权配置传递给正在进行重新平衡操作的组的所有使用者.
我是一名学习卡夫卡的新生,我遇到了一些基本问题,理解了多个消费者,文章,文件等对目前来说并没有太大的帮助.
我试图做的一件事是编写我自己的高级Kafka生产者和消费者并同时运行它们,向主题发布100条简单消息并让我的消费者检索它们.我已经成功地做到了这一点,但是当我尝试引入第二个消费者来消费刚刚发布消息的同一主题时,它不会收到任何消息.
我的理解是,对于每个主题,您可以拥有来自不同消费者群体的消费者,并且每个消费者群体都可以获得针对某个主题生成的消息的完整副本.它是否正确?如果没有,那么建立多个消费者的正确方法是什么?这是我到目前为止写的消费者类:
public class AlternateConsumer extends Thread {
private final KafkaConsumer<Integer, String> consumer;
private final String topic;
private final Boolean isAsync = false;
public AlternateConsumer(String topic, String consumerGroup) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", consumerGroup);
properties.put("partition.assignment.strategy", "roundrobin");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<Integer, String>(properties);
consumer.subscribe(topic);
this.topic = topic;
}
public void run() {
while (true) {
ConsumerRecords<Integer, String> records = consumer.poll(0);
for (ConsumerRecord<Integer, String> record : records) { …Run Code Online (Sandbox Code Playgroud) 我运行了该命令sudo lsof -i -n -P | grep TCP,想知道是否可以获得有关其输出的更多说明。
具体来说,在这张图片中:

为什么我有一个 IP:PORT 指向另一个 IP:PORT,然后又返回到自身并带有标签“ESTABLISHED”?我对这到底意味着什么感到困惑。
我有一个很大的镶木地板数据集,我正在用 Spark 阅读。读取后,我过滤了在应用不同转换的许多函数中使用的行子集:
以下与我要完成的工作类似但不完全符合逻辑:
df = spark.read.parquet(file)
special_rows = df.filter(col('special') > 0)
# Thinking about adding the following line
special_rows.cache()
def f1(df):
new_df_1 = df.withColumn('foo', lit(0))
return new_df_1
def f2(df):
new_df_2 = df.withColumn('foo', lit(1))
return new_df_2
new_df_1 = f1(special_rows)
new_df_2 = f2(special_rows)
output_df = new_df_1.union(new_df_2)
output_df.write.parquet(location)
Run Code Online (Sandbox Code Playgroud)
因为许多函数可能正在使用这个过滤的行子集,我想缓存或持久化它,以便潜在地加快执行速度/内存消耗。我知道在上面的例子中,在我最终写入parquet.
我的问题是,我需要插入某种调用的count(),例如,以触发高速缓存,或者如果拼花调用,最终写在星火就能看到这个数据帧正在被使用f1,并f2和意志缓存数据帧本身。
如果是,这是一种惯用的方法吗?这是否意味着在依赖缓存的生产和大规模 Spark 作业中,经常使用随机操作来抢先对数据帧执行操作,例如调用count?