Leo*_*nid 6 ipython python-3.x apache-spark pyspark
码:
views = sdf \
.where(sdf['PRODUCT_ID'].isin(PRODUCTS)) \
.rdd \
.groupBy(lambda x: x['SESSION_ID']) \
.toLocalIterator()
for sess_id, rows in views:
# do something
Run Code Online (Sandbox Code Playgroud)
PRODUCTS是一个set.它很大,约10000件.
代码失败了:
--> 9 for sess_id, rows in views:
/usr/local/spark/python/pyspark/rdd.py in _load_from_socket(port, serializer)
--> 142 for item in serializer.load_stream(rf):
/usr/local/spark/python/pyspark/serializers.py in load_stream(self, stream)
--> 139 yield self._read_with_length(stream)
/usr/local/spark/python/pyspark/serializers.py in _read_with_length(self, stream)
--> 156 length = read_int(stream)
/usr/local/spark/python/pyspark/serializers.py in read_int(stream)
--> 543 length = stream.read(4)
/opt/conda/lib/python3.5/socket.py in readinto(self, b)
574 try:
--> 575 return self._sock.recv_into(b)
576 except timeout:
577 self._timeout_occurred = True
timeout: timed out
Run Code Online (Sandbox Code Playgroud)
但是当我做PRODUCTS小一点时,一切都没问题.我试图在Spark配置中更改一些超时值.它没有帮助.如何避免这种崩溃?
UPDATE
PRODUCTS = sdf.sort(['TIMESTAMP']).select('PRODUCT_ID').limit(10000).drop_duplicates()
views = sdf \
.join(PRODUCTS, 'PRODUCT_ID', 'inner') \
.rdd \
.groupBy(lambda x: x['SESSION_ID']) \
.toLocalIterator()
for sess_id, rows in views:
# do ...
Run Code Online (Sandbox Code Playgroud)
现在PRODUCTS是一个数据帧.而我用join.得到了同样的错误..
更新2
尝试这个解决方案:
views = sdf \
.join(PRODUCTS, 'PRODUCT_ID', 'inner') \
.rdd \
.groupBy(lambda x: x['SESSION_ID'])
views.cache()
for sess_id, rows in views.toLocalIterator():
pass
Run Code Online (Sandbox Code Playgroud)
一段时间后出现了很长的错误:
Py4JJavaError: An error occurred while calling o289.javaToPython.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
....
Run Code Online (Sandbox Code Playgroud)
此错误只出现一次!现在我得到了相同的超时异常!
小智 0
正如@eliasah 在他的评论中所说。您应该尝试连接两个 DataFrame 以排除 Products 表中没有的内容。
views = sdf \
.join(PRODUCTS) \
.where(sdf['PRODUCT_ID']) \
.rdd \
.groupBy(lambda x: x['SESSION_ID']) \
.toLocalIterator()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2506 次 |
| 最近记录: |