Spring Boot and Kafka: producer throws key='null' exception

Asked by tsa*_*txt (score 7) — tags: apache-kafka, docker, spring-boot

I am trying to use Spring Boot with Kafka and ZooKeeper, running in Docker.


docker-compose.yml:

```yaml
version: '2'

services:
  zookeeper:
    image: wurstmeister/zookeeper
    restart: always
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    restart: always
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
```

Output of `docker ps`:

```
CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
980e6b09f4e3        wurstmeister/kafka       "start-kafka.sh"         29 minutes ago      Up 29 minutes       0.0.0.0:9092->9092/tcp                               samplespringkafkaproducerconsumermaster_kafka_1
64519d4808aa        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   2 hours ago         Up 29 minutes       22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   samplespringkafkaproducerconsumermaster_zookeeper_1
```

`docker-compose up` log output:

```
kafka_1      | [2018-01-12 13:14:49,545] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,546] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,546] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,547] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,547] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,548] INFO Client environment:os.version=4.9.60-linuxkit-aufs (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,548] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,549] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,549] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,552] INFO Initiating client connection, connectString=zookeeper:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@1534f01b (org.apache.zookeeper.ZooKeeper)
kafka_1      | [2018-01-12 13:14:49,574] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
kafka_1      | [2018-01-12 13:14:49,578] INFO Opening socket connection to server samplespringkafkaproducerconsumermaster_zookeeper_1.samplespringkafkaproducerconsumermaster_default/192.168.32.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
zookeeper_1  | 2018-01-12 13:14:49,591 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@192] - Accepted socket connection from /192.168.32.3:51466
kafka_1      | [2018-01-12 13:14:49,593] INFO Socket connection established to samplespringkafkaproducerconsumermaster_zookeeper_1.samplespringkafkaproducerconsumermaster_default/192.168.32.2:2181, initiating session (org.apache.zookeeper.ClientCnxn)
zookeeper_1  | 2018-01-12 13:14:49,600 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@928] - Client attempting to establish new session at /192.168.32.3:51466
zookeeper_1  | 2018-01-12 13:14:49,603 [myid:] - INFO  [SyncThread:0:FileTxnLog@203] - Creating new log file: log.fd
zookeeper_1  | 2018-01-12 13:14:49,613 [myid:] - INFO  [SyncThread:0:ZooKeeperServer@673] - Established session 0x160ea8232b00000 with negotiated timeout 6000 for client /192.168.32.3:51466
kafka_1      | [2018-01-12 13:14:49,616] INFO Session establishment complete on server samplespringkafkaproducerconsumermaster_zookeeper_1.samplespringkafkaproducerconsumermaster_default/192.168.32.2:2181, sessionid = 0x160ea8232b00000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
kafka_1      | [2018-01-12 13:14:49,619] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
kafka_1      | [2018-01-12 13:14:49,992] INFO Cluster ID = Fgy9ybPPQQ-QdLINzHpmVA (kafka.server.KafkaServer)
kafka_1      | [2018-01-12 13:14:50,003] WARN No meta.properties file under dir /kafka/kafka-logs-980e6b09f4e3/meta.properties (kafka.server.BrokerMetadataCheckpoint)
kafka_1      | [2018-01-12 13:14:50,065] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
kafka_1      | [2018-01-12 13:14:50,065] INFO [ThrottledRequestReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
kafka_1      | [2018-01-12 13:14:50,067] INFO [ThrottledRequestReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
kafka_1      | [2018-01-12 13:14:50,167] INFO Log directory '/kafka/kafka-logs-980e6b09f4e3' not found, creating it. (kafka.log.LogManager)
kafka_1      | [2018-01-12 13:14:50,183] INFO Loading logs. (kafka.log.LogManager)
kafka_1      | [2018-01-12 13:14:50,199] INFO Logs loading complete in 15 ms. (kafka.log.LogManager)
kafka_1      | [2018-01-12 13:14:50,283] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
kafka_1      | [2018-01-12 13:14:50,291] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
kafka_1      | [2018-01-12 13:14:50,633] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
kafka_1      | [2018-01-12 13:14:50,639] INFO [SocketServer brokerId=1005] Started 1 acceptor threads (kafka.network.SocketServer)
kafka_1      | [2018-01-12 13:14:50,673] INFO [ExpirationReaper-1005-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,674] INFO [ExpirationReaper-1005-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,675] INFO [ExpirationReaper-1005-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,691] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
kafka_1      | [2018-01-12 13:14:50,753] INFO [ExpirationReaper-1005-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,757] INFO [ExpirationReaper-1005-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,762] INFO [ExpirationReaper-1005-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
kafka_1      | [2018-01-12 13:14:50,777] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
kafka_1      | [2018-01-12 13:14:50,791] INFO [GroupCoordinator 1005]: Starting up. (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2018-01-12 13:14:50,791] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
kafka_1      | [2018-01-12 13:14:50,793] INFO [GroupCoordinator 1005]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2018-01-12 13:14:50,798] INFO [GroupMetadataManager brokerId=1005] Removed 0 expired offsets in 5 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
kafka_1      | [2018-01-12 13:14:50,811] INFO [ProducerId Manager 1005]: Acquired new producerId block (brokerId:1005,blockStartProducerId:5000,blockEndProducerId:5999) by writing to Zk with path version 6 (kafka.coordinator.transaction.ProducerIdManager)
kafka_1      | [2018-01-12 13:14:50,848] INFO [TransactionCoordinator id=1005] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
kafka_1      | [2018-01-12 13:14:50,850] INFO [Transaction Marker Channel Manager 1005]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
kafka_1      | [2018-01-12 13:14:50,850] INFO [TransactionCoordinator id=1005] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
kafka_1      | [2018-01-12 13:14:50,949] INFO Creating /brokers/ids/1005 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
zookeeper_1  | 2018-01-12 13:14:50,952 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x160ea8232b00000 type:create cxid:0x70 zxid:0x102 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers
zookeeper_1  | 2018-01-12 13:14:50,952 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x160ea8232b00000 type:create cxid:0x71 zxid:0x103 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids
kafka_1      | [2018-01-12 13:14:50,957] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
kafka_1      | [2018-01-12 13:14:50,959] INFO Registered broker 1005 at path /brokers/ids/1005 with addresses: EndPoint(localhost,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
kafka_1      | [2018-01-12 13:14:50,961] WARN No meta.properties file under dir /kafka/kafka-logs-980e6b09f4e3/meta.properties (kafka.server.BrokerMetadataCheckpoint)
kafka_1      | [2018-01-12 13:14:50,992] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2018-01-12 13:14:50,993] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
kafka_1      | [2018-01-12 13:14:51,004] INFO [KafkaServer id=1005] started (kafka.server.KafkaServer)
zookeeper_1  | 2018-01-12 13:14:51,263 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x160ea8232b00000 type:delete cxid:0xe3 zxid:0x105 txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
kafka_1      | [2018-01-12 13:24:50,793] INFO [GroupMetadataManager brokerId=1005] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
kafka_1      | [2018-01-12 13:34:50,795] INFO [GroupMetadataManager brokerId=1005] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
```

Kafka Maven dependency (same for Producer and Consumer):

```xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.9.RELEASE</version>
    <relativePath/>
</parent>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```

Producer `application.properties`:

```properties
spring.kafka.producer.bootstrap-servers=0.0.0.0:9092

spring.kafka.consumer.topic=kafka_topic
server.port=8080
```

Consumer `application.properties`:

```properties
spring.kafka.consumer.bootstrap-servers=0.0.0.0:9092
spring.kafka.consumer.group-id=WorkUnitApp

spring.kafka.consumer.topic=kafka_topic
server.port=8081
```

Consumer:

```java
@Component
public class Consumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
    }
}
```

Producer:

```java
@Component
public class Producer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}
```
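Note that `KafkaTemplate.send()` is asynchronous: the call returns immediately and a delivery failure only surfaces later, via the producer's error listener. A minimal sketch of making the failure visible at the call site, assuming spring-kafka 1.x where `send()` returns a `ListenableFuture` (the method name `sendWithCallback` is a made-up example, not part of the original code):

```java
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public void sendWithCallback(String topic, String payload) {
    // send() only hands the record to the producer's buffer; delivery happens later
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, payload);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            LOGGER.info("sent payload='{}' offset={}", payload,
                    result.getRecordMetadata().offset());
        }

        @Override
        public void onFailure(Throwable ex) {
            // A TimeoutException here typically means the producer never
            // managed to deliver the buffered record to the broker
            LOGGER.error("failed to send payload='{}'", payload, ex);
        }
    });
}
```

This does not fix the underlying problem, but it turns the background error into a log entry next to the code that triggered it, which makes connectivity issues like the one below much easier to spot.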

`ConsumerConfig` log:

```
2018-01-12 15:25:48.220  INFO 20919 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [0.0.0.0:9092]
    check.crcs = true
    client.id = consumer-1
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = WorkUnitApp
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
```

`ProducerConfig` log:

```
2018-01-12 15:26:27.956  INFO 20924 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    block.on.buffer.full = false
    bootstrap.servers = [0.0.0.0:9092]
    buffer.memory = 33554432
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
```

I get an exception when I try to send a message:


```java
producer.send("kafka_topic", "test")
```


Exception log:

```
2018-01-12 15:26:27.975  INFO 20924 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.1.1
2018-01-12 15:26:27.975  INFO 20924 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : f10ef2720b03b247
2018-01-12 15:26:58.152 ERROR 20924 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='test' to topic kafka_topic:

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kafka_topic-0 due to 30033 ms has passed since batch creation plus linger time
```

How can I fix it?


Answer by May*_*yur (score 5):

The problem is not that the message is sent with a `null` key; most likely the producer simply cannot establish a connection to the broker. The `TimeoutException` says the record sat in the producer's batch for ~30 seconds (the `request.timeout.ms` plus `linger.ms` window) without ever being delivered, which points to connectivity rather than the key or the payload.
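One detail worth checking first (this is a suggestion based on the configs shown, not something the answer states): both clients use `bootstrap-servers=0.0.0.0:9092`. `0.0.0.0` is a bind address for servers, not an address a client can reliably connect to. Since the broker is published on the host's port 9092 and advertises itself as `localhost`, pointing the clients at `localhost` is a more dependable starting point:

```properties
# Producer application.properties (suggested change)
spring.kafka.producer.bootstrap-servers=localhost:9092

# Consumer application.properties (suggested change)
spring.kafka.consumer.bootstrap-servers=localhost:9092
```

If the applications run inside Docker as well, `localhost` would instead have to be replaced by an address reachable from inside the Docker network (for example the `kafka` service name), and `KAFKA_ADVERTISED_HOST_NAME` adjusted to match.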