use*_*922 7 amazon-emr python-requests apache-spark apache-spark-sql pyspark
我有一个 pyspark 作业,在本地运行时运行没有任何问题,但是当它从 aws 集群运行时,它会在到达以下代码时卡住。该作业仅处理 100 条记录。“some_function”将数据发布到网站并最终返回响应。知道出了什么问题或者我如何调试这个吗?仅供参考:“Some_function”在类之外,我猜问题与 [“closures”][1] 有关,但不确定如何修复它
response = attributes.mapPartitions(lambda iter: [some_fuc1(map(lambda x: x, xs), dict_name, id) for xs in partition_all(50, iter)]).collect()
完整代码如下
def ctgs(entries):
    col1 = entries[0]
    col2 = entries[1]
    col3 = entries[2]
    rec = {
    up_col1 : col1,
    up_col2 : col2,
    up_col3 : col3
    }
    return rec
def some_func1(rec, dict_name, id):
recs{
    rec_list = list(rec)
    seid = id
    }
    headers = "some header"
    attrburl = "www.someurl.com"
    response = requests.post(attrburl, data=json.dumps(rec_list)), headers)
    return response
class Processor:
    def __init(self, sc, arguments):
    self.sc = sc
    self.env = arguments.env
    self.dte = arguments.dte
    self.sendme = arguments.sendme
    def send_them(ext_data, dict_name,id):
        attributes = ext_data.rdd.map(lambda x: ctgs(x['col1'], x['col2'], x[col3]))
        response = attributes.mapPartitions(lambda iter: [some_fuc1(map(lambda x: x, xs), dict_name, id) for xs in partition_all(50, iter)]).collect()
    def extr_data(self):
        ext_data=spark.sql('''select col1, col2, col3 from table_name''')
        send_them(ext_data,dict_name,id)
    def process(self):
        dict_name = { dict_id: '34343-3434-3433-343'}
        id = 'dfdfd-erere-dfd'
        extr_data()
def argument_parsing(args):
    parser.add_argument("--env", required=True)
    parser.add_argument("--dte",  required=True)
    parser.add_argument("--sendme", required=False)
    args = parser.parse_args(args)
    return args
def main(args):
        arguments = argument_parsing(args)
        sc = SparkSession \
            .builder \
            .appName("job_name") \
            .enableHiveSupport() \
            .getOrCreate()
        sc.sparkContext.setLogLevel("ERROR")
        processor = Processor(sc, arguments)
        processor.process()
你是对的,这是关闭/执行者的问题。
当处于集群中时,mapPartitions 内的代码将在执行器上运行。运行“本地”将掩盖这些类型的错误/错误,因为它将所有函数的范围限定为在您的计算机上运行的驱动程序。在“本地”中运行不存在范围问题
处理闭包/执行器时有两种类型的问题。您的作用域变量不可序列化以及执行程序运行的环境。
环境检查应该很容易。如果您只是 ssh 登录并尝试连接,您是否可以从您的执行者之一连接到 URL。(我敢打赌,您正在等待在 DNS 中查找的 URL)。事实上,我建议您首先检查 EMR 集群的安全组并查看允许访问哪些节点。
范围更具挑战性。如果requests在全局范围内启动但不可序列化,则可能会导致问题。(您无法序列化与数据库/网站的飞行连接。)您可以在内部启动它mapPartitions,这将解决问题。问题是这通常会立即失败并且并不真正适合您所描述的问题。除非这导致 python 解释器死亡并错误地报告它正在等待,否则我认为这不是问题。
| 归档时间: | 
 | 
| 查看次数: | 752 次 | 
| 最近记录: |