python pandas pyspark palantir-foundry foundry-code-repositories
In Palantir Foundry, I am trying to read all the XML files from a dataset and then parse each one in a for loop.
The code runs fine, with no errors, up to the second-to-last line.
from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import sanitize_schema_for_parquet
from bs4 import BeautifulSoup
import pandas as pd
import lxml


@transform(
    output=Output("/Spring/xx/datasets/mydataset2"),
    source_df=Input("ri.foundry.main.dataset.123"),
)
def read_xml(ctx, source_df, output):
    df = pd.DataFrame()
    filesystem = source_df.filesystem()
    hadoop_path = filesystem.hadoop_path
    files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
    for i in files:
        with open(i, 'r') as f:
            file = f.read()
        soup = BeautifulSoup(file, 'xml')
        data = []
        for e in soup.select('offer'):
            data.append({
                'meldezeitraum': e.find_previous('data').get('meldezeitraum'),
                'id': e.get('id'),
                'parent_id': e.get('parent_id'),
            })
        df = df.append(data)
    output.write_dataframe(sanitize_schema_for_parquet(df))
However, as soon as I add the last line:
output.write_dataframe(sanitize_schema_for_parquet(df))
I get this error:
Missing transform attribute
A DataFrame object does not have an attribute select. Please check the spelling and/or the datatype of the object.
/transforms-python/src/myproject/datasets/mydataset.py
output.write_dataframe(sanitize_schema_for_parquet(df))
What am I doing wrong?
小智
You have to convert your pandas DataFrame into a Spark DataFrame. Although they share the same name, they are two different object types in Python.
The simplest way to do this is:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df_spark = spark.createDataFrame(df)
Then you can pass spark_df to the output.write_dataframe() function.
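Putting it together, the end of read_xml would look roughly like this (a minimal sketch that reuses the question's variable names; the conversion happens once, after the loop has filled the pandas DataFrame df):

from pyspark.sql import SparkSession

# build `df` with pandas inside the for loop as before, then convert once at the end
spark = SparkSession.builder.getOrCreate()
spark_df = spark.createDataFrame(df)  # pandas DataFrame -> Spark DataFrame
output.write_dataframe(sanitize_schema_for_parquet(spark_df))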