我刚刚开始使用 Kafka 并遇到以下菜鸟错误:
'Value cannot be null.
Parameter name: Value serializer not specified and there is no default serializer defined for type ActMessage.'
Run Code Online (Sandbox Code Playgroud)
当尝试发送类对象、ActMessage 对象而不是示例附带的简单字符串时,会发生这种情况。引发错误的代码行是:
using (var p = new ProducerBuilder<Null, ActMessage>(config ).Build()
Run Code Online (Sandbox Code Playgroud)
我正在使用.net 客户端。
我的理解是,我需要在第一个类型参数中使用默认序列化之一,这是 Kafka 客户端附带的一个,如此处所述,但无法在此 .net 包中找到它们。我想我可以建造一个,但这会浪费时间。
这是一个可重现的示例:
public class ActMessage {
public int SomeId {get;set;}
public string SomeContent {get;set;}
}
class Tester {
void send(){
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using (var p = new ProducerBuilder<Null, ActMessage>(config).Build()) //throws …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 AWS 托管 Kafka 实例 (MSK) 创建 Kafka 客户端应用程序(生产者和消费者)。此外,代理到代理的通信以及客户端到代理的通信通过集群中的 TLS 配置为安全。CA 是 AWS 私有 CA,因为这是 MSK 支持的唯一一种通过 TLS 进行客户端到代理身份验证的 CA。
问题上下文:AWS官方文档(https://docs.aws.amazon.com/msk/latest/developerguide/msk-authentication.html#msk-authentication-client)步骤更倾向于Java世界并处理客户端信任库和密钥库为 jks。但是.Net客户端不使用Java JKS容器格式(https://github.com/mhowlett/confluence-kafka-dotnet/tree/security/examples/Security)。
服务器验证客户端:这部分我能够解决。由于 jks 只是一个数据存储,因此在按照上述 aws 文档创建密钥库后,我运行了一些额外的 keytool 和 openssl 命令来显式提取客户端证书和密钥。我能够使用它成功地生成和使用消息。
但是,为了让客户端验证服务器,我需要将 ssl.ca.location 设置为 CA 根证书。从私有 CA(用作 MSK 实例的 CA)我已经下载了根 CA,默认情况下为 pem 格式(注意:这仅包含证书详细信息,不包含密钥详细信息)。使用以下命令将其转换为 .crt:
add pem to truststore : keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file Certificate.pem
get cert from truststore : keytool -export -alias CARoot -keystore kafka.client.truststore.jks -rfc -file ca-root.crt
使用上面的 ca-root.crt 作为 ca …
我对 AWS 很陌生,对 Kafka 也很陌生(使用 Confluence 平台和 .NET)。
我们将接收大文件 (~1-40+Mb) 到我们的 S3 存储桶,并且其消费端应该处理这些文件。我们将通过 Kafka 发送所有消息。
我读过你不应该通过 Kafka 发送大文件,但也许我在这里被误导了?
如果我们只想获取一个新文件已到达我们的 S3 存储桶的事件(当然还有对其的某种引用),我们该怎么办?
我在最近的 C# 项目中使用 Confluence kafka 包。我通过以下方式创建了一个生产者:
prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"};
foreach(msg in msglist){
using(var producer = new ProducerBuilder<Null, string>(prodConfig).Build()){
producer.ProduceAsync(topic, new Message<Null, string> {Value = msg});
}
}
Run Code Online (Sandbox Code Playgroud)
但问题是我的一些信息没有传达给消费者。他们在某个地方迷路了。但是,如果我对生产者使用await ,那么所有消息都会被传递。如何无需等待即可传递我的所有消息。(我有一个分区)
.net c# apache-kafka kafka-producer-api confluent-kafka-dotnet