标签: confluent-kafka

如何使用.Net中的Confluent.Kafka从特定的TopicPartitionOffset进行消耗

我需要我的消费者从特定的消费TopicPartitionOffset(here from offset 278).假设消息是由某些Producer在特定主题中生成的,就像="Test_1"之前一样.这是我的代码

using System;
using Confluent.Kafka;

public class ConsumingTest
{
    public static void Main(string[] args)
    {
        var consumerConfig = new ConsumerConfig
                                 {
                                     BootstrapServers = "localhost:9092", EnableAutoCommit = false, GroupId = "this-group-id"
                                 };

        using (var consumer = new Consumer<Null, string>(consumerConfig))
        {
            Console.WriteLine("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~Consume Started...");
            consumer.Subscribe("Test_1");

            var topicPartitionOffset = new TopicPartitionOffset("Test_1", new Partition(0), new Offset(278));

            consumer.Assign(topicPartitionOffset);
            consumer.Seek(topicPartitionOffset);

            while (true)
                try
                {
                    var cr = consumer.Consume();

                    Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine(e.Message);
                } …
Run Code Online (Sandbox Code Playgroud)

.net apache-kafka .net-core kafka-consumer-api confluent-kafka

9
推荐指数
1
解决办法
1260
查看次数

Python librdkafka生产者针对本机Apache Kafka生产者执行

我正在针对Python的confluent-kafka使用本地Java实现对Apache Kafka Producer进行测试,以查看哪个具有最大吞吐量。

我正在使用docker-compose部署一个包含3个Kafka代理和3个zookeeper实例的Kafka集群。我的docker撰写文件:https : //paste.fedoraproject.org/paste/bn7rr2~YRuIihZ06O3Q6vw/raw

这是一个非常简单的代码,其中包含Python confluent-kafka的大多数默认选项,并且在Java生产者中进行了一些配置更改,以匹配confluent-kafka的配置。

Python代码:

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'kafka-1:19092,kafka-2:29092,kafka-3:39092', 'linger.ms': 300, "max.in.flight.requests.per.connection": 1000000, "queue.buffering.max.kbytes": 1048576, "message.max.bytes": 1000000,
    'default.topic.config': {'acks': "all"}})

ss = '0123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357'

def f():
    import time
    start = time.time()
    for i in xrange(1000000):
        try:
            producer.produce('test-topic', ss)
        except Exception:
            producer.poll(1)
            try:
                producer.produce('test-topic', ss)
            except Exception:
                producer.flush(30)
                producer.produce('test-topic', ss)
        producer.poll(0)
    producer.flush(30)
    print(time.time() - start)


if __name__ == '__main__':
    f()
Run Code Online (Sandbox Code Playgroud)

Java实现。配置与librdkafka中的config相同。按照Edenhill的建议更改了linger.ms和回调。

package com.amit.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.nio.charset.Charset;
import java.util.Properties; …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-producer-api confluent-kafka librdkafka

8
推荐指数
1
解决办法
455
查看次数

Kafka python API是否支持流处理?

我已经在Java中使用过Kafka Streams。我在python中找不到类似的API。Apache Kafka是否支持python中的流处理?

python apache-kafka kafka-python apache-kafka-streams confluent-kafka

6
推荐指数
2
解决办法
6703
查看次数

Kafka Confluent库中轮询和消费之间的区别

Confluent Kafka库的github示例页面列出了两种方法,即轮询和消耗。两者有什么区别。

我确实在这里的Confluent Kafka库中查看了Consumer的实现,觉得它们在功能上是相同的,只是返回的内容不同。

Poll()调用consumer()来查看是否有准备接收的消息,如果是,则调用OnMessage事件。而使用,将消息保存在其参数之一中,然后返回布尔值。我觉得实现上有所不同,并且在功能上它们是相同的 https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/Consumer.cs

c# apache-kafka kafka-consumer-api confluent-kafka

5
推荐指数
1
解决办法
2378
查看次数

在AWS Linux机器上通过yum安装librdkafka1时出现libsasl依赖问题

我正在尝试使用pip安装python confluent-kafka软件包。我正在运行Amazon Linux(版本Amazon Linux AMI版本2016.09)的aws ec2实例上尝试此操作。我只是在做:

pip install pip install confluent-kafka
Run Code Online (Sandbox Code Playgroud)

但是,这会产生以下错误:

In file included from confluent_kafka/src/confluent_kafka.c:17:0:
confluent_kafka/src/confluent_kafka.h:21:32: fatal error: librdkafka/rdkafka.h: No such file or directory
 #include <librdkafka/rdkafka.h>
                                ^
compilation terminated.
error: command 'gcc' failed with exit status 1
Run Code Online (Sandbox Code Playgroud)

为了解决这个问题,我做了两件事:

1)按照此页面上的说明进行操作,并在文件/etc/yum.repos.d/confluent.repo中添加以下内容:

[Confluent.dist]
name=Confluent repository (dist)
baseurl=http://packages.confluent.io/rpm/3.0/6
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.0/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=http://packages.confluent.io/rpm/3.0
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.0/archive.key
enabled=1
Run Code Online (Sandbox Code Playgroud)

2)尝试使用以下命令安装librdkafka库:

sudo yum clean all
sudo yum install -y librdkafka1 librdkafka-devel
Run Code Online (Sandbox Code Playgroud)

Yum吐出此错误,但是:

Error: Package: librdkafka1-0.9.1_confluent3.0.1-1.el7.x86_64 (Confluent.dist)
       Requires: openssl-libs
Error: Package: librdkafka1-0.9.1_confluent3.0.1-1.el7.x86_64 (Confluent.dist) …
Run Code Online (Sandbox Code Playgroud)

python linux amazon-ec2 apache-kafka confluent-kafka

2
推荐指数
1
解决办法
2118
查看次数

如何在融合的kafka python中读取批处理消息?

我正在尝试阅读来自Kafka的消息,因此我编写了简单的消费者来阅读来自Kafka的消息.

While True:
        message = consumer.poll(timeout=1.0)
        # i am doing something with messages
Run Code Online (Sandbox Code Playgroud)

在上面的代码输出消息类型是消息对象.我如何获得一组消息?

有可能吗?

注意:基本的消费者配置不多.

python apache-kafka confluent-kafka

2
推荐指数
1
解决办法
1680
查看次数

Kafka(.NET)中的邮件头

如果我理解正确,当前版本的confluent-kafka-dotnet(Confluent.Kafka包,版本0.11.2)不支持发布/阅读带有邮件头的邮件.有没有办法在.NET中使用消息头?有没有机会在不久的将来获得此功能?

.net apache-kafka .net-core confluent-kafka

2
推荐指数
1
解决办法
758
查看次数

调用GetMetadata时禁用自动主题创建

我正在为我的Kafka客户端使用融合golang.我用来AdminClient在kafka集群中创建/删除/获取主题.这是我的初始化代码AdminClient

adminClient, err := kafka.NewAdminClient(&kafka.ConfigMap{
    "bootstrap.servers": 127.0.0.1:9092,
})
Run Code Online (Sandbox Code Playgroud)

之后,我使用此类创建并获取kafka集群中的所有主题.以下是创建主题的代码:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

results, err := adminClient.CreateTopics(
    ctx,
    []kafka.TopicSpecification{{
        Topic:             topic,
        NumPartitions:     numPartitions,
        ReplicationFactor: replicationFactor}},
    kafka.SetAdminOperationTimeout(TimeOut),
)
Run Code Online (Sandbox Code Playgroud)

之后,我再次获得主题信息:

result, err := adminClient.GetMetadata(&topic, false, 1000)
Run Code Online (Sandbox Code Playgroud)

问题是:如果我得到之前不存在的主题,kafka将自动创建该主题.这是我不想要的行为.请告诉我如何解决这个问题.

go apache-kafka confluent-kafka

1
推荐指数
1
解决办法
149
查看次数

随着请求数量的增加,Go Web服务器的性能急剧下降

我正在基准测试使用Go语言编写的简单Web服务器wrk。服务器正在具有4GB RAM的计算机上运行。在测试开始时,该代码每秒可处理2000个请求,因此性能非常好。但是随着时间的流逝,该进程使用的内存会逐渐增加,一旦达到85%(我正在使用进行检查top),吞吐量就会下降到约100个请求/秒。重新启动服务器后,吞吐量再次增加到最佳数量。

是由于内存问题导致性能下降吗?Go为什么不释放此内存?我的Go服务器如下所示:

func main() {
    defer func() {
        // Wait for all messages to drain out before closing the producer
        p.Flush(1000)
        p.Close()
    }()

    http.HandleFunc("/endpoint", handler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}
Run Code Online (Sandbox Code Playgroud)

在处理程序中,我将传入的Protobuf消息转换为Json并使用融合的Kafka Go库将其写入Kafka。

var p, err = kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": "abc-0.com:6667,abc-1.com:6667",
    "message.timeout.ms": "30000",
    "sasl.kerberos.keytab": "/opt/certs/TEST.KEYTAB",
    "sasl.kerberos.principal": "TEST@TEST.ABC.COM",
    "sasl.kerberos.service.name": "kafka",
    "security.protocol": "SASL_PLAINTEXT",
})

var topic = "test"

func handler(w http.ResponseWriter, r *http.Request) {
    body, _ := ioutil.ReadAll(r.Body)

    // Deserialize byte[] to Protobuf message
    protoMessage := &tutorial.REALTIMEGPS{}
    _ := proto.Unmarshal(body, protoMessage) …
Run Code Online (Sandbox Code Playgroud)

webserver go confluent-kafka

-1
推荐指数
1
解决办法
79
查看次数