我正在使用Kafka和Spring-boot:
卡夫卡制片人班:
@Service
public class MyKafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static Logger LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class);
// Send Message
public void sendMessage(String topicName, String message) throws Exception {
LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message);
result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
LOGGER.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage());
}
}); …Run Code Online (Sandbox Code Playgroud) apache-kafka kafka-consumer-api kafka-producer-api spring-kafka
我正在尝试在 BULK 模式下使用具有以下属性的 Kafka Connect JDBC Source Connector。
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
timestamp.column.name=timestamp
connection.password=XXXXX
validate.non.null=false
tasks.max=1
producer.buffer.memory=2097152
batch.size=1000
producer.enable.idempotence=true
offset.flush.timeout.ms=300000
table.types=TABLE,VIEW
table.whitelist=materials
offset.flush.interval.ms=5000
mode=bulk
topic.prefix=mysql-
connection.user=kafka_connect_user
poll.interval.ms=200000
connection.url=jdbc:mysql://<DBNAME>
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
Run Code Online (Sandbox Code Playgroud)
我收到有关提交偏移量的以下错误,更改各种参数似乎影响不大。
[2019-04-04 12:42:14,886] INFO WorkerSourceTask{id=SapMaterialsConnector-0} flushing 4064 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-04-04 12:42:19,886] ERROR WorkerSourceTask{id=SapMaterialsConnector-0} Failed to flush, timed out while waiting for producer to flush outstanding 712 messages (org.apache.kafka.connect.runtime.WorkerSourceTask)
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 SASL Plain 将 Kafka Java 客户端连接到 Kafka 代理。但是当我尝试从生产者发送消息时,Kafka 服务器记录以下错误:
[2020-04-30 14:48:14,955] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
Run Code Online (Sandbox Code Playgroud)
从表面上看,生产者尝试在 SASL 握手之前发送元数据请求。如何在发送消息之前进行握手?
以下是我的kafka_server_jaas.conf文件,用于 Kafka 服务器。
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
Run Code Online (Sandbox Code Playgroud)
以下是我的zookeeper_jaas.conf文件,用于动物园管理员:
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
Run Code Online (Sandbox Code Playgroud)
在我的 Java 生产者中,我设置了以下属性:
[2020-04-30 14:48:14,955] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (Unexpected Kafka request of type …Run Code Online (Sandbox Code Playgroud) LEO and HWReplica(Leader Replica)有什么区别?
它们会包含相同的数字吗?我能理解HW就是last committed message offset.
当LEO将更新时如何?
我想在我们的生产环境中使用kafka.我想知道最新版本的客户端是否没有bug用于生产启动.它是否与消费者群体合作?我想每秒传递10000条记录,是否适合它?
我们有一个 Kafka 消费者,它将读取消息并执行这些操作,然后使用以下脚本再次发布到 Kafka 主题
生产者配置:
{
"bootstrap.servers": "localhost:9092"
}
Run Code Online (Sandbox Code Playgroud)
我还没有配置任何其他配置,如 queue.buffering.max.messages queue.buffering.max.ms batch.num.messages
我假设这些都将成为配置中的默认值
queue.buffering.max.messages : 100000
queue.buffering.max.ms : 0
batch.num.messages : 10000
Run Code Online (Sandbox Code Playgroud)
我的理解:当内部队列达到 queue.buffering.max.ms 或 batch.num.messages 中的任何一个时,消息将在单独的线程中发布到 Kafka。在我的配置中 queue.buffering.max.ms 是 0,所以当我调用 generate() 时每条消息都会被发布。如果我错了,请纠正我。
我的制作人片段:
def send(topic, message):
p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
p.flush()
Run Code Online (Sandbox Code Playgroud)
从这篇文章我了解到,在每条消息后使用刷新,生产者将成为同步生产者。如果我使用上面的脚本,发布到 Kafka 需要大约 45 毫秒
如果我将上面的代码段更改为
def send(topic, message):
p.produce(topic, json.dumps(message), callback=delivery_callback(err, msg))
p.poll(0)
Run Code Online (Sandbox Code Playgroud)
有什么性能会提高吗?你能澄清一下我的理解吗?
谢谢
我正在 Ubuntu WSL2 上成功运行最新的 Kafka。我可以在 WSL 上运行的 Ubuntu 中很好地启动 Zookeeper、kafka 服务器、创建主题、控制台生成和控制台消费。但是,当我进入 Windows 上的 Intellij 并创建一个简单的 Java Producer 时,它似乎无法连接到代理
版本和主机名
Java version: 1.8
Kafka Version: 2.6
hostname (from Ubuntu): KDAAPPDEV04
hostname (from Powershell): KDAAPPDEV04
java.net.InetAddress.getLocalHost().getHostName() = KDAAPPDEV04
java.net.InetAddress.getLocalHost().getCanonicalHostName() = KDAAPPDEV04
netstat from CMD:
TCP [::1]:9092 [::]:0 LISTENING
Run Code Online (Sandbox Code Playgroud)
server.properties 我在另一个答案中找到了这个设置,但这些对我不起作用。
advertised.listeners=PLAINTEXT://127.0.0.1:9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT
listeners=PLAINTEXT://0.0.0.0:9092
Run Code Online (Sandbox Code Playgroud)
然后尝试(并重新启动zookeeper和kafka)
advertised.listeners=PLAINTEXT://KDAAPPDEV04:9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT
listeners=PLAINTEXT://0.0.0.0:9092
Run Code Online (Sandbox Code Playgroud)
制片人
我使用三个不同的值运行这个生产者:主机名、本地主机和 127.0.0.1,但它从未连接到代理
public class ProducerDemo{
private static Logger logger = LoggerFactory.getLogger(ProducerDemo.class);
public static void main(String[] args) throws UnknownHostException{
System.out.println(InetAddress.getLocalHost().getHostName());
System.out.println(InetAddress.getLocalHost().getCanonicalHostName());
String bootstrapServers = "127.0.0.1:9092"; …Run Code Online (Sandbox Code Playgroud) java apache-kafka kafka-producer-api windows-subsystem-for-linux wsl-2
以下在kafka中启用压缩的方法之间有什么区别:
方法1:使用命令创建主题:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --config compression.type=gzip --topic test
Run Code Online (Sandbox Code Playgroud)
方法2:在Kafka Producer Client API中设置属性compression.type = gzip.
使用方法1时,我获得了更好的压缩和更高的吞吐量.
如果我使用方法1,是否意味着压缩发生在代理端,而在方法2中,消息在生产者端压缩然后发送到代理?
我想知道我们可以在 Kafka 主题中拥有哪些类型的数据。正如我在应用程序级别所知道的,这是一个键值对,这可能是语言支持的类型数据。例如,我们向主题发送一些消息,可以是一些 json、parquet 文件、序列化数据,还是我们只像使用纯文本格式一样处理消息?
谢谢你的帮助。
我得到Message size too large的例外,当我尝试发送邮件是超过1个MB的大小。当我尝试生成消息时,错误出现在我的客户端应用程序中。经过一番谷歌搜索后,我发现应该更改设置以增加最大消息大小。嗯,我在/kafka/config/server.properties文件中做到了。我添加了接下来的 2 个设置:
message.max.bytes=15728640
replica.fetch.max.bytes=15728640
Run Code Online (Sandbox Code Playgroud)
另外,我添加fetch.message.max.bytes=15728640到/kafka/config/consumer.properties文件中。所有其他设置保持默认。
我重新启动了 kafka 服务器,但仍然出现相同的错误。
PS Kafka 版本是 1.1.0。
apache-kafka ×10
java ×2
file-format ×1
python ×1
spring-kafka ×1
windows-subsystem-for-linux ×1
wsl-2 ×1