PySpark Column.isin() with a large set


Code:

views = sdf \
    .where(sdf['PRODUCT_ID'].isin(PRODUCTS)) \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID']) \
    .toLocalIterator()

for sess_id, rows in views:
    # do something

PRODUCTS is a set. It is quite large, about 10,000 items.

The code fails with:

--> 9 for sess_id, rows in views:

/usr/local/spark/python/pyspark/rdd.py in _load_from_socket(port, serializer)
--> 142         for item in serializer.load_stream(rf):

/usr/local/spark/python/pyspark/serializers.py in load_stream(self, stream)
--> 139                 yield self._read_with_length(stream)

/usr/local/spark/python/pyspark/serializers.py in _read_with_length(self, stream)
--> 156         length = read_int(stream)

/usr/local/spark/python/pyspark/serializers.py in read_int(stream)
--> 543     length = stream.read(4)

/opt/conda/lib/python3.5/socket.py in readinto(self, b)
    574             try:
--> 575                 return self._sock.recv_into(b)
    576             except timeout:
    577                 self._timeout_occurred = True

timeout: timed out

But when I make PRODUCTS smaller, everything works fine. I tried changing some timeout values in the Spark configuration, but it didn't help. How can I avoid this crash?
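
For reference, a common workaround for isin() with a collection this large is to turn the set into a one-column DataFrame and join on it, instead of building a 10,000-element IN expression. A minimal sketch, assuming the active SparkSession is named spark (products_df is a made-up name):

# Assumed names: spark is the active SparkSession, PRODUCTS is the original Python set
products_df = spark.createDataFrame([(p,) for p in PRODUCTS], ['PRODUCT_ID'])

views = sdf \
    .join(products_df, 'PRODUCT_ID', 'inner') \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID']) \
    .toLocalIterator()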

UPDATE

PRODUCTS = sdf.sort(['TIMESTAMP']).select('PRODUCT_ID').limit(10000).drop_duplicates()

views = sdf \
    .join(PRODUCTS, 'PRODUCT_ID', 'inner') \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID']) \
    .toLocalIterator()

for sess_id, rows in views:
    # do ...

Now PRODUCTS is a DataFrame and I use a join instead. I still get the same error.

UPDATE 2

Tried this workaround:

views = sdf \
    .join(PRODUCTS, 'PRODUCT_ID', 'inner') \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID'])
views.cache()

for sess_id, rows in views.toLocalIterator():
    pass

After a while this long error appeared:

Py4JJavaError: An error occurred while calling o289.javaToPython.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
....

That error showed up only once! Now I'm getting the same timeout exception again!
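
One way to sidestep the rdd.groupBy / toLocalIterator path altogether is to do the grouping on the DataFrame side and only stream the already-aggregated rows back to the driver. A sketch, assuming a list of product IDs per session is enough (the PRODUCT_IDS column name is made up):

from pyspark.sql import functions as F

# Group on the DataFrame side; only the aggregated rows are shipped to the driver
sessions = sdf \
    .join(PRODUCTS, 'PRODUCT_ID', 'inner') \
    .groupBy('SESSION_ID') \
    .agg(F.collect_list('PRODUCT_ID').alias('PRODUCT_IDS'))

for row in sessions.toLocalIterator():
    sess_id, product_ids = row['SESSION_ID'], row['PRODUCT_IDS']
    # do something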

Answer (小智):

As @eliasah said in his comment, you should join the two DataFrames so that rows whose PRODUCT_ID is not in the PRODUCTS table are excluded:

views = sdf \
    .join(PRODUCTS, 'PRODUCT_ID', 'inner') \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID']) \
    .toLocalIterator()
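
If PRODUCTS stays in the ~10,000-row range, it may also help to mark it for broadcast so the join does not shuffle the large sdf. A sketch using the standard broadcast hint:

from pyspark.sql import functions as F

# Broadcast the small PRODUCTS table to every executor; sdf itself is not shuffled
views = sdf \
    .join(F.broadcast(PRODUCTS), 'PRODUCT_ID', 'inner') \
    .rdd \
    .groupBy(lambda x: x['SESSION_ID']) \
    .toLocalIterator()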