How do I write data to Elasticsearch from PySpark?

pys*_*ark 5 elasticsearch elastic-map-reduce apache-spark pyspark

I have integrated ELK with PySpark.

I saved the RDD as ELK data on the local file system:

rdd.saveAsTextFile("/tmp/ELKdata")
logData = sc.textFile('/tmp/ELKdata/*')
errors = logData.filter(lambda line: "raw1-VirtualBox" in line)
errors.count()

The count I get is 35.

errors.first()

I get this output:

(u'AVI0UK0KZsowGuTwoQnN', {u'host': u'raw1-VirtualBox', u'ident': u'NetworkManager', u'pid': u'748', u'message': u"(eth0): device state change: ip-config -> secondaries (reason 'none') [70 90 0]", u'@timestamp': u'2016-01-12T10:59:48+05:30'})

When I try to write the data to Elasticsearch from PySpark, I get an error:

errors.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf={"es.resource": "logstash-2016.01.12/errors"})

It produces a huge Java error:


org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 INFO TaskSetManager: Starting task 1.0 in stage 31.0 (TID 62, localhost, PROCESS_LOCAL, 1181 bytes)
16/01/12 17:20:13 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
        org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
        org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 ERROR TaskSetManager: Task 0 in stage 31.0 failed 1 times; aborting job
16/01/12 17:20:13 INFO TaskSchedulerImpl: Cancelling stage 31
16/01/12 17:20:13 INFO TaskSchedulerImpl: Stage 31 was cancelled
16/01/12 17:20:13 INFO Executor: Executor is trying to kill task 1.0 in stage 31.0 (TID 62)
16/01/12 17:20:13 INFO DAGScheduler: Failed to run saveAsNewAPIHadoopFile at PythonRDD.scala:665
Traceback (most recent call last):
  File "", line 6, in 
  File "/opt/spark/python/pyspark/rdd.py", line 1213, in saveAsNewAPIHadoopFile
    keyConverter, valueConverter, jconf)
  File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError16/01/12 17:20:13 INFO Executor: Running task 1.0 in stage 31.0 (TID 62)
16/01/12 17:20:13 ERROR Executor: Exception in task 1.0 in stage 31.0 (TID 62)
org.apache.spark.TaskKilledException
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 WARN TaskSetManager: Lost task 1.0 in stage 31.0 (TID 62, localhost): org.apache.spark.TaskKilledException: 
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 INFO TaskSchedulerImpl: Removed TaskSet 31.0, whose tasks have all completed, from pool 
: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 31.0 failed 1 times, most recent failure: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
        org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
        org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


If I build the record manually, it is able to write the data:

errors = logData.filter(lambda line: "raw1-VirtualBox" in line)
errors = errors.map(lambda item: ('AVI0UK0KZsowGuTwoQnP',{"host": "raw1-VirtualBox",
    "ident": "NetworkManager",
    "pid": "69",
    "message": " sucess  <info> (eth0): device state change: ip-config -> secondaries (reason 'none') [70 90 0]",
    "@timestamp": "2016-01-12T10:59:48+05:30"
  }))

But I want to write the filtered and processed data to Elasticsearch, not a hand-built record.
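The message "RDD element of type java.lang.String cannot be used" suggests that the RDD read back with sc.textFile holds plain strings rather than (key, value) pairs, which is what saveAsNewAPIHadoopFile expects. A minimal sketch of one possible fix, assuming each line is the Python repr of a (doc_id, dict) tuple as in the output of errors.first(): parse every line back into a pair before saving. The helper names parse_line and to_pairs are hypothetical, and this has not been run against a real cluster.

```python
import ast


def parse_line(line):
    """Turn one text line back into a (doc_id, document) pair.

    Assumes the line is the repr of a tuple such as
    (u'some-id', {u'host': u'...', ...}), which is what
    saveAsTextFile writes for non-string elements.
    """
    doc_id, doc = ast.literal_eval(line)
    return doc_id, dict(doc)


def to_pairs(lines_rdd):
    """Map an RDD of text lines to an RDD of (doc_id, dict) pairs."""
    return lines_rdd.map(parse_line)


# Local check on a shortened sample line; no Spark is needed for this part.
sample = "(u'AVI0UK0KZsowGuTwoQnN', {u'host': u'raw1-VirtualBox', u'pid': u'748'})"
doc_id, doc = parse_line(sample)
print(doc_id, doc["host"])
```

With this in place, to_pairs(errors) could be passed to saveAsNewAPIHadoopFile with the same arguments as in the question, since each element is now a pair whose value is a dict, like the manually built record that already works.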

use*_*149 5

I ran into a similar problem, and this is how I managed to solve it. First, I used a DataFrame instead of an RDD.

Once the data is in a DataFrame:

from pyspark.sql import SQLContext
df.write.format("org.elasticsearch.spark.sql").option("es.resource", "logstash-2016.01.12/errors").save()

  • Where do you give the IP address or DNS name of the ES node? (2 upvotes)
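On the question in the comment above: the elasticsearch-hadoop connector reads the node address from its es.nodes and es.port settings, which can be passed as writer options next to es.resource. A minimal sketch, assuming df is an existing DataFrame and the connector jar is on the Spark classpath; the helper names es_options and write_to_es and the address 192.168.1.10 are hypothetical.

```python
def es_options(resource, nodes="localhost", port="9200"):
    """Build the connector options for one write.

    es.nodes takes a host name or IP address (or a comma-separated list),
    es.port is the Elasticsearch HTTP port.
    """
    return {"es.resource": resource, "es.nodes": nodes, "es.port": port}


def write_to_es(df, resource, nodes="localhost", port="9200"):
    """Write a DataFrame to Elasticsearch using the options above."""
    (df.write
       .format("org.elasticsearch.spark.sql")
       .options(**es_options(resource, nodes, port))
       .save())


# Show the options that would be sent; 192.168.1.10 is a made-up address.
print(es_options("logstash-2016.01.12/errors", nodes="192.168.1.10"))
```

A call would then look like write_to_es(df, "logstash-2016.01.12/errors", nodes="192.168.1.10"). If neither setting is given, the connector falls back to localhost and port 9200.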