Ars*_*eny 5 python apache-spark pyspark-sql databricks azure-databricks
我正在尝试使用DataFrame.hint()方法向我的联接添加范围联接提示。
我有两个表:minutes和events。
分钟表的minute_start和minute_end列是自固定时间以来以秒为单位的时间。当然,它们的值是 60 的倍数。
事件表有相似的event_start和event_end列,仅用于事件。事件可以在任何一秒开始和结束。
对于每个事件,我需要找到它重叠的所有分钟。
我正在 Databricks(运行时 5.1,Python 3.5)上尝试这个:
# from pyspark.sql.types import StructType, StructField, IntegerType
# minutes = spark.sparkContext\
#                .parallelize(((0,  60),
#                              (60, 120)))\
#                .toDF(StructType([
#                          StructField('minute_start', IntegerType()),
#                          StructField('minute_end', IntegerType())
#                        ]))
# events = spark.sparkContext\
#               .parallelize(((12, 33),
#                             (0,  120),
#                             (33, 72),
#                             (65, 178)))\
#               .toDF(StructType([
#                         StructField('event_start', IntegerType()),
#                         StructField('event_end', IntegerType())
#                       ]))
events.hint("range_join", "60")\
      .join(minutes,
            on=[events.event_start   < minutes.minute_end,
                minutes.minute_start < events.event_end])\
      .orderBy(events.event_start,
               events.event_end,
               minutes.minute_start)\
      .show()
没有hint调用,结果如预期:
+-----------+---------+------------+----------+
|event_start|event_end|minute_start|minute_end|
+-----------+---------+------------+----------+
|          0|      120|           0|        60|
|          0|      120|          60|       120|
|         12|       33|           0|        60|
|         33|       72|           0|        60|
|         33|       72|          60|       120|
|         65|      178|          60|       120|
+-----------+---------+------------+----------+
有了hint,我得到了例外:
AnalysisException: 'Range join hint: invalid arguments Buffer(60);'
当我尝试将60提示中的传递为数字而不是字符串时,它抱怨提示的参数必须是字符串。
我不在 Azure 上,但我希望结果是一样的。
有没有人遇到过类似的问题并找到了解决方案或知道我在哪里犯了错误?
更新 1
(目前,我正在 Databricks Runtime 6.1、Python 3.7.3、Spark 2.4.4 上尝试它)
我以为我错过了参数是可迭代的,所以我再次尝试,使用events.hint("range_join", [60]). 关于参数不是字符串的同样抱怨:TypeError: all parameters should be str, got 60 of type <class 'int'>。
我想知道 Databricks 的 Spark 版本是否落后。
+-----------+---------+------------+----------+
|event_start|event_end|minute_start|minute_end|
+-----------+---------+------------+----------+
|          0|      120|           0|        60|
|          0|      120|          60|       120|
|         12|       33|           0|        60|
|         33|       72|           0|        60|
|         33|       72|          60|       120|
|         65|      178|          60|       120|
+-----------+---------+------------+----------+
所以int应该允许 s的列表。
我得到的是all parameters should be str,但是all parameters should be in (basestring, list, float, int)如果我传递了错误类型的参数,GitHub 版本会说。
更新 2
hint("skew", "col_name") 似乎正在工作。
我在 GitHub 上查看了 Spark 源代码。
2.4.4版本有这个:
def hint(self, name, *parameters):
    ...  # no checks on `parameters` up to here
    for p in parameters:
        if not isinstance(p, str):
            raise TypeError(
                "all parameters should be str, got {0} of type {1}".format(p, type(p)))
    ...  # no checks beyond here
但从版本 3.0.0-preview-rc1开始,源码中有这样的:
def hint(self, name, *parameters):
    ...  # no checks on `parameters` up to here
    allowed_types = (basestring, list, float, int)
    for p in parameters:
        if not isinstance(p, allowed_types):
            raise TypeError(
                "all parameters should be in {0}, got {1} of type {2}".format(
                    allowed_types, p, type(p)))
    ...  # no checks beyond here
所以看起来 2.4.4 版本有一个错误,从 3.0.0-preview-rc1 开始的版本已修复该错误。
| 归档时间: | 
 | 
| 查看次数: | 1490 次 | 
| 最近记录: |