Jie*_*eng 5 amazon-web-services parquet pyspark aws-glue
如何验证我的书签是否有效?我发现当我在上一个完成后立即运行一个作业时,它似乎仍然需要很长时间。这是为什么?我以为它不会读取它已经处理过的文件?脚本如下所示:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://xxx-glue/testing-csv"], "recurse": True}, format = "csv", format_options = {"withHeader": True}, transformation_ctx="inputGDF")
if bool(inputGDF.toDF().head(1)):
print("Writing ...")
inputGDF.toDF() \
.drop("createdat") \
.drop("updatedat") \
.write \
.mode("append") \
.partitionBy(["querydestinationplace", "querydatetime"]) \
.parquet("s3://xxx-glue/testing-parquet")
else:
print("Nothing to write ...")
job.commit()
import boto3
glue_client = boto3.client('glue', region_name='ap-southeast-1')
glue_client.start_crawler(Name='xxx-testing-partitioned')
Run Code Online (Sandbox Code Playgroud)
看起来像:
18/12/11 14:49:03 INFO Client: Application report for application_1544537674695_0001 (state: RUNNING)
18/12/11 14:49:03 DEBUG Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 172.31.2.72
ApplicationMaster RPC port: 0
queue: default
start time: 1544539297014
final status: UNDEFINED
tracking URL: http://ip-172-31-0-204.ap-southeast-1.compute.internal:20888/proxy/application_1544537674695_0001/
user: root
18/12/11 14:49:04 INFO Client: Application report for application_1544537674695_0001 (state: RUNNING)
18/12/11 14:49:04 DEBUG Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 172.31.2.72
ApplicationMaster RPC port: 0
queue: default
start time: 1544539297014
final status: UNDEFINED
tracking URL: http://ip-172-31-0-204.ap-southeast-1.compute.internal:20888/proxy/application_1544537674695_0001/
user: root
18/12/11 14:49:05 INFO Client: Application report for application_1544537674695_0001 (state: RUNNING)
18/12/11 14:49:05 DEBUG Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 172.31.2.72
ApplicationMaster RPC port: 0
queue: default
start time: 1544539297014
final status: UNDEFINED
tracking URL: http://ip-172-31-0-204.ap-southeast-1.compute.internal:20888/proxy/application_1544537674695_0001/
user: root
...
18/12/11 14:42:00 INFO NewHadoopRDD: Input split: s3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-15_2018-11-19.csv:0+1194081
18/12/11 14:42:00 INFO S3NativeFileSystem: Opening 's3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-14_2018-11-18.csv' for reading
18/12/11 14:42:00 INFO S3NativeFileSystem: Opening 's3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-15_2018-11-19.csv' for reading
18/12/11 14:42:00 INFO Executor: Finished task 89.0 in stage 0.0 (TID 89). 2088 bytes result sent to driver
18/12/11 14:42:00 INFO CoarseGrainedExecutorBackend: Got assigned task 92
18/12/11 14:42:00 INFO Executor: Running task 92.0 in stage 0.0 (TID 92)
18/12/11 14:42:00 INFO NewHadoopRDD: Input split: s3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-16_2018-11-20.csv:0+1137753
18/12/11 14:42:00 INFO Executor: Finished task 88.0 in stage 0.0 (TID 88). 2088 bytes result sent to driver
18/12/11 14:42:00 INFO CoarseGrainedExecutorBackend: Got assigned task 93
18/12/11 14:42:00 INFO Executor: Running task 93.0 in stage 0.0 (TID 93)
18/12/11 14:42:00 INFO NewHadoopRDD: Input split: s3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-17_2018-11-21.csv:0+1346626
18/12/11 14:42:00 INFO S3NativeFileSystem: Opening 's3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-16_2018-11-20.csv' for reading
18/12/11 14:42:00 INFO S3NativeFileSystem: Opening 's3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-17_2018-11-21.csv' for reading
18/12/11 14:42:00 INFO Executor: Finished task 90.0 in stage 0.0 (TID 90). 2088 bytes result sent to driver
18/12/11 14:42:00 INFO Executor: Finished task 91.0 in stage 0.0 (TID 91). 2088 bytes result sent to driver
18/12/11 14:42:00 INFO CoarseGrainedExecutorBackend: Got assigned task 94
18/12/11 14:42:00 INFO CoarseGrainedExecutorBackend: Got assigned task 95
18/12/11 14:42:00 INFO Executor: Running task 95.0 in stage 0.0 (TID 95)
18/12/11 14:42:00 INFO Executor: Running task 94.0 in stage 0.0 (TID 94)
Run Code Online (Sandbox Code Playgroud)
...我注意到镶木地板附加了大量重复数据...书签不起作用吗?它已经启用
Dav*_*vos 12
从文档
必须使用--job-bookmark-option job-bookmark-enable(或者如果使用控制台然后在控制台选项中创建作业)。作业还必须有作业名称;这将自动传入。
作业必须以Job.init(jobname)
eg开头
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
Run Code Online (Sandbox Code Playgroud)
作业必须有一个Job.commit()来保存书签的状态并成功完成。
数据源必须是 s3 源或 JDBC(有限,而不是您的用例,所以我将忽略它)。
文档中的示例显示使用表名而不是显式 S3 路径从(胶水/湖地层)目录创建动态框架。这意味着从目录中读取仍被视为 S3 源;底层文件将在 S3 上。
s3 上的文件必须是 JSON、CSV、Apache Avro、0.9 及以上版本的 XML 之一,或者可以是 Parquet 或 ORC 版本 1.0 及以上
脚本中的数据源必须有一个transformation_ctx参数。
文档说
仅将transformation_ctx参数传递给您想要启用书签的那些方法您可以将此添加到每个转换以保存状态,但关键的是您想要添加书签的数据源。
从文档
job.comit()并使用了transformation_ctx如上对于 Amazon S3 输入源,作业书签会检查对象的上次修改 时间,而不是文件名,以验证哪些对象需要重新处理。如果您的输入源数据自上次运行作业以来已被修改,则在您再次运行作业时会重新处理这些文件。
您是否已确认路径中的 CSV 文件"s3://xxx-glue/testing-csv"不包含重复项?您可以使用 Glue 爬虫或在 Athena 中编写 DDL 在它们上面创建一个表并直接查看。或者,创建一个开发端点并运行 zeppelin 或 sagemaker notebook 并逐步执行您的代码。
它没有提到编辑您的脚本会重置您的状态的任何地方,但是,如果您修改transformation_ctx了数据源或其他阶段的状态,那么这可能会影响状态,但是我还没有验证这一点。该作业具有Jobname状态键,以及用于管理重试和最新状态的运行号、尝试号和版本号,这意味着只要Jobname是一致,但我还没有验证过。
顺便说一句,在您的代码中,您测试inputGDF.toDF().head(1)然后运行inputGDF.toDF()...以写入数据。Spark 被懒惰地评估,但在这种情况下,您将两次运行与数据帧等效的动态帧,并且 Spark 无法缓存或重用它。最好在df = inputGDF.toDF()之前做一些事情if,然后重复使用df两次。
| 归档时间: |
|
| 查看次数: |
6849 次 |
| 最近记录: |