如何使用Kafka 0.8 Log4j appender

sty*_*e95 9 log4j appender apache-kafka

我正在尝试运行Kafka-0.8 Log4j appender而我无法做到.我希望我的应用程序通过Log4j appender将日志直接发送到kafka.

这是我的log4j.properties.我找不到任何合适的编码器,所以我只是将其配置为使用默认编码器.(例如,我对这条线进行了评论.)

log4j.rootLogger=INFO, stdout, KAFKA

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n

log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%-5p: %c - %m%n
log4j.appender.KAFKA.BrokerList=hnode01:9092
log4j.appender.KAFKA.Topic=DKTestEvent

#log4j.appender.KAFKA.SerializerClass=kafka.log4j.AppenderStringEncoder
Run Code Online (Sandbox Code Playgroud)

这是我的示例应用程序.

import org.apache.log4j.Logger;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.PropertyConfigurator;

public class HelloWorld {

        static Logger logger = Logger.getLogger(HelloWorld.class.getName());

        public static void main(String[] args) {
            PropertyConfigurator.configure(args[0]);

            logger.info("Entering application.");
            logger.debug("Debugging!.");
            logger.info("Exiting application.");
        }
}
Run Code Online (Sandbox Code Playgroud)

我用maven编译.我在我的pom.xml中包含了kafka_2.8.2-0.8.0和log4j_1.2.17

我收到这些错误:

INFO [main] (Logging.scala:67) - Verifying properties
INFO [main] (Logging.scala:67) - Property metadata.broker.list is overridden to hnode01:9092
INFO [main] (Logging.scala:67) - Property serializer.class is overridden to kafka.serializer.StringEncoder
INFO [main] (HelloWorld.java:14) - Entering application.
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 0 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 1 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 2 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 3 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 4 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 5 for 1 topic(s) Set(DKTestEvent)
.
.
.
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 60 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 61 for 1 topic(s) Set(DKTestEvent)
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 62 for 1 topic(s) Set(DKTestEvent)
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 63 for 1 topic(s) Set(DKTestEvent)
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 64 for 1 topic(s) Set(DKTestEvent)
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 65 for 1 topic(s) Set(DKTestEvent)
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 66 for 1 topic(s) Set(DKTestEvent)
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 67 for 1 topic(s) Set(DKTestEvent)
.
.
.
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 534 for 1 topic(s) Set(DKTestEvent)
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
java.lang.StackOverflowError
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
at org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
at org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
at org.apache.log4j.Category.callAppenders(Category.java:206)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.error(Category.java:322)
at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
at kafka.utils.Utils$.swallow(Utils.scala:189)
at kafka.utils.Logging$class.swallowError(Logging.scala:105)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.producer.KafkaLog4jAppender.append(KafkaLog4jAppender.scala:96)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
at org.apache.log4j.Category.callAppenders(Category.java:206)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.info(Category.java:666)
at kafka.utils.Logging$class.info(Logging.scala:67)
at kafka.client.ClientUtils$.info(ClientUtils.scala:31)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:187)
at kafka.utils.Logging$class.swallowError(Logging.scala:105)
at kafka.utils.Utils$.swallowError(Utils.scala:46)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.Producer.send(Producer.scala:76)
at kafka.producer.KafkaLog4jAppender.append(KafkaLog4jAppender.scala:96)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
.
.
.
Run Code Online (Sandbox Code Playgroud)

如果我没有终止程序,我会不断加错.

如果我错过了什么,请告诉我.

小智 5

我认为Jonas已经确定了问题,就是Kafka生产者日志记录也被记录到Kafka appender导致无限循环和最终的堆栈溢出(没有双关语意)你可以配置所有Kafka日志去另一个appender.以下显示将输出发送到stdout:

log4j.logger.kafka=INFO, stdout
Run Code Online (Sandbox Code Playgroud)

因此,您应该在log4j.properties中得到以下结果

log4j.rootLogger=INFO, stdout, KAFKA
log4j.logger.kafka=INFO, stdout
log4j.logger.HelloWorld=INFO, KAFKA
Run Code Online (Sandbox Code Playgroud)


Jon*_*röm 1

尝试设置appender异步,如下所示:log4j.appender.KAFKA.ProducerType=async

它进入无限循环似乎是合理的,因为 kafka 生产者本身已经登录了。