elasticsearch, elastic-map-reduce, apache-spark, pyspark
I have integrated ELK with PySpark and saved the RDD containing the ELK data to the local file system:
rdd.saveAsTextFile("/tmp/ELKdata")
logData = sc.textFile('/tmp/ELKdata/*')
errors = logData.filter(lambda line: "raw1-VirtualBox" in line)
errors.count()
The count I get is 35.
errors.first()
gives me this output:
(u'AVI0UK0KZsowGuTwoQnN', {u'host': u'raw1-VirtualBox', u'ident': u'NetworkManager', u'pid': u'748', u'message': u"(eth0): device state change: ip-config -> secondaries (reason 'none') [70 90 0]", u'@timestamp': u'2016-01-12T10:59:48+05:30'})
When I try to write this data to Elasticsearch from PySpark, I get an error:
errors.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={"es.resource": "logstash-2016.01.12/errors"})
The call fails with a huge Java error:
org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 INFO TaskSetManager: Starting task 1.0 in stage 31.0 (TID 62, localhost, PROCESS_LOCAL, 1181 bytes)
16/01/12 17:20:13 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 ERROR TaskSetManager: Task 0 in stage 31.0 failed 1 times; aborting job
16/01/12 17:20:13 INFO TaskSchedulerImpl: Cancelling stage 31
16/01/12 17:20:13 INFO TaskSchedulerImpl: Stage 31 was cancelled
16/01/12 17:20:13 INFO Executor: Executor is trying to kill task 1.0 in stage 31.0 (TID 62)
16/01/12 17:20:13 INFO DAGScheduler: Failed to run saveAsNewAPIHadoopFile at PythonRDD.scala:665
Traceback (most recent call last):
File "", line 6, in
File "/opt/spark/python/pyspark/rdd.py", line 1213, in saveAsNewAPIHadoopFile
keyConverter, valueConverter, jconf)
File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError16/01/12 17:20:13 INFO Executor: Running task 1.0 in stage 31.0 (TID 62)
16/01/12 17:20:13 ERROR Executor: Exception in task 1.0 in stage 31.0 (TID 62)
org.apache.spark.TaskKilledException
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 WARN TaskSetManager: Lost task 1.0 in stage 31.0 (TID 62, localhost): org.apache.spark.TaskKilledException:
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
16/01/12 17:20:13 INFO TaskSchedulerImpl: Removed TaskSet 31.0, whose tasks have all completed, from pool
: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 31.0 failed 1 times, most recent failure: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used
org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113)
org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
If I build the (key, value) pairs manually, it is able to write the data:
errors = logData.filter(lambda line: "raw1-VirtualBox" in line)
errors = errors.map(lambda item: ('AVI0UK0KZsowGuTwoQnP',{"host": "raw1-VirtualBox",
"ident": "NetworkManager",
"pid": "69",
"message": " sucess <info> (eth0): device state change: ip-config -> secondaries (reason 'none') [70 90 0]",
"@timestamp": "2016-01-12T10:59:48+05:30"
}))
But I want to write the filtered data to Elasticsearch, not hard-coded data.
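For reference, the "RDD element of type java.lang.String cannot be used" error means that after sc.textFile the RDD holds plain strings (each line is just the text written by saveAsTextFile), while EsOutputFormat expects an RDD of (key, value) pairs, which is exactly what the manual map above provides. A minimal sketch of parsing the filtered lines back into pairs before saving, assuming each saved line is the Python repr of the original (id, document) tuple and can therefore be recovered with ast.literal_eval (parse_line is a hypothetical helper):

import ast

# Each line in /tmp/ELKdata is assumed to be the repr of an (id, document)
# tuple, e.g. (u'AVI0UK0KZsowGuTwoQnN', {u'host': u'raw1-VirtualBox', ...}).
def parse_line(line):
    doc_id, doc = ast.literal_eval(line)  # turn the string back into a real tuple
    return (doc_id, doc)

pairs = logData.filter(lambda line: "raw1-VirtualBox" in line).map(parse_line)

pairs.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={"es.resource": "logstash-2016.01.12/errors"})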
I ran into a similar problem, and this is how I managed to solve it. First, I used a DataFrame instead of an RDD. Once the data is in a DataFrame:
from pyspark.sql import SQLContext
df.write.format("org.elasticsearch.spark.sql").option("es.resource", "logstash-2016.01.12/errors").save()
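This leaves out how df is built from the filtered RDD of strings. A minimal sketch, assuming Spark 1.4+ (for the DataFrame writer API shown above), that each line can be parsed with ast.literal_eval as in the earlier sketch, and that Elasticsearch runs on localhost:9200 (the es.nodes value is an assumption):

import ast
from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)

# Turn each saved line back into a Row; the column names mirror the sample
# document above, and '@timestamp' is kept by passing Row a dict of fields.
def to_row(line):
    doc_id, doc = ast.literal_eval(line)
    return Row(**{'host': doc['host'], 'ident': doc['ident'], 'pid': doc['pid'],
                  'message': doc['message'], '@timestamp': doc['@timestamp']})

df = sqlContext.createDataFrame(errors.map(to_row))

(df.write.format("org.elasticsearch.spark.sql")
   .option("es.resource", "logstash-2016.01.12/errors")
   .option("es.nodes", "localhost:9200")  # assumption: local ES node
   .save())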