我需要我的消费者从特定的消费TopicPartitionOffset(here from offset 278).假设消息是由某些Producer在特定主题中生成的,就像="Test_1"之前一样.这是我的代码
using System;
using Confluent.Kafka;
public class ConsumingTest
{
public static void Main(string[] args)
{
var consumerConfig = new ConsumerConfig
{
BootstrapServers = "localhost:9092", EnableAutoCommit = false, GroupId = "this-group-id"
};
using (var consumer = new Consumer<Null, string>(consumerConfig))
{
Console.WriteLine("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~Consume Started...");
consumer.Subscribe("Test_1");
var topicPartitionOffset = new TopicPartitionOffset("Test_1", new Partition(0), new Offset(278));
consumer.Assign(topicPartitionOffset);
consumer.Seek(topicPartitionOffset);
while (true)
try
{
var cr = consumer.Consume();
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine(e.Message);
} …Run Code Online (Sandbox Code Playgroud) .net apache-kafka .net-core kafka-consumer-api confluent-kafka
我正在针对Python的confluent-kafka使用本地Java实现对Apache Kafka Producer进行测试,以查看哪个具有最大吞吐量。
我正在使用docker-compose部署一个包含3个Kafka代理和3个zookeeper实例的Kafka集群。我的docker撰写文件:https : //paste.fedoraproject.org/paste/bn7rr2~YRuIihZ06O3Q6vw/raw
这是一个非常简单的代码,其中包含Python confluent-kafka的大多数默认选项,并且在Java生产者中进行了一些配置更改,以匹配confluent-kafka的配置。
Python代码:
from confluent_kafka import Producer
producer = Producer({'bootstrap.servers': 'kafka-1:19092,kafka-2:29092,kafka-3:39092', 'linger.ms': 300, "max.in.flight.requests.per.connection": 1000000, "queue.buffering.max.kbytes": 1048576, "message.max.bytes": 1000000,
'default.topic.config': {'acks': "all"}})
ss = '0123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357'
def f():
import time
start = time.time()
for i in xrange(1000000):
try:
producer.produce('test-topic', ss)
except Exception:
producer.poll(1)
try:
producer.produce('test-topic', ss)
except Exception:
producer.flush(30)
producer.produce('test-topic', ss)
producer.poll(0)
producer.flush(30)
print(time.time() - start)
if __name__ == '__main__':
f()
Run Code Online (Sandbox Code Playgroud)
Java实现。配置与librdkafka中的config相同。按照Edenhill的建议更改了linger.ms和回调。
package com.amit.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.nio.charset.Charset;
import java.util.Properties; …Run Code Online (Sandbox Code Playgroud) 我已经在Java中使用过Kafka Streams。我在python中找不到类似的API。Apache Kafka是否支持python中的流处理?
python apache-kafka kafka-python apache-kafka-streams confluent-kafka
Confluent Kafka库的github示例页面列出了两种方法,即轮询和消耗。两者有什么区别。
我确实在这里的Confluent Kafka库中查看了Consumer的实现,觉得它们在功能上是相同的,只是返回的内容不同。
Poll()调用consumer()来查看是否有准备接收的消息,如果是,则调用OnMessage事件。而使用,将消息保存在其参数之一中,然后返回布尔值。我觉得实现上有所不同,并且在功能上它们是相同的 https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/Consumer.cs
我正在尝试使用pip安装python confluent-kafka软件包。我正在运行Amazon Linux(版本Amazon Linux AMI版本2016.09)的aws ec2实例上尝试此操作。我只是在做:
pip install pip install confluent-kafka
Run Code Online (Sandbox Code Playgroud)
但是,这会产生以下错误:
In file included from confluent_kafka/src/confluent_kafka.c:17:0:
confluent_kafka/src/confluent_kafka.h:21:32: fatal error: librdkafka/rdkafka.h: No such file or directory
#include <librdkafka/rdkafka.h>
^
compilation terminated.
error: command 'gcc' failed with exit status 1
Run Code Online (Sandbox Code Playgroud)
为了解决这个问题,我做了两件事:
1)按照此页面上的说明进行操作,并在文件/etc/yum.repos.d/confluent.repo中添加以下内容:
[Confluent.dist]
name=Confluent repository (dist)
baseurl=http://packages.confluent.io/rpm/3.0/6
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.0/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=http://packages.confluent.io/rpm/3.0
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.0/archive.key
enabled=1
Run Code Online (Sandbox Code Playgroud)
2)尝试使用以下命令安装librdkafka库:
sudo yum clean all
sudo yum install -y librdkafka1 librdkafka-devel
Run Code Online (Sandbox Code Playgroud)
Yum吐出此错误,但是:
Error: Package: librdkafka1-0.9.1_confluent3.0.1-1.el7.x86_64 (Confluent.dist)
Requires: openssl-libs
Error: Package: librdkafka1-0.9.1_confluent3.0.1-1.el7.x86_64 (Confluent.dist) …Run Code Online (Sandbox Code Playgroud) 我正在尝试阅读来自Kafka的消息,因此我编写了简单的消费者来阅读来自Kafka的消息.
While True:
message = consumer.poll(timeout=1.0)
# i am doing something with messages
Run Code Online (Sandbox Code Playgroud)
在上面的代码输出消息类型是消息对象.我如何获得一组消息?
有可能吗?
注意:基本的消费者配置不多.
如果我理解正确,当前版本的confluent-kafka-dotnet(Confluent.Kafka包,版本0.11.2)不支持发布/阅读带有邮件头的邮件.有没有办法在.NET中使用消息头?有没有机会在不久的将来获得此功能?
我正在为我的Kafka客户端使用融合golang.我用来AdminClient在kafka集群中创建/删除/获取主题.这是我的初始化代码AdminClient
adminClient, err := kafka.NewAdminClient(&kafka.ConfigMap{
"bootstrap.servers": 127.0.0.1:9092,
})
Run Code Online (Sandbox Code Playgroud)
之后,我使用此类创建并获取kafka集群中的所有主题.以下是创建主题的代码:
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
results, err := adminClient.CreateTopics(
ctx,
[]kafka.TopicSpecification{{
Topic: topic,
NumPartitions: numPartitions,
ReplicationFactor: replicationFactor}},
kafka.SetAdminOperationTimeout(TimeOut),
)
Run Code Online (Sandbox Code Playgroud)
之后,我再次获得主题信息:
result, err := adminClient.GetMetadata(&topic, false, 1000)
Run Code Online (Sandbox Code Playgroud)
问题是:如果我得到之前不存在的主题,kafka将自动创建该主题.这是我不想要的行为.请告诉我如何解决这个问题.
我正在基准测试使用Go语言编写的简单Web服务器wrk。服务器正在具有4GB RAM的计算机上运行。在测试开始时,该代码每秒可处理2000个请求,因此性能非常好。但是随着时间的流逝,该进程使用的内存会逐渐增加,一旦达到85%(我正在使用进行检查top),吞吐量就会下降到约100个请求/秒。重新启动服务器后,吞吐量再次增加到最佳数量。
是由于内存问题导致性能下降吗?Go为什么不释放此内存?我的Go服务器如下所示:
func main() {
defer func() {
// Wait for all messages to drain out before closing the producer
p.Flush(1000)
p.Close()
}()
http.HandleFunc("/endpoint", handler)
log.Fatal(http.ListenAndServe(":8080", nil))
}
Run Code Online (Sandbox Code Playgroud)
在处理程序中,我将传入的Protobuf消息转换为Json并使用融合的Kafka Go库将其写入Kafka。
var p, err = kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "abc-0.com:6667,abc-1.com:6667",
"message.timeout.ms": "30000",
"sasl.kerberos.keytab": "/opt/certs/TEST.KEYTAB",
"sasl.kerberos.principal": "TEST@TEST.ABC.COM",
"sasl.kerberos.service.name": "kafka",
"security.protocol": "SASL_PLAINTEXT",
})
var topic = "test"
func handler(w http.ResponseWriter, r *http.Request) {
body, _ := ioutil.ReadAll(r.Body)
// Deserialize byte[] to Protobuf message
protoMessage := &tutorial.REALTIMEGPS{}
_ := proto.Unmarshal(body, protoMessage) …Run Code Online (Sandbox Code Playgroud) apache-kafka ×8
python ×3
.net ×2
.net-core ×2
go ×2
amazon-ec2 ×1
c# ×1
kafka-python ×1
librdkafka ×1
linux ×1
webserver ×1