小编sty*_*e95的帖子

如何使用Kafka 0.8 Log4j appender

我正在尝试运行Kafka-0.8 Log4j appender而我无法做到.我希望我的应用程序通过Log4j appender将日志直接发送到kafka.

这是我的log4j.properties.我找不到任何合适的编码器,所以我只是将其配置为使用默认编码器.(例如,我对这条线进行了评论.)

log4j.rootLogger=INFO, stdout, KAFKA

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n

log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%-5p: %c - %m%n
log4j.appender.KAFKA.BrokerList=hnode01:9092
log4j.appender.KAFKA.Topic=DKTestEvent

#log4j.appender.KAFKA.SerializerClass=kafka.log4j.AppenderStringEncoder
Run Code Online (Sandbox Code Playgroud)

这是我的示例应用程序.

import org.apache.log4j.Logger;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.PropertyConfigurator;

public class HelloWorld {

        static Logger logger = Logger.getLogger(HelloWorld.class.getName());

        public static void main(String[] args) {
            PropertyConfigurator.configure(args[0]);

            logger.info("Entering application.");
            logger.debug("Debugging!.");
            logger.info("Exiting application.");
        }
}
Run Code Online (Sandbox Code Playgroud)

我用maven编译.我在我的pom.xml中包含了kafka_2.8.2-0.8.0和log4j_1.2.17

我收到这些错误:

INFO [main] (Logging.scala:67) - Verifying properties
INFO [main] (Logging.scala:67) - Property metadata.broker.list is overridden to hnode01:9092
INFO [main] (Logging.scala:67) - Property serializer.class is overridden …
Run Code Online (Sandbox Code Playgroud)

log4j appender apache-kafka

9
推荐指数
2
解决办法
2万
查看次数

如何使Spark Streaming(Spark 1.0.0)读取Kafka的最新数据(Kafka Broker 0.8.1)

我的火花流应用程序从Kafka获取数据并对它们进行处理.

如果应用程序失败,大量数据存储在Kafka中,并且在下一次启动Spark Streaming应用程序时,它会崩溃,因为一次消耗的数据太多.由于我的应用程序不关心过去的数据,因此仅使用当前(最新)数据是完全正常的.

我找到了"auto.reset.offest"选项,它在Spark中的表现略有不同.它会删除存储在zookeeper中的偏移量(如果已配置).尽管如此,它的意外行为,应该是在删除后从最新的数据中获取数据.

但我发现它不是.在使用数据之前,我看到所有的偏移都被清理干净了.然后,由于默认行为,它应该按预期获取数据.但由于数据太多,它仍然会崩溃.

当我使用"Kafka-Console-Consumer"清理偏移并消耗最新数据并运行我的应用程序时,它按预期工作.

所以看起来"auto.reset.offset"不起作用,火花流中的kafka消费者从默认的"最小"偏移量中获取数据.

您是否知道如何从最新的火花流中消耗Kafka数据?

我使用的是spark-1.0.0和Kafka-2.10-0.8.1.

提前致谢.

offset apache-kafka apache-spark spark-streaming kafka-consumer-api

7
推荐指数
1
解决办法
5218
查看次数