我正在从数据湖商店读取 CSV 文件,因为我有多个路径,但如果任何一个路径不存在,它就会给出异常。我想避免这种期望。
d = [{'ID': '1', 'pID': 1000, 'startTime':'2018.07.02T03:34:20', 'endTime':'2018.07.03T02:40:20'}, {'ID': '1', 'pID': 1000, 'startTime':'2018.07.02T03:45:20', 'endTime':'2018.07.03T02:50:20'}, {'ID': '2', 'pID': 2000, 'startTime':'2018.07.02T03:34:20', 'endTime':'2018.07.03T02:40:20'}, {'ID': '2', 'pID': 2000, 'startTime':'2018.07.02T03:45:20', 'endTime':'2018.07.03T02:50:20'}]
df = spark.createDataFrame(d)
Dates = namedtuple("Dates", "startTime endTime")
def MergeAdjacentUsage(timeSets):
DatesArray = []
for times in timeSets:
DatesArray.append(Dates(startTime=times.startTime, endTime=times.endTime))
return DatesArray
MergeAdjacentUsages = udf(MergeAdjacentUsage,ArrayType(Dates()))
df1=df.groupBy(['ID','pID']).agg(MergeAdjacentUsages(F.collect_list(struct('startTime','endTime'))).alias("Times"))
display(df1)
Run Code Online (Sandbox Code Playgroud)
我想要的只是将列值设置为 UDF 返回的结构数组。它给我的错误是:
类型错误:new () 恰好需要 3 个参数(给定 1 个)
() 22 return DatesArray 23 ---> 24 MergeAdjacentUsages = udf(MergeAdjacentUsage,ArrayType(Dates())) 25 26 df1=df.groupBy(['ID','pID'] 中的 TypeError Traceback (最近一次调用最后一次) …
所以我创建了一个小的 pyspark 应用程序并将其转换为一个鸡蛋。将其上传到 dbfs:/FileStore/jar/xyz.egg。在 ADF 中,我使用了 jar 活动。但是在主类名称文本框中,我很困惑要提供什么。
我的 Pycharm 应用程序有三个文件,其中两个基本上是实用程序文件,其中包含我调用的实用程序函数,主文件的内容是:
主文件
from CommonUtils import appendZeros
from sampleProgram import writedf
def main():
appendZeros('zzz')
writedf()
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
现在在“主类名”文本框中指定什么?