使用Kafka将数据导入Hadoop

Mar*_*rko 6 hadoop flume apache-kafka

首先,我正在考虑使用什么来将事件放入Hadoop,在那里存储它们并定期对它们进行分析(可能使用Ooozie来安排定期分析)Kafka或Flume,并决定Kafka可能是一个更好的解决方案,因为我们还有一个用于执行事件处理的组件,因此以这种方式,批处理和事件处理组件以相同的方式获取数据.

但是知道我正在寻找具体的建议如何从经纪人那里获取数据到Hadoop.

我在这里发现Flume可以与Kafka结合使用

  • Flume - 包含Kafka Source(消费者)和Sink(制作人)

并且在同一页面和Kafka文档中也发现了一些名为Camus的东西

  • Camus - LinkedIn的Kafka => HDFS管道.这个用于LinkedIn的所有数据,效果很好.

我对能做到这一点的更好(更简单,记录更好的解决方案)感兴趣吗?此外,有任何示例或教程如何做到这一点?

我应该何时使用这种变体而不是更简单的高级消费者

如果有另外一个/更好的解决方案而不是这两个,我会打开建议.

谢谢

sun*_*tha 2

您可以使用flume将数据从Kafka转储到HDFS。Flume 有 kafka 源和汇。这是属性文件更改的问题。下面给出一个例子。

脚步:

  1. 创建一个kafka主题

    kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --     partitions 1 --topic testkafka
    
    Run Code Online (Sandbox Code Playgroud)
  2. 使用kafka控制台生产者写入上面创建的主题

    kafka-console-producer --broker-list localhost:9092 --topic testkafka
    
    Run Code Online (Sandbox Code Playgroud)
  3. 使用以下属性配置 Flume 代理

    flume1.sources  = kafka-source-1
    flume1.channels = hdfs-channel-1
    flume1.sinks    = hdfs-sink-1
    flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
    flume1.sources.kafka-source-1.zookeeperConnect = localhost:2181
    flume1.sources.kafka-source-1.topic =testkafka
    flume1.sources.kafka-source-1.batchSize = 100
    flume1.sources.kafka-source-1.channels = hdfs-channel-1
    
    flume1.channels.hdfs-channel-1.type   = memory
    flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
    flume1.sinks.hdfs-sink-1.type = hdfs
    flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
    flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
    flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
    flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
    flume1.sinks.hdfs-sink-1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
    flume1.sinks.hdfs-sink-1.hdfs.rollCount=100
    flume1.sinks.hdfs-sink-1.hdfs.rollSize=0
    flume1.channels.hdfs-channel-1.capacity = 10000
    flume1.channels.hdfs-channel-1.transactionCapacity = 1000
    
    Run Code Online (Sandbox Code Playgroud)

将上面的配置文件保存为 example.conf

  1. 运行 Flume 代理

    flume-ng agent -n flume1 -c conf -f example.conf -    Dflume.root.logger=INFO,console
    
    Run Code Online (Sandbox Code Playgroud)

数据现在将转储到以下路径下的 HDFS 位置

/tmp/kafka/%{topic}/%y-%m-%d
Run Code Online (Sandbox Code Playgroud)