AWS Glue 文档明确指出爬虫从源(JDBS 或 s3)抓取元数据信息并填充数据目录(创建/更新数据库和相应的表)。
但是,如果我们知道没有方案/分区更改,我们是否需要定期运行爬虫来检测源中的新数据(即 s3 上的新对象,db 表中的新行)并不清楚。
那么,是否需要在运行 ETL 作业之前运行爬虫才能获取新数据?
我在Spark 2.2.0中面临运行带有聚合和分区的结构化流的内存问题:
session
.readStream()
.schema(inputSchema)
.option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
.option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
.csv("s3://test-bucket/input")
.as(Encoders.bean(TestRecord.class))
.flatMap(mf, Encoders.bean(TestRecord.class))
.dropDuplicates("testId", "testName")
.withColumn("year", functions.date_format(dataset.col("testTimestamp").cast(DataTypes.DateType), "YYYY"))
.writeStream()
.option("path", "s3://test-bucket/output")
.option("checkpointLocation", "s3://test-bucket/checkpoint")
.trigger(Trigger.ProcessingTime(60, TimeUnit.SECONDS))
.partitionBy("year")
.format("parquet")
.outputMode(OutputMode.Append())
.queryName("test-stream")
.start();
Run Code Online (Sandbox Code Playgroud)
在测试期间,我注意到每次新数据到来时最终使用的内存量都会增加,最后执行程序退出代码137:
ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container marked as failed: container_1520214726510_0001_01_000003 on host: ip-10-0-1-153.us-west-2.compute.internal. Exit status: 137. Diagnostics: Container killed on request. Exit code is 137
Container exited with a non-zero exit code 137
Killed by external signal
Run Code Online (Sandbox Code Playgroud)
我已经创建了一个堆转储,并发现它使用的大部分内存org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider都是从StateStore引用的 …
在触发 Glue 作业时,我们需要将 4 个参数从 AWS Lambda 传递到 AWS Glue 作业。
response = client.start_job_run(JobName = 'my_test_Job',
Arguments = {
'--yr_partition_val': 2017,
'--mon_partition_val': 05,
'--date_partition_val': 25,
'--hour_partition_val': 07 } )
Run Code Online (Sandbox Code Playgroud)
Glue 需要捕获这 4 个参数才能在 pysparkglue 代码中进一步进行。
我尝试在胶水中使用以下内容来捕获参数:
import sys
from awsglue.utils import getResolvedOptions
args = getResolvedOptions(sys.argv,
['JOB_NAME',
'yr_partition_val',
'mon_partition_val',
'date_partition_val',
'hour_partition_val'])
Run Code Online (Sandbox Code Playgroud)
但得到的错误为:
self.error(_('argument %s is required') % name)
awsglue.utils.GlueArgumentError: argument --JobName is required
Run Code Online (Sandbox Code Playgroud)
有人可以帮忙吗?
我有一个 S3 位置s3://bucket-name/folder-name/,其中包含一个子文件夹,其名称是动态生成的,其中包含短语_Top10InvoiceIds。.csv该子文件夹由和文件组成.csv.metadata。我正在使用胶水爬行器仅爬行 csv 文件并在 Athena 中查看它们。但我无法排除这些.csv.metadata文件。我已经尝试了所有可能的正则表达式模式作为 glob 值。
我的一些尝试是:
*_Top10InvoiceIds/ *.metadata*_Top10InvoiceIds/ * .* metadata *_Top10InvoiceIds/ *. *.metadata *_Top10InvoiceIds/ * .csv.metadata ** .metadata * .metadata * .csv.metadata * /*.metadata如果有人可以帮助我找出该模式或建议另一种方法来完成相同的任务,那就太好了。
有一种方法可以通过将属性设置spark.streaming.stopGracefullyOnShutdown为 true 来正常关闭 Spark Streaming,然后使用命令终止进程kill -SIGTERM。但是我没有看到这样的选项可用于结构化流(SQLContext.scala)。
结构化流的关闭过程是否不同?或者只是还没有实施?
apache-spark spark-streaming apache-spark-sql spark-structured-streaming
有没有办法在 StepFunction 中创建选择规则来确定数组字段是否有任何元素?
当前的 StepFunction 文档没有列出任何特定于集合的比较运算符,所以我想知道是否可以在不实现单独的 lambda 来测试数组是否为空的情况下实现这一点?
我注意到Go断言不起作用,因为我期望零整数。这是代码:
var i interface{}
i = 0
i32, ok := i.(int32)
fmt.Println(ok)
fmt.Println(i32)
Run Code Online (Sandbox Code Playgroud)
输出如下:
false
0
Run Code Online (Sandbox Code Playgroud)
我找不到这种行为的解释。是虫子吗?