Meg*_*gan (apache-spark, pyspark)
I am writing a Spark dataframe to a BigQuery table. This was working fine, but I now apply a pandas UDF before writing the data to BigQuery. For some reason, when I call the pandas UDF before writing the Spark dataframe to BigQuery, I see the following error:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 287, in dump_stream
batch = _create_batch(series, self._timezone)
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 256, in _create_batch
arrs = [create_array(s, t) for s, t in series]
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 256, in <listcomp>
arrs = [create_array(s, t) for s, t in series]
File "/mnt1/yarn/usercache/hadoop/appcache/application_1579619644892_0001/container_1579619644892_0001_01_000002/pyspark.zip/pyspark/serializers.py", line 240, in create_array
return pa.Array.from_pandas(s, mask=mask).cast(t, safe=False)
File "pyarrow/array.pxi", line 474, in pyarrow.lib.Array.from_pandas
File "pyarrow/array.pxi", line 169, in pyarrow.lib.array
File "pyarrow/array.pxi", line 69, in pyarrow.lib._ndarray_to_array
File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: an integer is required (got type Timestamp)
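The bottom frames of this traceback are Spark's per-column pandas-to-Arrow conversion (pa.Array.from_pandas(...).cast(t, safe=False)). Below is a minimal sketch of that conversion using hypothetical data, not values from this post, with a correctly typed datetime64[ns] column for comparison:

import pandas as pd
import pyarrow as pa

# Hypothetical example of the per-column conversion the worker performs.
# With a datetime64[ns] series the cast succeeds; the ArrowTypeError above
# indicates the UDF's output column arrived with a different dtype.
s = pd.Series(pd.to_datetime(['2020-01-20 22:45:38']))
arr = pa.Array.from_pandas(s).cast(pa.timestamp('us'), safe=False)
print(arr.type)  # timestamp[us]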
Judging from the executor logs below, this appears to be caused by an incorrect Parquet schema in which the timestamp columns are inferred as integers?
20/01/20 22:45:38 INFO ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:
{
"type" : "struct",
"fields" : [ {
"name" : "id",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "firstname",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "status",
"type" : "string",
"nullable" : true,
"metadata" : { }
}, {
"name" : "entry_date",
"type" : "timestamp",
"nullable" : true,
"metadata" : { }
}, {
"name" : "last_status_date",
"type" : "timestamp",
"nullable" : true,
"metadata" : { }
} ]
}
and corresponding Parquet message type:
message spark_schema {
optional binary id (UTF8);
optional binary firstname (UTF8);
optional binary status (UTF8);
optional int96 entry_date;
optional int96 last_status_date;
}
This is confusing because it does not happen when I run the code without applying the pandas_udf. The UDF does not manipulate the date columns in any way...
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, TimestampType


def main():
    # df is the input Spark dataframe (created upstream; omitted in the post)
    # apply pandas udf; reassign the result, since groupBy().apply() returns a new dataframe
    df = df.groupBy('firstname').apply(my_pandas_udf)

    # drop some columns
    cols_to_drop = ['firstname']

    # save to bigquery
    df \
        .drop(*cols_to_drop) \
        .write \
        .format("bigquery") \
        .option("temporaryGcsBucket", "<TEMP_BUCKET_NAME>") \
        .option("project", "PROJECT_ID") \
        .option("credentialsFile", "/path/to/my/credentials.json") \
        .option("parentProject", "PROJECT_ID") \
        .option("table", "PROJECT_ID:dataset.table") \
        .mode("overwrite") \
        .save()


def udf_schema():
    return StructType([
        StructField('id', StringType(), True),
        StructField('firstname', StringType(), True),
        StructField('status', StringType(), True),
        StructField('entry_date', TimestampType(), True),
        StructField('last_status_date', TimestampType(), True),
    ])


@pandas_udf(udf_schema(), PandasUDFType.GROUPED_MAP)
def my_pandas_udf(df):
    # keep only rows with entry_date at or after the first date of the descending sort
    df = df.sort_values('entry_date', ascending=False)
    oldest_date = df['entry_date'].iloc[0]
    df = df[df['entry_date'] >= oldest_date]
    df = df.copy()
    return df
What am I doing wrong? This stackoverflow post seems to have a similar issue, but it has not been answered as of 1/21/2020.
EDIT (1): Dataframe dtypes before and after the pandas_udf. The error occurs on return from the pandas_udf, but here are the dtypes of the spark dataframe before it is passed to the pandas_udf:
==> BEFORE
id string
firstname string
status string
entry_date timestamp
date_status_change timestamp
last_status_date timestamp
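Since the error occurs on return from the pandas_udf, the "AFTER" dtypes are hard to capture from the driver. A minimal pandas-only sketch (hypothetical data, not from the post) shows how the dtypes could be checked inside the UDF before returning, and what an object-coerced timestamp column looks like:

import pandas as pd

# Stand-in for the grouped pandas DataFrame the UDF receives or returns.
pdf = pd.DataFrame({
    'id': ['1'],
    'entry_date': pd.Series([pd.Timestamp('2020-01-20 22:45:38')], dtype=object),
})
print(pdf.dtypes)  # entry_date -> object
pdf['entry_date'] = pd.to_datetime(pdf['entry_date'])
print(pdf.dtypes)  # entry_date -> datetime64[ns]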
Answer (小智)
I ran into a similar issue recently, and I believe the error happens because pandas coerces every column to object dtype when reading the data from Spark. The way I solved it was to explicitly convert the timestamp columns after the pandas dataframe is created. So in your case, something like:
import pandas as pd

@pandas_udf(udf_schema(), PandasUDFType.GROUPED_MAP)
def my_pandas_udf(df):
    # explicitly convert the timestamp columns back to datetime64[ns]
    df['entry_date'] = pd.to_datetime(df['entry_date'])
    df['last_status_date'] = pd.to_datetime(df['last_status_date'])
    df = df.sort_values('entry_date', ascending=False)
    oldest_date = df['entry_date'].iloc[0]
    df = df[df['entry_date'] >= oldest_date]
    df = df.copy()
    return df
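As a follow-up, here is a hedged generalization of the same idea (my own sketch, not part of the original answer): walk the schema declared on the UDF and convert every TimestampType column before returning, so no timestamp column is left as object dtype. The helper name cast_timestamp_columns is made up for illustration.

import pandas as pd
from pyspark.sql.types import TimestampType

def cast_timestamp_columns(pdf, schema):
    # pdf: the pandas DataFrame about to be returned from a GROUPED_MAP UDF
    # schema: the StructType passed to the @pandas_udf decorator
    for field in schema.fields:
        if isinstance(field.dataType, TimestampType):
            pdf[field.name] = pd.to_datetime(pdf[field.name])
    return pdf

Calling return cast_timestamp_columns(df, udf_schema()) at the end of my_pandas_udf would cover both entry_date and last_status_date without naming them individually.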