Zho*_*ANG 6 python elasticsearch apache-spark pyspark
我正在使用 pyspark 流从 tweepy 收集数据。完成所有设置后,我通过 elasticsearch.index() 将 dict(json) 发送到 elasticsearch。但是我收到“can't pickle_thread.lock objects”错误和其他 63 个错误。回溯日志太长,无法在我的控制台中显示!
设计是我获得一个 json/dict 类型的文件,将其转换为 DStream,通过在 map() 函数中调用 TextBlob 向其添加另一个功能名称“情绪”。一切正常,但是当我添加另一个映射函数来调用 elasticsearch.index() 时,出现错误。
下面是我的控制台中超长错误日志的一部分。
Blockquote 在处理上述异常的过程中,发生了另一个异常:Traceback(最近一次调用最后一次):文件“/Users/ayane/anaconda/lib/python3.6/site-packages/pyspark/streaming/util.py”,第105行, 在转储 func.func, func.rdd_wrap_func, func.deserializers)))) 文件“/Users/ayane/anaconda/lib/python3.6/site-packages/pyspark/serializers.py”,第 460 行,在转储中返回 cloudpickle .dumps(obj, 2) File "/Users/ayane/anaconda/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 704, in dumps cp.dump(obj) File "/Users/ayane /anaconda/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 162, in dump raise pickle.PicklingError(msg) _pickle.PicklingError: 无法序列化对象: TypeError: 不能pickle _thread.lock org.apache.spark 中的对象。stream.api.python.PythonTransformFunctionSerializer$.serialize(PythonDStream.scala:144) 在 org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:101)在 org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply(PythonDStream.scala:100) 在 org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1。 apply(PythonDStream.scala:100) 在 org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303) ... 63 更多apply(PythonDStream.scala:100) 在 org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply(PythonDStream.scala:100) 在 org.apache.spark.util.Utils$.tryOrIOException (Utils.scala:1303) ... 63 更多apply(PythonDStream.scala:100) 在 org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply(PythonDStream.scala:100) 在 org.apache.spark.util.Utils$.tryOrIOException (Utils.scala:1303) ... 63 更多
我的部分代码如下所示:
def sendPut(doc):
res = es.index(index = "tweetrepository", doc_type= 'tweet', body = doc)
return doc
myJson = dataStream.map(decodeJson).map(addSentiment).map(sendPut)
myJson.pprint()
Run Code Online (Sandbox Code Playgroud)
这是 decodeJson 函数:
def decodeJson(str):
return json.loads(str)
Run Code Online (Sandbox Code Playgroud)
这是 addSentiment 函数:
def addSentiment(dic):
dic['Sentiment'] = get_tweet_sentiment(dic['Text'])
return dic
Run Code Online (Sandbox Code Playgroud)
这是 get_tweet_sentiment 函数:
def get_tweet_sentiment(tweet):
analysis = TextBlob(tweet)
if analysis.sentiment.polarity > 0:
return 'positive'
elif analysis.sentiment.polarity == 0:
return 'neutral'
else:
return 'negative'
Run Code Online (Sandbox Code Playgroud)
一般来说,连接对象是不可序列化的,因此不能通过闭包传递。你必须使用foreachPartition模式:
def sendPut(docs):
es = ... # Initialize es object
for doc in docs
es.index(index = "tweetrepository", doc_type= 'tweet', body = doc)
myJson = (dataStream
.map(decodeJson)
.map(addSentiment)
# Here you need an action.
# `map` is lazy, and `pprint` doesn't guarantee complete execution
.foreachPartition(sendPut))
Run Code Online (Sandbox Code Playgroud)
如果你想退货,请使用mapPartitions:
def sendPut(docs):
es = ... # Initialize es object
for doc in docs
yield es.index(index = "tweetrepository", doc_type= 'tweet', body = doc)
myJson = (dataStream
.map(decodeJson)
.map(addSentiment)
.mapPartitions(sendPut))
Run Code Online (Sandbox Code Playgroud)
但您需要额外的操作来强制执行。