标签: kafka-producer-api

Kafka制作人TimeoutException:过期1条记录

我正在使用Kafka和Spring-boot:

卡夫卡制片人班:

@Service
public class MyKafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static Logger LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class);

    // Send Message
    public void sendMessage(String topicName, String message) throws Exception {
        LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message);
        result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage());
            }
        }); …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api kafka-producer-api spring-kafka

15
推荐指数
1
解决办法
2万
查看次数

Kafka Connect - 无法刷新,在等待生产者刷新未完成的消息时超时

我正在尝试在 BULK 模式下使用具有以下属性的 Kafka Connect JDBC Source Connector。

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
timestamp.column.name=timestamp
connection.password=XXXXX
validate.non.null=false
tasks.max=1
producer.buffer.memory=2097152
batch.size=1000
producer.enable.idempotence=true
offset.flush.timeout.ms=300000
table.types=TABLE,VIEW
table.whitelist=materials
offset.flush.interval.ms=5000
mode=bulk
topic.prefix=mysql-
connection.user=kafka_connect_user
poll.interval.ms=200000
connection.url=jdbc:mysql://<DBNAME>
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Run Code Online (Sandbox Code Playgroud)

我收到有关提交偏移量的以下错误,更改各种参数似乎影响不大。

[2019-04-04 12:42:14,886] INFO WorkerSourceTask{id=SapMaterialsConnector-0} flushing 4064 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-04-04 12:42:19,886] ERROR WorkerSourceTask{id=SapMaterialsConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 712 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-producer-api apache-kafka-connect

15
推荐指数
1
解决办法
1万
查看次数

SASL 握手期间出现意外的 METADATA 类型的 Kafka 请求

我正在尝试使用 SASL Plain 将 Kafka Java 客户端连接到 Kafka 代理。但是当我尝试从生产者发送消息时,Kafka 服务器记录以下错误:

[2020-04-30 14:48:14,955] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
Run Code Online (Sandbox Code Playgroud)

从表面上看,生产者尝试在 SASL 握手之前发送元数据请求。如何在发送消息之前进行握手?

以下是我的kafka_server_jaas.conf文件,用于 Kafka 服务器。

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
};

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret";
};
Run Code Online (Sandbox Code Playgroud)

以下是我的zookeeper_jaas.conf文件,用于动物园管理员:

Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret";
};
Run Code Online (Sandbox Code Playgroud)

在我的 Java 生产者中,我设置了以下属性:

[2020-04-30 14:48:14,955] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-producer-api

15
推荐指数
1
解决办法
2万
查看次数

Kafka - 日志结束偏移(LEO)与高水位(HW)之间的差异

LEO and HWReplica(Leader Replica)有什么区别?

它们会包含相同的数字吗?我能理解HW就是last committed message offset.

当LEO将更新时如何?

apache-kafka kafka-consumer-api kafka-producer-api

14
推荐指数
2
解决办法
8820
查看次数

Kafka与.Net客户端

我想在我们的生产环境中使用kafka.我想知道最新版本的客户端是否没有bug用于生产启动.它是否与消费者群体合作?我想每秒传递10000条记录,是否适合它?

apache-kafka kafka-producer-api

13
推荐指数
3
解决办法
2万
查看次数

卡夫卡生产者刷新和轮询之间的区别

我们有一个 Kafka 消费者,它将读取消息并执行这些操作,然后使用以下脚本再次发布到 Kafka 主题

生产者配置:

{
  "bootstrap.servers": "localhost:9092"
}
Run Code Online (Sandbox Code Playgroud)

我还没有配置任何其他配置,如 queue.buffering.max.messages queue.buffering.max.ms batch.num.messages

我假设这些都将成为配置中的默认值

queue.buffering.max.messages : 100000
queue.buffering.max.ms : 0
batch.num.messages : 10000
Run Code Online (Sandbox Code Playgroud)

我的理解:当内部队列达到 queue.buffering.max.ms 或 batch.num.messages 中的任何一个时,消息将在单独的线程中发布到 Kafka。在我的配置中 queue.buffering.max.ms 是 0,所以当我调用 generate() 时每条消息都会被发布。如果我错了,请纠正我。

我的制作人片段:

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
    p.flush()
Run Code Online (Sandbox Code Playgroud)

这篇文章我了解到,在每条消息后使用刷新,生产者将成为同步生产者。如果我使用上面的脚本,发布到 Kafka 需要大约 45 毫秒

如果我将上面的代码段更改为

def send(topic, message):
    p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
    p.poll(0)
Run Code Online (Sandbox Code Playgroud)

有什么性能会提高吗?你能澄清一下我的理解吗?

谢谢

python apache-kafka kafka-producer-api confluent-platform

13
推荐指数
1
解决办法
2万
查看次数

无法从 Windows 生成在 WSL 2 上运行的 Kafka 主题

我正在 Ubuntu WSL2 上成功运行最新的 Kafka。我可以在 WSL 上运行的 Ubuntu 中很好地启动 Zookeeper、kafka 服务器、创建主题、控制台生成和控制台消费。但是,当我进入 Windows 上的 Intellij 并创建一个简单的 Java Producer 时,它似乎无法连接到代理

版本和主机名

    Java version: 1.8
    Kafka Version: 2.6
    hostname (from Ubuntu): KDAAPPDEV04
    hostname (from Powershell): KDAAPPDEV04
    java.net.InetAddress.getLocalHost().getHostName() = KDAAPPDEV04
    java.net.InetAddress.getLocalHost().getCanonicalHostName() = KDAAPPDEV04
    netstat from CMD:
        TCP    [::1]:9092             [::]:0                 LISTENING
Run Code Online (Sandbox Code Playgroud)

server.properties 我在另一个答案中找到了这个设置,但这些对我不起作用。

advertised.listeners=PLAINTEXT://127.0.0.1:9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT
listeners=PLAINTEXT://0.0.0.0:9092
Run Code Online (Sandbox Code Playgroud)

然后尝试(并重新启动zookeeper和kafka)

advertised.listeners=PLAINTEXT://KDAAPPDEV04:9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT
listeners=PLAINTEXT://0.0.0.0:9092
Run Code Online (Sandbox Code Playgroud)

制片人

我使用三个不同的值运行这个生产者:主机名、本地主机和 127.0.0.1,但它从未连接到代理

    public class ProducerDemo{

    private static Logger logger = LoggerFactory.getLogger(ProducerDemo.class);

    public static void main(String[] args) throws UnknownHostException{

        System.out.println(InetAddress.getLocalHost().getHostName());
        System.out.println(InetAddress.getLocalHost().getCanonicalHostName());

        String bootstrapServers = "127.0.0.1:9092"; …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka kafka-producer-api windows-subsystem-for-linux wsl-2

13
推荐指数
3
解决办法
8981
查看次数

Kafka在制作人和主题上设置了压缩类型

以下在kafka中启用压缩的方法之间有什么区别:

方法1:使用命令创建主题:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --config compression.type=gzip --topic test
Run Code Online (Sandbox Code Playgroud)

方法2:在Kafka Producer Client API中设置属性compression.type = gzip.

使用方法1时,我获得了更好的压缩和更高的吞吐量.

如果我使用方法1,是否意味着压缩发生在代理端,而在方法2中,消息在生产者端压缩然后发送到代理?

apache-kafka kafka-producer-api

12
推荐指数
1
解决办法
8963
查看次数

Kafka 消息的数据类型

我想知道我们可以在 Kafka 主题中拥有哪些类型的数据。正如我在应用程序级别所知道的,这是一个键值对,这可能是语言支持的类型数据。例如,我们向主题发送一些消息,可以是一些 json、parquet 文件、序列化数据,还是我们只像使用纯文本格式一样处理消息?

谢谢你的帮助。

file-format apache-kafka kafka-producer-api

11
推荐指数
1
解决办法
2万
查看次数

Kafka - 代理:消息大小太大

我得到Message size too large的例外,当我尝试发送邮件是超过1个MB的大小。当我尝试生成消息时,错误出现在我的客户端应用程序中。经过一番谷歌搜索后,我发现应该更改设置以增加最大消息大小。嗯,我在/kafka/config/server.properties文件中做到了。我添加了接下来的 2 个设置:

message.max.bytes=15728640
replica.fetch.max.bytes=15728640
Run Code Online (Sandbox Code Playgroud)

另外,我添加fetch.message.max.bytes=15728640/kafka/config/consumer.properties文件中。所有其他设置保持默认。

我重新启动了 kafka 服务器,但仍然出现相同的错误。

PS Kafka 版本是 1.1.0。

apache-kafka kafka-producer-api

11
推荐指数
1
解决办法
1万
查看次数