use*_*922 7 amazon-emr python-requests apache-spark apache-spark-sql pyspark
我有一个 pyspark 作业,在本地运行时运行没有任何问题,但是当它从 aws 集群运行时,它会在到达以下代码时卡住。该作业仅处理 100 条记录。“some_function”将数据发布到网站并最终返回响应。知道出了什么问题或者我如何调试这个吗?仅供参考:“Some_function”在类之外,我猜问题与 [“closures”][1] 有关,但不确定如何修复它
response = attributes.mapPartitions(lambda iter: [some_fuc1(map(lambda x: x, xs), dict_name, id) for xs in partition_all(50, iter)]).collect()
Run Code Online (Sandbox Code Playgroud)
完整代码如下
def ctgs(entries):
col1 = entries[0]
col2 = entries[1]
col3 = entries[2]
rec = {
up_col1 : col1,
up_col2 : col2,
up_col3 : col3
}
return rec
def some_func1(rec, dict_name, id):
recs{
rec_list = list(rec)
seid = id
}
headers = "some header"
attrburl = "www.someurl.com"
response = requests.post(attrburl, data=json.dumps(rec_list)), headers)
return response
class Processor:
def __init(self, sc, arguments):
self.sc = sc
self.env = arguments.env
self.dte = arguments.dte
self.sendme = arguments.sendme
def send_them(ext_data, dict_name,id):
attributes = ext_data.rdd.map(lambda x: ctgs(x['col1'], x['col2'], x[col3]))
response = attributes.mapPartitions(lambda iter: [some_fuc1(map(lambda x: x, xs), dict_name, id) for xs in partition_all(50, iter)]).collect()
def extr_data(self):
ext_data=spark.sql('''select col1, col2, col3 from table_name''')
send_them(ext_data,dict_name,id)
def process(self):
dict_name = { dict_id: '34343-3434-3433-343'}
id = 'dfdfd-erere-dfd'
extr_data()
def argument_parsing(args):
parser.add_argument("--env", required=True)
parser.add_argument("--dte", required=True)
parser.add_argument("--sendme", required=False)
args = parser.parse_args(args)
return args
def main(args):
arguments = argument_parsing(args)
sc = SparkSession \
.builder \
.appName("job_name") \
.enableHiveSupport() \
.getOrCreate()
sc.sparkContext.setLogLevel("ERROR")
processor = Processor(sc, arguments)
processor.process()
Run Code Online (Sandbox Code Playgroud)
你是对的,这是关闭/执行者的问题。
当处于集群中时,mapPartitions 内的代码将在执行器上运行。运行“本地”将掩盖这些类型的错误/错误,因为它将所有函数的范围限定为在您的计算机上运行的驱动程序。在“本地”中运行不存在范围问题
处理闭包/执行器时有两种类型的问题。您的作用域变量不可序列化以及执行程序运行的环境。
环境检查应该很容易。如果您只是 ssh 登录并尝试连接,您是否可以从您的执行者之一连接到 URL。(我敢打赌,您正在等待在 DNS 中查找的 URL)。事实上,我建议您首先检查 EMR 集群的安全组并查看允许访问哪些节点。
范围更具挑战性。如果requests在全局范围内启动但不可序列化,则可能会导致问题。(您无法序列化与数据库/网站的飞行连接。)您可以在内部启动它mapPartitions,这将解决问题。问题是这通常会立即失败并且并不真正适合您所描述的问题。除非这导致 python 解释器死亡并错误地报告它正在等待,否则我认为这不是问题。
| 归档时间: |
|
| 查看次数: |
752 次 |
| 最近记录: |