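I have a tomcat - spring4.2 application that runs multiple threads. Each thread dequeues from only one queue, but more than one thread is assigned to each queue.
Things start out fine, but after a few hours (roughly half a million dequeue operations) I see the threads dequeuing very slowly.
In jvisualvm the threads show up orange, i.e. parked. The thread dump is as follows: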
"EMLT_2" - Thread t@64
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <2cf42d7> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at org.apache.commons.pool2.impl.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:583)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:442)
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
at redis.clients.util.Pool.getResource(Pool.java:48)
at redis.clients.jedis.JedisPool.getResource(JedisPool.java:86)
at com.mycomp.sam.processors.SimpleDequeuer.dequeue(SimpleDequeuer.java:25)
at com.mycomp.sam.processors.EMLT.run(EMLT.java:29)
at java.lang.Thread.run(Thread.java:745)
Locked ownable synchronizers:
- None
"EMLT_1" - Thread t@63
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <2cf42d7> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at org.apache.commons.pool2.impl.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:583)
at …
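Both threads in the dump are parked inside GenericObjectPool.borrowObject, waiting on the same condition object. That is consistent with an exhausted connection pool, for example when borrowed connections are not returned on every code path, though the dump alone does not prove it. The following is a minimal sketch of that failure mode under that assumption. TinyPool and Conn are hypothetical stand-ins built on java.util.concurrent; they are not the Jedis or commons-pool2 API, and SimpleDequeuer's real code is not shown in the question.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Stand-in for a bounded connection pool; NOT the Jedis or commons-pool2 API. */
public class PoolLeakSketch {

    /** Hypothetical pooled resource that hands itself back to the pool on close(). */
    static final class Conn implements AutoCloseable {
        private final BlockingQueue<Conn> home;
        Conn(BlockingQueue<Conn> home) { this.home = home; }
        @Override public void close() { home.offer(this); }
    }

    /** Hypothetical pool with a fixed number of connections. */
    static final class TinyPool {
        private final BlockingQueue<Conn> idle;

        TinyPool(int maxTotal) {
            idle = new ArrayBlockingQueue<>(maxTotal);
            for (int i = 0; i < maxTotal; i++) {
                idle.add(new Conn(idle));
            }
        }

        /** Waits at most maxWaitMillis, then returns null instead of parking forever. */
        Conn borrow(long maxWaitMillis) {
            try {
                return idle.poll(maxWaitMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return null;
            }
        }

        int idleCount() { return idle.size(); }
    }

    /** Leaky dequeue: borrows a connection and never returns it, so the pool drains. */
    static boolean leakyDequeue(TinyPool pool) {
        Conn c = pool.borrow(50);
        return c != null; // c is never closed
    }

    /** Safe dequeue: try-with-resources returns the connection on every path. */
    static boolean safeDequeue(TinyPool pool) {
        try (Conn c = pool.borrow(50)) {
            return c != null;
        }
    }

    public static void main(String[] args) {
        TinyPool leaky = new TinyPool(2);
        for (int i = 0; i < 3; i++) {
            System.out.println("leaky call " + i + " got a connection: " + leakyDequeue(leaky));
        }
        TinyPool safe = new TinyPool(2);
        for (int i = 0; i < 3; i++) {
            System.out.println("safe call " + i + " got a connection: " + safeDequeue(safe));
        }
    }
}
```

With a pool of two, the third leaky call times out, while the safe variant keeps succeeding. If this is the cause, the same try/finally or try-with-resources discipline around JedisPool.getResource() in SimpleDequeuer.dequeue, plus a finite maximum wait on the pool configuration, would turn a silent stall into a visible error.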
For some larger messages, I get the following error:
kafka.common.MessageSizeTooLargeException: Message size is 1185198 bytes which exceeds the maximum configured message size of 1000012.
So, following this thread, I increased the message size for the broker and the consumer:
fetch.message.max.bytes=10485760
replica.fetch.max.bytes=10485760
message.max.bytes=10485760
I added these to config/server.properties.
Now the message gets delivered, but the consumer fails with an error:
[2015-08-26 21:08:08,722] ERROR Error processing message, stopping consumer: (kafka.tools.ConsoleConsumer$)
kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic xyz partition 0 at fetch offset 29. Increase the fetch size, or decrease the maximum message size the broker will allow.
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:90)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
at scala.collection.Iterator$class.foreach(Iterator.scala:660) …
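The second error comes from the consumer side: its fetch size is still smaller than the message at offset 29. One likely explanation, stated as an assumption, is that fetch.message.max.bytes is read from the consumer's own configuration and has no effect when placed in the broker's config/server.properties, whereas message.max.bytes and replica.fetch.max.bytes are broker settings. The sketch below only models that comparison with java.util.Properties and does not use any Kafka API. The class and method names are hypothetical, and the 1048576 consumer default is assumed from the old consumer's documentation.

```java
import java.util.Properties;

/** Models the size check only; does not talk to Kafka. */
public class FetchSizeCheck {

    /**
     * Returns true if the consumer's fetch size can hold the largest
     * message the broker is configured to accept.
     */
    static boolean consumerCanFetch(Properties broker, Properties consumer) {
        // 1000012 is the limit reported in the first error message.
        long maxMessage = Long.parseLong(broker.getProperty("message.max.bytes", "1000012"));
        // Assumed default of the old consumer (1024 * 1024).
        long fetchSize = Long.parseLong(consumer.getProperty("fetch.message.max.bytes", "1048576"));
        return fetchSize >= maxMessage;
    }

    public static void main(String[] args) {
        // The three lines from the question, as placed in config/server.properties.
        Properties broker = new Properties();
        broker.setProperty("fetch.message.max.bytes", "10485760"); // not read by the consumer
        broker.setProperty("replica.fetch.max.bytes", "10485760");
        broker.setProperty("message.max.bytes", "10485760");

        // Consumer started without its own override.
        Properties consumer = new Properties();
        System.out.println("before: " + consumerCanFetch(broker, consumer));

        // Same setting supplied through the consumer's configuration instead.
        consumer.setProperty("fetch.message.max.bytes", "10485760");
        System.out.println("after: " + consumerCanFetch(broker, consumer));
    }
}
```

If that assumption holds, the fix is to pass fetch.message.max.bytes=10485760 to the consumer itself. For the console consumer that would mean a separate consumer properties file, which I believe is supplied with the --consumer.config option in this Kafka generation.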