tsa*_*txt 7 apache-kafka docker spring-boot
我正在尝试使用Spring Boot
withKafka
和ZooKeeper
withDocker
:
docker-compose.yml:
version: \'2\'\n\nservices:\n\nzookeeper:\n image: wurstmeister/zookeeper\n restart: always\n ports:\n - "2181:2181"\n\nkafka:\n image: wurstmeister/kafka\n restart: always\n ports:\n - "9092:9092"\n environment:\n KAFKA_ADVERTISED_HOST_NAME: localhost\n KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181\n
Run Code Online (Sandbox Code Playgroud)\n\ndocker ps
输出:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES\n980e6b09f4e3 wurstmeister/kafka "start-kafka.sh" 29 minutes ago Up 29 minutes 0.0.0.0:9092->9092/tcp samplespringkafkaproducerconsumermaster_kafka_1\n64519d4808aa wurstmeister/zookeeper "/bin/sh -c \'/usr/sb\xe2\x80\xa6" 2 hours ago Up 29 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp samplespringkafkaproducerconsumermaster_zookeeper_1\n
Run Code Online (Sandbox Code Playgroud)\n\ndocker-compose up
输出日志:
kafka_1 | [2018-01-12 13:14:49,545] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)\nkafka_1 | [2018-01-12 13:14:49,546] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)\nkafka_1 | [2018-01-12 13:14:49,546] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)\nkafka_1 | [2018-01-12 13:14:49,547] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)\nkafka_1 | [2018-01-12 13:14:49,547] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)\nkafka_1 | [2018-01-12 13:14:49,548] INFO Client environment:os.version=4.9.60-linuxkit-aufs (org.apache.zookeeper.ZooKeeper)\nkafka_1 | [2018-01-12 13:14:49,548] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)\nkafka_1 | [2018-01-12 13:14:49,549] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)\nkafka_1 | [2018-01-12 13:14:49,549] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)\nkafka_1 | [2018-01-12 13:14:49,552] INFO Initiating client connection, connectString=zookeeper:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@1534f01b (org.apache.zookeeper.ZooKeeper)\nkafka_1 | [2018-01-12 13:14:49,574] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)\nkafka_1 | [2018-01-12 13:14:49,578] INFO Opening socket connection to server samplespringkafkaproducerconsumermaster_zookeeper_1.samplespringkafkaproducerconsumermaster_default/192.168.32.2:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)\nzookeeper_1 | 2018-01-12 13:14:49,591 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@192] - Accepted socket connection from /192.168.32.3:51466\nkafka_1 | [2018-01-12 13:14:49,593] INFO Socket connection established to samplespringkafkaproducerconsumermaster_zookeeper_1.samplespringkafkaproducerconsumermaster_default/192.168.32.2:2181, initiating session (org.apache.zookeeper.ClientCnxn)\nzookeeper_1 | 2018-01-12 13:14:49,600 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@928] - Client attempting to establish new session at /192.168.32.3:51466\nzookeeper_1 | 2018-01-12 13:14:49,603 [myid:] - INFO [SyncThread:0:FileTxnLog@203] - Creating new log file: log.fd\nzookeeper_1 | 2018-01-12 13:14:49,613 [myid:] - INFO [SyncThread:0:ZooKeeperServer@673] - Established session 0x160ea8232b00000 with negotiated timeout 6000 for client /192.168.32.3:51466\nkafka_1 | [2018-01-12 13:14:49,616] INFO Session establishment complete on server samplespringkafkaproducerconsumermaster_zookeeper_1.samplespringkafkaproducerconsumermaster_default/192.168.32.2:2181, sessionid = 0x160ea8232b00000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)\nkafka_1 | [2018-01-12 13:14:49,619] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)\nkafka_1 | [2018-01-12 13:14:49,992] INFO Cluster ID = Fgy9ybPPQQ-QdLINzHpmVA (kafka.server.KafkaServer)\nkafka_1 | [2018-01-12 13:14:50,003] WARN No meta.properties file under dir /kafka/kafka-logs-980e6b09f4e3/meta.properties (kafka.server.BrokerMetadataCheckpoint)\nkafka_1 | [2018-01-12 13:14:50,065] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)\nkafka_1 | [2018-01-12 13:14:50,065] INFO [ThrottledRequestReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)\nkafka_1 | [2018-01-12 13:14:50,067] INFO [ThrottledRequestReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)\nkafka_1 | [2018-01-12 13:14:50,167] INFO Log directory \'/kafka/kafka-logs-980e6b09f4e3\' not found, creating it. (kafka.log.LogManager)\nkafka_1 | [2018-01-12 13:14:50,183] INFO Loading logs. (kafka.log.LogManager)\nkafka_1 | [2018-01-12 13:14:50,199] INFO Logs loading complete in 15 ms. (kafka.log.LogManager)\nkafka_1 | [2018-01-12 13:14:50,283] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)\nkafka_1 | [2018-01-12 13:14:50,291] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)\nkafka_1 | [2018-01-12 13:14:50,633] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)\nkafka_1 | [2018-01-12 13:14:50,639] INFO [SocketServer brokerId=1005] Started 1 acceptor threads (kafka.network.SocketServer)\nkafka_1 | [2018-01-12 13:14:50,673] INFO [ExpirationReaper-1005-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)\nkafka_1 | [2018-01-12 13:14:50,674] INFO [ExpirationReaper-1005-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)\nkafka_1 | [2018-01-12 13:14:50,675] INFO [ExpirationReaper-1005-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)\nkafka_1 | [2018-01-12 13:14:50,691] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)\nkafka_1 | [2018-01-12 13:14:50,753] INFO [ExpirationReaper-1005-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)\nkafka_1 | [2018-01-12 13:14:50,757] INFO [ExpirationReaper-1005-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)\nkafka_1 | [2018-01-12 13:14:50,762] INFO [ExpirationReaper-1005-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)\nkafka_1 | [2018-01-12 13:14:50,777] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)\nkafka_1 | [2018-01-12 13:14:50,791] INFO [GroupCoordinator 1005]: Starting up. (kafka.coordinator.group.GroupCoordinator)\nkafka_1 | [2018-01-12 13:14:50,791] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)\nkafka_1 | [2018-01-12 13:14:50,793] INFO [GroupCoordinator 1005]: Startup complete. (kafka.coordinator.group.GroupCoordinator)\nkafka_1 | [2018-01-12 13:14:50,798] INFO [GroupMetadataManager brokerId=1005] Removed 0 expired offsets in 5 milliseconds. (kafka.coordinator.group.GroupMetadataManager)\nkafka_1 | [2018-01-12 13:14:50,811] INFO [ProducerId Manager 1005]: Acquired new producerId block (brokerId:1005,blockStartProducerId:5000,blockEndProducerId:5999) by writing to Zk with path version 6 (kafka.coordinator.transaction.ProducerIdManager)\nkafka_1 | [2018-01-12 13:14:50,848] INFO [TransactionCoordinator id=1005] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)\nkafka_1 | [2018-01-12 13:14:50,850] INFO [Transaction Marker Channel Manager 1005]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)\nkafka_1 | [2018-01-12 13:14:50,850] INFO [TransactionCoordinator id=1005] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)\nkafka_1 | [2018-01-12 13:14:50,949] INFO Creating /brokers/ids/1005 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)\nzookeeper_1 | 2018-01-12 13:14:50,952 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x160ea8232b00000 type:create cxid:0x70 zxid:0x102 txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers\nzookeeper_1 | 2018-01-12 13:14:50,952 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x160ea8232b00000 type:create cxid:0x71 zxid:0x103 txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids\nkafka_1 | [2018-01-12 13:14:50,957] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)\nkafka_1 | [2018-01-12 13:14:50,959] INFO Registered broker 1005 at path /brokers/ids/1005 with addresses: EndPoint(localhost,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)\nkafka_1 | [2018-01-12 13:14:50,961] WARN No meta.properties file under dir /kafka/kafka-logs-980e6b09f4e3/meta.properties (kafka.server.BrokerMetadataCheckpoint)\nkafka_1 | [2018-01-12 13:14:50,992] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)\nkafka_1 | [2018-01-12 13:14:50,993] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)\nkafka_1 | [2018-01-12 13:14:51,004] INFO [KafkaServer id=1005] started (kafka.server.KafkaServer)\nzookeeper_1 | 2018-01-12 13:14:51,263 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x160ea8232b00000 type:delete cxid:0xe3 zxid:0x105 txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election\nkafka_1 | [2018-01-12 13:24:50,793] INFO [GroupMetadataManager brokerId=1005] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)\nkafka_1 | [2018-01-12 13:34:50,795] INFO [GroupMetadataManager brokerId=1005] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)\n
Run Code Online (Sandbox Code Playgroud)\n\nKafka
Maven 依赖项Producer
和Consumer
:
<parent>\n <groupId>org.springframework.boot</groupId>\n <artifactId>spring-boot-starter-parent</artifactId>\n <version>1.5.9.RELEASE</version>\n <relativePath/>\n </parent>\n\n <dependency>\n <groupId>org.springframework.kafka</groupId>\n <artifactId>spring-kafka</artifactId>\n </dependency>\n
Run Code Online (Sandbox Code Playgroud)\n\napplication.properties
在Producer
:
spring.kafka.producer.bootstrap-servers=0.0.0.0:9092\n\n spring.kafka.consumer.topic=kafka_topic\n server.port=8080\n
Run Code Online (Sandbox Code Playgroud)\n\napplication.properties
在Consumer
:
spring.kafka.consumer.bootstrap-servers=0.0.0.0:9092\n spring.kafka.consumer.group-id=WorkUnitApp\n\n spring.kafka.consumer.topic=kafka_topic\n server.port=8081\n
Run Code Online (Sandbox Code Playgroud)\n\nConsumer
:
@Component \npublic class Consumer {\n\nprivate static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);\n\n@KafkaListener(topics = "${spring.kafka.consumer.topic}")\n public void receive(ConsumerRecord<?, ?> consumerRecord) {\n LOGGER.info("received payload=\'{}\'", consumerRecord.toString());\n }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\nProducer
:
@Component\npublic class Producer {\n\nprivate static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);\n\n@Autowired\nprivate KafkaTemplate<String, String> kafkaTemplate;\n\npublic void send(String topic, String payload) {\n LOGGER.info("sending payload=\'{}\' to topic=\'{}\'", payload, topic);\n kafkaTemplate.send(topic, payload);\n }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\nConsumerConfig
日志:
2018-01-12 15:25:48.220 INFO 20919 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: \nauto.commit.interval.ms = 5000\nauto.offset.reset = latest\nbootstrap.servers = [0.0.0.0:9092]\ncheck.crcs = true\nclient.id = consumer-1\nconnections.max.idle.ms = 540000\nenable.auto.commit = true\nexclude.internal.topics = true\nfetch.max.bytes = 52428800\nfetch.max.wait.ms = 500\nfetch.min.bytes = 1\ngroup.id = WorkUnitApp\nheartbeat.interval.ms = 3000\ninterceptor.classes = null\nkey.deserializer = class org.apache.kafka.common.serialization.StringDeserializer\nmax.partition.fetch.bytes = 1048576\nmax.poll.interval.ms = 300000\nmax.poll.records = 500\nmetadata.max.age.ms = 300000\nmetric.reporters = []\nmetrics.num.samples = 2\nmetrics.sample.window.ms = 30000\npartition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]\nreceive.buffer.bytes = 65536\nreconnect.backoff.ms = 50\nrequest.timeout.ms = 305000\nretry.backoff.ms = 100\nsasl.kerberos.kinit.cmd = /usr/bin/kinit\nsasl.kerberos.min.time.before.relogin = 60000\nsasl.kerberos.service.name = null\nsasl.kerberos.ticket.renew.jitter = 0.05\nsasl.kerberos.ticket.renew.window.factor = 0.8\nsasl.mechanism = GSSAPI\nsecurity.protocol = PLAINTEXT\nsend.buffer.bytes = 131072\nsession.timeout.ms = 10000\nssl.cipher.suites = null\nssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]\nssl.endpoint.identification.algorithm = null\nssl.key.password = null\nssl.keymanager.algorithm = SunX509\nssl.keystore.location = null\nssl.keystore.password = null\nssl.keystore.type = JKS\nssl.protocol = TLS\nssl.provider = null\nssl.secure.random.implementation = null\nssl.trustmanager.algorithm = PKIX\nssl.truststore.location = null\nssl.truststore.password = null\nssl.truststore.type = JKS\nvalue.deserializer = class org.apache.kafka.common.serialization.StringDeserializer\n
Run Code Online (Sandbox Code Playgroud)\n\nProducerConfig
日志:
2018-01-12 15:26:27.956 INFO 20924 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values: \nacks = 1\nbatch.size = 16384\nblock.on.buffer.full = false\nbootstrap.servers = [0.0.0.0:9092]\nbuffer.memory = 33554432\nclient.id = producer-1\ncompression.type = none\nconnections.max.idle.ms = 540000\ninterceptor.classes = null\nkey.serializer = class org.apache.kafka.common.serialization.StringSerializer\nlinger.ms = 0\nmax.block.ms = 60000\nmax.in.flight.requests.per.connection = 5\nmax.request.size = 1048576\nmetadata.fetch.timeout.ms = 60000\nmetadata.max.age.ms = 300000\nmetric.reporters = []\nmetrics.num.samples = 2\nmetrics.sample.window.ms = 30000\npartitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner\nreceive.buffer.bytes = 32768\nreconnect.backoff.ms = 50\nrequest.timeout.ms = 30000\nretries = 0\nretry.backoff.ms = 100\nsasl.kerberos.kinit.cmd = /usr/bin/kinit\nsasl.kerberos.min.time.before.relogin = 60000\nsasl.kerberos.service.name = null\nsasl.kerberos.ticket.renew.jitter = 0.05\nsasl.kerberos.ticket.renew.window.factor = 0.8\nsasl.mechanism = GSSAPI\nsecurity.protocol = PLAINTEXT\nsend.buffer.bytes = 131072\nssl.cipher.suites = null\nssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]\nssl.endpoint.identification.algorithm = null\nssl.key.password = null\nssl.keymanager.algorithm = SunX509\nssl.keystore.location = null\nssl.keystore.password = null\nssl.keystore.type = JKS\nssl.protocol = TLS\nssl.provider = null\nssl.secure.random.implementation = null\nssl.trustmanager.algorithm = PKIX\nssl.truststore.location = null\nssl.truststore.password = null\nssl.truststore.type = JKS\ntimeout.ms = 30000\nvalue.serializer = class org.apache.kafka.common.serialization.StringSerializer\n
Run Code Online (Sandbox Code Playgroud)\n\n当我尝试发送消息时出现异常:
\n\nproducer.send("kafka_topic", "test")
异常日志:
\n\n2018-01-12 15:26:27.975 INFO 20924 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 0.10.1.1\n2018-01-12 15:26:27.975 INFO 20924 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : f10ef2720b03b247\n2018-01-12 15:26:58.152 ERROR 20924 --- [ad | producer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key=\'null\' and payload=\'test\' to topic kafka_topic:\n\norg.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for kafka_topic-0 due to 30033 ms has passed since batch creation plus linger time\n
Run Code Online (Sandbox Code Playgroud)\n\n如何修复它?
\n问题不在于发送key
为null
,可能无法建立与代理的连接
尝试使用本地 Kafka 安装。
如果您使用 mac Docker for mac 有一些网络限制 https://docs.docker.com/docker-for-mac/networking/#known-limitations-use-cases-and-workarounds
归档时间: |
|
查看次数: |
27483 次 |
最近记录: |