我是scala新手,我正在尝试执行以下代码:
val SetID = udf{(c:String, d: String) =>
if( c.UpperCase.contains("EXKLUS") == true)
{d}
else {""}
}
val ParquetWithID = STG1
.withColumn("ID", SetID( col("line_item"), col("line_item_ID")))
Run Code Online (Sandbox Code Playgroud)
两列 (line_item和) 均按架构中的方式line_item_id定义。StringsSTG1
当我尝试运行代码时出现以下错误:
`org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1$$anonfun$2: (string, string) => string)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Run Code Online (Sandbox Code Playgroud)
和 …
我在更新 Amazon Athena Tables 的 EMR 集群上运行自动 Python 作业。
直到几天前它运行良好(在 python 2.7 和 3.7 上)。这是脚本:
from pyathenajdbc import connect
import yaml
config = yaml.load(open('athena-config.yaml', 'r'))
statements = config['statements']
staging_dir = config['staging_dir']
conn = connect(s3_staging_dir=staging_dir, region_name='eu-west-1')
try:
with conn.cursor() as cursor:
for statement in statements:
cursor.execute(statement)
finally:
conn.close()
Run Code Online (Sandbox Code Playgroud)
athena-config.yaml 有一个暂存目录和一些 Athena 语句。
这是错误:
You are using pip version 9.0.3, however version 19.1.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
Unrecognized option: -server
create_tables.py:5: YAMLLoadWarning: calling …Run Code Online (Sandbox Code Playgroud)