Spark 2.1 + Kafka 0.10 + Spark Streaming.
Batch duration is 30 s.
I have 13 nodes and 2 brokers, with 1 executor per topic/partition and 1 core per executor.
The LocationStrategy is PreferConsistent.
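The setup above boils down to the consumer parameter map that a Kafka 0.10 direct stream is configured with. The sketch below is a minimal, JDK-only illustration under stated assumptions: the broker addresses, `group.id`, the topic name `topic1` and the class `KafkaParamsSketch` are hypothetical, and the timeout values are illustrative (40000 ms matches the 40 s seen in the question). In the Spark job, a map and topic list like these would be passed to `ConsumerStrategies.Subscribe(topics, kafkaParams)` and combined with `LocationStrategies.PreferConsistent()` in `KafkaUtils.createDirectStream`. Those Spark calls are left out so the sketch compiles without Spark on the classpath. The sketch also checks that `request.timeout.ms` is larger than both `session.timeout.ms` and `fetch.max.wait.ms`, because the 0.10 `KafkaConsumer` rejects the configuration otherwise.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the consumer settings for a two-topic direct stream.
// Broker addresses, group id and "topic1" are placeholders, not the asker's values.
final class KafkaParamsSketch {

    static final List<String> TOPICS = Arrays.asList("topic1", "topic2");

    static Map<String, Object> kafkaParams() {
        Map<String, Object> params = new HashMap<>();
        params.put("bootstrap.servers", "broker1:9092,broker2:9092"); // the 2 brokers
        params.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        params.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        params.put("group.id", "example-group");
        params.put("enable.auto.commit", false);
        // Illustrative timeouts; 40 s matches the delay reported in the question.
        params.put("session.timeout.ms", 30000);
        params.put("fetch.max.wait.ms", 500);
        params.put("request.timeout.ms", 40000);
        return params;
    }

    // request.timeout.ms must exceed both session.timeout.ms and fetch.max.wait.ms,
    // otherwise the 0.10 consumer throws a ConfigException when it is created.
    static boolean timeoutsConsistent(Map<String, Object> params) {
        int request = (Integer) params.get("request.timeout.ms");
        int session = (Integer) params.get("session.timeout.ms");
        int fetchWait = (Integer) params.get("fetch.max.wait.ms");
        return request > session && request > fetchWait;
    }

    public static void main(String[] args) {
        Map<String, Object> params = kafkaParams();
        System.out.println("topics=" + TOPICS + " timeoutsConsistent=" + timeoutsConsistent(params));
    }
}
```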
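When consuming 1 topic, there is no problem: each executor always processes the same topic/partition (tested with up to 24 partitions).
When I add another topic, some topic/partitions are processed by a different executor from one batch to the next.
When an executor processes the same topic/partition again (for example 3 batches later, i.e. 1:30 after the previous processing), my KafkaConsumer is disconnected from the broker because of a request timeout (the request.timeout.ms parameter), and then my new fetch request to Kafka is blocked for 40 s (again the request.timeout.ms parameter).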
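The timing in the paragraph above can be made explicit with a little arithmetic. This hedged sketch (the class `IdleGapSketch` is hypothetical) only restates the question's numbers: a partition picked up again 3 batches later has sat idle for 3 × 30 s = 90 s, which is longer than the 40 s `request.timeout.ms`. For comparison, a partition read on every batch, as in the one-topic case, is idle for roughly 30 s, under that limit. Whether this gap actually explains the disconnect is a hypothesis; the logs alone do not show it.

```java
// Hypothetical back-of-the-envelope check of the timing described in the question:
// a partition revisited N batches later has been idle for about N * batchDuration.
final class IdleGapSketch {

    static long idleGapMs(int batchesBetweenVisits, long batchDurationMs) {
        return batchesBetweenVisits * batchDurationMs;
    }

    static boolean exceedsRequestTimeout(long idleGapMs, long requestTimeoutMs) {
        return idleGapMs > requestTimeoutMs;
    }

    public static void main(String[] args) {
        long batchDurationMs = 30_000;  // batch duration from the question
        long requestTimeoutMs = 40_000; // the 40 s request.timeout.ms
        long gap = idleGapMs(3, batchDurationMs); // "3 batches later"
        System.out.println(gap + " ms idle, exceeds request.timeout.ms: "
                + exceedsRequestTimeout(gap, requestTimeoutMs));
    }
}
```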
2017-10-09 16:51:30.336 DEBUG [Executor task launch worker for task 315]:org.apache.spark.internal.Logging$class - Seeking to topic2-7 136136613
2017-10-09 16:51:30.336 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to offset 136136613 for partition topic2-7
2017-10-09 16:51:30.337 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.NetworkClient - Disconnecting from node 1005 due to request timeout.
2017-10-09 16:51:30.337 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler - Cancelled FETCH request ClientRequest(expectResponse=true, …

I created a simple Java application to debug my problem, but I am still stuck. I built a jar with the Maven Assembly plugin, but I get this error at startup:
D:\Workspace_java11\testlog4j\target>java -Dlog4j.configurationFile=log4j2.xml -jar testlog4j-0.0.1-SNAPSHOT-jar-with-dependencies.jar
WARNING: sun.reflect.Reflection.getCallerClass is not supported. This will impact performance.
ERROR StatusLogger Unrecognized format specifier [d]
ERROR StatusLogger Unrecognized conversion specifier [d] starting at position 16 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [thread]
ERROR StatusLogger Unrecognized conversion specifier [thread] starting at position 25 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [level]
ERROR StatusLogger Unrecognized conversion specifier [level] starting at position 35 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [logger] …
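To check what the Maven assembly actually put into the jar, its entries can be listed with the JDK's `java.util.zip` API. The sketch below (the class name `JarEntryLister` is hypothetical) prints every entry whose name ends with a given suffix, here `Log4j2Plugins.dat`. log4j-core ships its plugin cache at `META-INF/org/apache/logging/log4j/core/config/plugins/Log4j2Plugins.dat`, and the pattern converters behind `%d`, `%thread`, `%level` and `%logger` are Log4j plugins. If the jar-with-dependencies step merged several dependencies that each contain this file, only one copy can remain at that path. Treating that as the cause of the errors above is an assumption to verify, not a confirmed diagnosis.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

// Hypothetical helper: lists the entries of a jar whose names end with a suffix,
// to inspect what the assembly step copied into the fat jar.
final class JarEntryLister {

    static List<String> findEntries(Path jar, String suffix) throws IOException {
        List<String> hits = new ArrayList<>();
        try (ZipFile zip = new ZipFile(jar.toFile())) {
            Enumeration<? extends ZipEntry> entries = zip.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.endsWith(suffix)) {
                    hits.add(name);
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.out.println("usage: java JarEntryLister.java <path-to-jar>");
            return;
        }
        // Print every plugin cache file found in the given jar.
        for (String name : findEntries(Paths.get(args[0]), "Log4j2Plugins.dat")) {
            System.out.println(name);
        }
    }
}
```

With Java 11 it can be run directly from source, e.g. `java JarEntryLister.java testlog4j-0.0.1-SNAPSHOT-jar-with-dependencies.jar`.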