war*_*m06 2 python apache-spark pyspark
如何更新pyspark.sql.Row对象中的值?
from pyspark.sql import Row
Record = Row('first','last')
start_row = Record('james','smith')
print(f"Sarting Row Object: {start_row}")
updated_row = start_row.first = 'john'
Run Code Online (Sandbox Code Playgroud)
给出一个例外:
Exception Traceback (most recent call last)
<command-4099832519586966> in <module>
4 start_row = Record('james','smith')
5 print(f"Sarting Row Object: {start_row}")
----> 6 updated_row = start_row.first = 'john'
/databricks/spark/python/pyspark/sql/types.py in __setattr__(self, key, value)
1578 def __setattr__(self, key, value):
1579 if key != '__fields__':
-> 1580 raise Exception("Row is read-only")
1581 self.__dict__[key] = value
1582
Exception: Row is read-only
Run Code Online (Sandbox Code Playgroud)
我理解Row是只读的。这是我想出的一个解决方案。
from pyspark.sql import Row
Record = Row('first','last')
start_row = Record('james','smith')
print(f"Sarting Row Object: {start_row}")
def update_spark_row(row,update):
"""pyspar.sql.Row is immutable. Have not found an elegant way to update pyspark.sql.Row objects."""
row_as_dict = row.asDict() # convert to dict
row_as_dict[update[0]] = update[1] # make update in dict
keys = list(row_as_dict.keys()) # get dict keys
values = list(row_as_dict.values()) # get dict values
NewRow = Row(*keys) # create new row object
new_row = NewRow(*values) # populate row object with values
return new_row
end_row = update_spark_row(rec1,('first','jimmy'))
print(f"Ending Row Object: {end_row}")
Run Code Online (Sandbox Code Playgroud)
给出期望的结果:
Sarting Row Object: Row(first='james', last='smith')
Ending Row Object: Row(first='jimmy', last='smith')
Run Code Online (Sandbox Code Playgroud)
这个片段有效,但我觉得应该有一个优雅的解决方案。我不想创建 DataFrame。或者我可以使用namedtupleordataclass但由于我使用的是 PySpark 我想使用Row.
使用案例:
我有几个 Spark 作业需要将数据写入审核日志表。对 Spark DF 或表的并发写入(更新)是不可行的。我的计划是让每个作业跟踪自己的Row对象,然后在所有作业完成后将它们附加到表中作为最后一步。追加表可以是并发的。
写完这篇文章后,我想我可以使用 DataFrame 但我想知道是否有一种方法可以使用Row. namedtuple更改or中的数据dataclass既简单又可读。编辑每个 DF 有点冗长。我想这是程序员固执的一个例子。
你走在正确的轨道上。首先将 Row 转换为 dict,然后重新创建 Row 对象。但更新可以更简单:
一行选项:
updated_row = Row(**{**start_row.asDict(), **{'first': 'john'}})
print(updated_row)
# Row(first='john', last='smith')
Run Code Online (Sandbox Code Playgroud)
选项二:
d = start_row.asDict()
d.update({'first': 'john'})
updated_row = Row(**d)
print(updated_row)
# Row(first='john', last='smith')
Run Code Online (Sandbox Code Playgroud)