PySpark: How do I call an API/web service without hitting the rate limit?

Ril*_*Hun 5 python google-maps google-api apache-spark pyspark

I have a Spark DataFrame with four columns: location_string, locality, region, and country. I am using the Google Maps Geocoding API to parse each location_string, and then using the results to fill in the locality, region, and country fields wherever they are null.

I have made the function that calls the geocoding library a UDF, but the problem I am facing is that once I exceed the rate limit in Google's API policy, I end up getting an 'OVER_QUERY_LIMIT' response status.

Here is a sample of the Spark DataFrame:

+--------------------------------------------------------------------------------------------------------+------------+------+-------+
|location_string                                                                                         |locality    |region|country|
+--------------------------------------------------------------------------------------------------------+------------+------+-------+
|-Tainan City-Tainan, Taiwan                                                                             |Tainan City |null  |TWN    |
|093 Cicero, IL                                                                                          |null        |null  |null   |
|1005 US 98 Bypass Suite 7 Columbia, MS 39429                                                            |null        |null  |null   |
|10210  Baltimore Avenue, College Park, MD, US 20740                                                     |College Park|MD    |null   |
|12 Braintree - Braintree, MA, 02184                                                                     |null        |null  |null   |
|1215 E.Main St. #1074 Carbondale, IL 62901,                                                             |null        |null  |null   |
|18 Fairview Heights - Fairview Heights, IL, 62208                                                       |null        |null  |null   |
|21000 Hayden Dr, Woodhaven, MI, US 48183                                                                |null        |null  |null   |
|2257 N. Germantown Pkwy in Cordova, TN                                                                  |null        |null  |null   |
|2335 S. Towne Ave., Pomona, CA, US 91766                                                                |Pomona      |CA    |null   |
|2976-Taylor Ave & Harford Rd (Parkville Shopping Center, Parkville, MARYLAND, UNITED STATES             |null        |null  |null   |
|3342 Southwest Military Drive, Texas3342 Southwest Military Drive, San Antonio, TX, 78211, United States|null        |null  |null   |
|444 Cedar St., Suite 201, St. Paul, MN, US 55101                                                        |St. Paul    |MN    |null   |
|4604 Lowe Road, Louisville, KY, US 40220                                                                |Louisville  |KY    |null   |
|4691 Springboro Pike, Moraine, OH, US 45439                                                             |null        |null  |null   |
|50 Hwy 79 Bypass N Ste K Magnolia, AR 71753                                                             |null        |null  |null   |
|5188 Commerce Dr., Baldwin Park, CA, US 91706                                                           |Baldwin Park|CA    |null   |
|55445                                                                                                   |null        |null  |null   |
|5695 Harvey St, Muskegon, MI 49444                                                                      |null        |null  |null   |
|6464 Downing Street, Denver, CO, US 80229                                                               |null        |null  |null   |
+--------------------------------------------------------------------------------------------------------+------------+------+-------+

To work around this, I have a function like the following:

import random
import time

import geocoder
from pyspark.sql import Row

def geocoder_decompose_location(location_string):
    if not location_string:
        return Row('nation', 'state', 'city')(None, None, None)

    # key1, key2, key3 are placeholders for real API keys
    GOOGLE_GEOCODE_API_KEYS = [key1, key2, key3]
    GOOGLE_GEOCODE_API_KEY = random.choice(GOOGLE_GEOCODE_API_KEYS)

    attempts = 0
    success = False
    while not success and attempts < 5:
        result = geocoder.google(location_string, key=GOOGLE_GEOCODE_API_KEY)
        attempts += 1
        if result.status == 'OVER_QUERY_LIMIT':
            time.sleep(2)
            continue  # retry
        success = True

    if not success:
        print('Daily Limit Reached')
        return Row('nation', 'state', 'city')(None, None, None)

    return Row('nation', 'state', 'city')(result.country, result.state, result.city)
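For context, here is a minimal sketch of how this function could be registered as a UDF and applied to the DataFrame above (the struct schema and the coalesce-based backfill of the null columns are assumptions, for illustration):

from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructField, StructType

# Assumed return schema matching Row('nation', 'state', 'city') above
geocode_schema = StructType([
    StructField('nation', StringType(), True),
    StructField('state', StringType(), True),
    StructField('city', StringType(), True),
])

geocode_udf = F.udf(geocoder_decompose_location, geocode_schema)

# Backfill only the null fields, keeping any values already present
df = (df
      .withColumn('geo', geocode_udf(F.col('location_string')))
      .withColumn('country', F.coalesce(F.col('country'), F.col('geo.nation')))
      .withColumn('region', F.coalesce(F.col('region'), F.col('geo.state')))
      .withColumn('locality', F.coalesce(F.col('locality'), F.col('geo.city')))
      .drop('geo'))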

But it does not seem to process the Spark DataFrame as expected. Any guidance would be appreciated!

And*_*ong 2

The simplest way to solve this is to replace the fixed sleep with an exponential backoff. Use...

time.sleep(math.exp(attempts))

This will bring your request rate back below the limit. You can also control Spark's maximum parallelism by adding .coalesce() or .repartition(max_parallelism) before applying the UDF.
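Putting both suggestions together, a minimal sketch (geocoder is the library from the question; geocode_udf and F reuse the names from the UDF sketch above, and the partition count 4 stands in for whatever max_parallelism your quota allows):

import math
import time

import geocoder

def geocode_with_backoff(location_string, api_key, max_attempts=5):
    # Retry the geocode call, sleeping exponentially longer after each
    # OVER_QUERY_LIMIT response (roughly 2.7s, 7.4s, 20s, ...)
    for attempt in range(1, max_attempts + 1):
        result = geocoder.google(location_string, key=api_key)
        if result.status != 'OVER_QUERY_LIMIT':
            return result
        if attempt < max_attempts:
            time.sleep(math.exp(attempt))
    return result  # still over the limit after max_attempts

# Cap how many tasks hit the API concurrently before applying the UDF
df = df.repartition(4).withColumn('geo', geocode_udf(F.col('location_string')))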