Kal*_*yan 2 apache-spark apache-spark-sql pyspark udf pyspark-sql
在我的项目中间,我遇到了这个不受支持的操作异常。这是我的场景,我创建了一个名为 filter 的 udf 并将其注册为 fnGetChargeInd。此函数采用 4 个参数,一个 unicode 时间戳,该时间戳已从查询格式化为日期时间类型、字符串频率、字符串 begmonth 和字符串 currperiod。通过它计算chargeAmt并返回一个整数类型的值。这是我的udf函数代码
def filter(startdate, frequency, begmonth, testperiod):
startdatestring = startdate.strftime("%Y-%m-%d")
# print "startdatestring->", startdatestring
startdateyearstring = startdatestring[0:4]
startdatemonthstring = startdatestring[5:7]
# print "startdateyearstring->", startdateyearstring
startdateyearint = int(startdateyearstring)
startdatemonthint = int(startdatemonthstring)
# print "startdateyearint is->", startdateyearint
# print "startdateyearinttype", type(startdateyearint)
currYear = startdateyearint
currMonth = startdatemonthint
currperiod = startdateyearstring + startdatemonthstring
if (frequency == 'M'):
return 1
if (frequency == 'S' or frequency == 'A' and begmonth != None):
currMonth = int(begmonth)
print"in if statement", currMonth
# check nextperiod calculation
if (currperiod == testperiod):
return 1
if (currperiod > testperiod):
return 0
if (frequency == 'Q'):
currMonth = currMonth + 3
if (frequency == 'S'):
currMonth = currMonth + 1
if (currMonth > 12):
currMonth = currMonth - 12
currYear = currYear + 1
return 0
Run Code Online (Sandbox Code Playgroud)
这是我的 TimestampConversion 代码,用于将 unicode 格式化为日期时间
def StringtoTimestamp(datetext):
if(datetext==None):
return None
else:
datevalue = datetime.datetime.strptime(datetext, "%b %d %Y %H:%M:%S:%f%p")
return datevalue
Run Code Online (Sandbox Code Playgroud)
spark.udf.register('TimestampConvert',lambda datetext:StringtoTimestamp(datetext),TimestampType())
spark.udf.register("fnGetChargeInd",lambda x,y,z,timeperiod:filter(x,y,z,timeperiod),IntegerType())
Run Code Online (Sandbox Code Playgroud)
现在在此之后我查询了chargeAmt计算表
spark.sql("select b.ENTITYID as ENTITYID, cm.BLDGID as BldgID,cm.LEASID as LeaseID,coalesce(l.SUITID,(select EmptyDefault from EmptyDefault)) as SuiteID,(select CurrDate from CurrDate) as TxnDate,cm.INCCAT as IncomeCat,'??' as SourceCode,(Select CurrPeriod from CurrPeriod)as Period,coalesce(case when cm.DEPARTMENT ='@' then 'null' else cm.DEPARTMENT end, null) as Dept,'Lease' as ActualProjected ,fnGetChargeInd(TimestampConvert(cm.EFFDATE),cm.FRQUENCY,cm.BEGMONTH,('select CurrPeriod from CurrPeriod'))*coalesce (cm.AMOUNT,0) as ChargeAmt,0 as OpenAmt,cm.CURRCODE as CurrencyCode,case when ('PERIOD.DATACLSD') is null then 'Open' else 'Closed' end as GLClosedStatus,'Unposted'as GLPostedStatus ,'Unpaid' as PaidStatus,cm.FRQUENCY as Frequency,0 as RetroPD from CMRECC cm join BLDG b on cm.BLDGID =b.BLDGID join LEAS l on cm.BLDGID =l.BLDGID and cm.LEASID =l.LEASID and (l.VACATE is null or l.VACATE >= ('select CurrDate from CurrDate')) and (l.EXPIR >= ('select CurrDate from CurrDate') or l.EXPIR < ('select RunDate from RunDate')) left outer join PERIOD on b.ENTITYID = PERIOD.ENTITYID and ('select CurrPeriod from CurrPeriod')=PERIOD.PERIOD where ('select CurrDate from CurrDate')>=cm.EFFDATE and (select CurrDate from CurrDate) <= coalesce(cm.EFFDATE,cast(date_add(( select min(cm2.EFFDATE) from CMRECC cm2 where cm2.BLDGID = cm.BLDGID and cm2.LEASID = cm.LEASID and cm2.INCCAT = cm.INCCAT and 'cm2.EFFDATE' > 'cm.EFFDATE'),-1) as timestamp) ,case when l.EXPIR <(select RunDate from RunDate)then (Select RunDate from RunDate) else l.EXPIR end)").show()
Run Code Online (Sandbox Code Playgroud)
我现在将此结果保存在 Fact_Temp 临时表中 问题出现 我想查询一个过滤表,在删除 ActualProjected=Lease 和 ChargeAmt=0 的行后我将获取数据
spark.sql("select * from Fact_Temp except(select * from Fact_Temp where ActualProjected='Lease' and ChargeAmt='0')").show()
Run Code Online (Sandbox Code Playgroud)
它给了我例外
java.lang.UnsupportedOperationException:无法计算表达式:fnGetChargeInd(TimestampConvert(input[0, string, true]), input[1, string, true], input[2, string, true], select CurrPeriod from CurrPeriod)
我发现如果我在没有这种条件的情况下进行查询,chargeAmt 不会产生任何价值原因,它运行良好
spark.sql("select * from Fact_Temp except(select * from Fact_Temp where ActualProjected='Lease')").show()
Run Code Online (Sandbox Code Playgroud)
这给了我预期的 EmptyTable.Logically 我认为计算后在表中设置了 ChargeAMt 值,并且我已经注册了该表,因此值被保存。所以当我在保存的表上查询时。我不知道为什么函数在这里调用。我已经在 stackoverflow 中看到了这篇文章 UnsupportedOperationException: Cannot evalute expression: .. 当添加新列 withColumn() 和 udf()以供理解但虽然我的情况在这里不同。我试过数据框打印模式我只看到了这个临时表的模式
我怎样才能解决这个问题,任何指导都受到高度赞赏。我在这里的代码中遗漏了什么。请帮助我。我正在使用 Pyspark 2.0 提前感谢 Kalyan
好的,到目前为止我已经发现这是 spark 2.0 错误。以下链接解决了我的问题 https://issues.apache.org/jira/browse/SPARK-17100
我已经从 2.0 转移到 2.1.0,它对我有用。
| 归档时间: |
|
| 查看次数: |
6652 次 |
| 最近记录: |