Configuring the Elasticsearch sink in Apache Flume

Lif*_*orm 7 flume elasticsearch data-ingestion

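This is my first time posting here, so I apologize if I am not doing it correctly, and sorry for my poor English.

I am trying to configure Apache Flume with the Elasticsearch sink. Everything seems fine and it appears to run correctly, but there are two warnings when I start the agent. They are the following: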

2015-11-16 09:11:22,122 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
    at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357)
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2015-11-16 09:11:22,137 (lifecycleSupervisor-1-3) [WARN - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:260)] Component SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@ce359aa counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies

My agent configuration:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink ES
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = items
a1.sinks.k1.indexType = item
a1.sinks.k1.clusterName = elasticsearch
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

netcat starts and everything works, but I am worried about these warnings, which I do not understand.

frb*_*frb 1

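Looking at the logs, there is a problem with some missing dependencies.

If you look at the ElasticSearchSink documentation, you will see the following:

The elasticsearch and lucene-core jars required for your environment must be placed in the lib directory of the Apache Flume installation. Elasticsearch requires that the major version of the client JAR match that of the server and that both are running the same minor version of the JVM. SerializationExceptions will appear if this is incorrect. To select the required version first determine the version of elasticsearch and the JVM version the target cluster is running. Then select an elasticsearch client library which matches the major version. A 0.19.x client can talk to a 0.19.x cluster; 0.20.x can talk to 0.20.x and 0.90.x can talk to 0.90.x. Once the elasticsearch version has been determined then read the pom.xml file to determine the correct lucene-core JAR version to use. The Flume agent which is running the ElasticSearchSink should also match the JVM the target cluster is running down to the minor version.

Most likely you have not placed the required Java jars in that directory, or their versions are not the right ones.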
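
One way to check this diagnosis is to ask the JVM directly whether the constructor named in the `NoSuchMethodError` exists on the classpath Flume uses. The following is only a minimal sketch, not part of Flume or Elasticsearch: `ConstructorCheck` and `hasConstructor` are hypothetical names introduced for illustration, and the only thing taken from the log is the class name and the `(String, int)` signature.

```java
public class ConstructorCheck {

    /**
     * Returns true if the named class can be loaded and declares a public
     * constructor with exactly the given parameter types.
     * (Hypothetical helper, written for this illustration only.)
     */
    public static boolean hasConstructor(String className, Class<?>... parameterTypes) {
        try {
            // Load without running static initializers; we only need the metadata.
            Class<?> type = Class.forName(
                    className, false, ConstructorCheck.class.getClassLoader());
            type.getConstructor(parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            // Class missing, constructor missing, or jar incompatible with this JVM.
            return false;
        }
    }

    public static void main(String[] args) {
        // Class and signature copied from the NoSuchMethodError in the question.
        String className =
                "org.elasticsearch.common.transport.InetSocketTransportAddress";
        boolean found = hasConstructor(className, String.class, int.class);
        System.out.println(className + "(String, int) available: " + found);
    }
}
```

If you compile this and run it with the jars from Flume's lib directory on the classpath (for example `java -cp "$FLUME_HOME/lib/*:." ConstructorCheck`, where `FLUME_HOME` is an assumed name for your installation path), a `false` result would suggest that the elasticsearch jar in that directory is missing or is a version the sink was not built against.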