我正在研究 kafka 流。我想使用选择性非常低的过滤器(几千分之一)过滤我的流。我在看这个方法:https : //kafka.apache.org/0100/javadoc/org/apache/kafka/streams/kstream/KStream.html#filter(org.apache.kafka.streams.kstream.Predicate)
但是我找不到任何证据,如果过滤器将由消费者评估(我真的不想将大量 GB 转移给消费者,只是为了扔掉它们),或者在经纪人内部(耶!)。
如果它在消费者方面进行评估,有什么办法,如何在经纪人中做到这一点?
谢谢!
我有一个使用 Kafka 1.0 作为队列的应用程序。Kafka 主题有 80 个分区和 80 个正在运行的消费者。(Kafka-python 消费者)。
通过运行命令:
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group mygroup --describe
Run Code Online (Sandbox Code Playgroud)
我看到其中一个分区卡在一个偏移处,并且随着新记录的添加,延迟不断增加。
上述命令的输出如下所示:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST
118 mytopic 37 1924 2782 858 kafka-python-1.3.4-3da99d4d-63e8-4e72-967e-xxxxxxxxxxx/localhost
119 mytopic 38 2741 2742 1 kafka-python-1.3.4-40b44482-39fc-42d0-8f55-xxxxxxxxxxx/localhost
120 mytopic 39 2713 2713 0 kafka-python-1.3.4-4121d080-1d7c-4d6b-ac58-xxxxxxxxxxx/localhost
121 mytopic 40 2687 2688 1 kafka-python-1.3.4-43441f6e-fd35-448e-b791-xxxxxxxxxxx/localhost
Run Code Online (Sandbox Code Playgroud)
这是什么原因造成的?此外,使用 reset-offsets 命令重置偏移也是不可取的,因为可能不会定期手动监控此服务器。
客户端在 Linux m/c 中作为并行进程在后台运行:
consumer = KafkaConsumer('mytopic', group_id='mygroup', bootstrap_servers='localhost:9092',
session_timeout_ms=120000, heartbeat_interval_ms=100000, max_poll_records=1,
auto_commit_interval_ms=100000, request_timeout_ms=350000, max_partition_fetch_bytes=3*1024*1024,
value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
msg …Run Code Online (Sandbox Code Playgroud) 对于我的演示应用程序,我必须创建一个 rest 控制器来返回 kafka 队列中的消息。我已经阅读了 spring-kafka 参考指南并实现了消费者配置并创建了如下的 bean
@Configuration
@EnableKafka
public class ConsumerConfiguration {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// allows a pool of processes to divide the work of consuming and processing records
props.put(ConsumerConfig.GROUP_ID_CONFIG, "trx");
return props;
}
@Bean
public ConsumerFactory<String, Transaction> transactionConsumerFactory() {
return new DefaultKafkaConsumerFactory<>( …Run Code Online (Sandbox Code Playgroud) apache-kafka spring-boot kafka-consumer-api spring-web spring-kafka
参考以下消费者组描述的屏幕截图,我试图了解“-”在这里对于 CURRENT-OFFSET 意味着什么。它是否表示即使分区已分配给消费者,也不会从分区 1 和 3 消费消息。分区 1 和 3 的 LOG-END 偏移量分别为 281 和 277。
我需要一些帮助来为我的 Java kafka 使用者构建 Junit 测试用例。
我的原始源代码具有如下方法,并且需要为此创建一个单元测试用例。
public void processConsumerRecord(ConsumerRecords<String, GenericRecord> records, boolean isEventProcessed, boolean isOffsetCommitted,
int totalErrorCountFromSinkService, int totalErrorCount, Consumer<String, GenericRecord> consumer) {
Run Code Online (Sandbox Code Playgroud)
...... }
我的 Kafka 消费者正在从 kafka 主题中提取消息,我需要能够以 ConsumerRecords 格式提供输入消息,但作为单元测试的一部分,我没有轮询来自 kafka 的消息,而是模拟来自原始 kafka 主题的消息并提供测试上述方法的单元测试用例的静态输入消息,如图所示。如何以 ConsumerRecords < String, GenericRecord > 的形式创建模拟输入消息?
有什么方法可以找到 kafka 消费者消费消息的速度吗?像 5000 条消息/秒
当我像这样运行 kafka-console-consumer 时
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
Run Code Online (Sandbox Code Playgroud)
它默认为哪个消费者组?如果我没有在命令行中指定消费者组或链接到消费者属性,它最终会使用随机消费者组吗?如何检查使用了哪个消费者组?
谢谢!
我目前开发了一个代码来显示主题、分区和日志偏移量。但我目前被困在如何获得分区的滞后上。我知道有一个 kafka offset 命令可以完成这个功能,但我需要的是一个 java 代码。
public static void main(String[] args) throws Exception {
System.out.println("START CONSUMER");final Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// Create the consumer using props.
final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
// Subscribe to the topic.
int i = 0;
ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
for (i=0;i<consumer.partitionsFor(TOPIC).size();i++)
{
TopicPartition partitiontemp = new TopicPartition(TOPIC, i);
partitions.add(partitiontemp);
}
consumer.assign(partitions);
consumer.seekToEnd(partitions);
for (i=0;i<consumer.partitionsFor(TOPIC).size();i++)
{
System.out.printf("Topic: %s …Run Code Online (Sandbox Code Playgroud) 下面的 Scala kafka 消费者没有从 poll调用中。
但是,主题是正确的,我可以看到使用控制台使用者将事件发送到主题:
/opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my_topic --from-beginning
Run Code Online (Sandbox Code Playgroud)
当我使用调试器逐步完成并调用时,我还在下面的 Scala 代码示例中看到了该主题 kafkaConsumer.listTopics()
此外,这是从单个单元测试中调用的,所以我只创建了这个特征和消费者的一个实例(即另一个消费者实例不能消费消息)。我也在使用随机 group_id。
下面的代码/配置有什么问题吗?
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.util.Random
trait KafkaTest {
val kafkaConsumerProperties = new Properties()
kafkaConsumerProperties.put("bootstrap.servers", "kafka:9092")
kafkaConsumerProperties.put("group.id", Random.alphanumeric.take(10).mkString)
kafkaConsumerProperties.put("key.deserializer", classOf[ByteArrayDeserializer])
kafkaConsumerProperties.put("value.deserializer", classOf[StringDeserializer])
val kafkaConsumer = new KafkaConsumer[String, String](kafkaConsumerProperties)
kafkaConsumer.subscribe(java.util.Collections.singletonList("my_topic"))
def checkKafkaHasReceivedEvent(): Assertion = {
val kafkaEvents = kafkaConsumer.poll(2000) // Always returns 0 events?
...
}
}
Run Code Online (Sandbox Code Playgroud)
增加轮询超时也无济于事。
在我的用例中,我创建了 JDBC kafka 连接器,从 oracle 表中提取数据并成功推送到 kafka 主题,但是当我尝试读取来自这个 kafka 主题的消息时,我遇到了如下所示的反序列化问题。
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 2
Caused by: java.net.MalformedURLException: unknown protocol: localhost
at java.net.URL.<init>(URL.java:593)
at java.net.URL.<init>(URL.java:483)
at java.net.URL.<init>(URL.java:432)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:124)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:188)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:330)
Run Code Online (Sandbox Code Playgroud) apache-kafka ×10
java ×2
junit ×1
kafka-python ×1
python ×1
scala ×1
spring-boot ×1
spring-kafka ×1
spring-web ×1