I am using Spark Structured Streaming to consume events from Kafka and upload them to S3.
Checkpoints are committed on S3:
// writeStream() returns a DataStreamWriter, not a DataFrameWriter
DataStreamWriter<Row> writer = input.writeStream()
    .format("orc")
    .trigger(Trigger.ProcessingTime(config.getProcessingTime()))
    .outputMode(OutputMode.Append())
    .option("truncate", false)
    .option("checkpointLocation", "s3://bucket1")
    .option("compression", "zlib")
    .option("path", "s3://bucket2");
Offsets are committed to Kafka from a StreamingQueryListener with:
kafkaConsumer.commitSync(topicPartitionMap);
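For context, here is a minimal sketch of what such a listener can look like, assuming a dedicated KafkaConsumer used only for committing and a single Kafka source; the class name OffsetCommittingListener and the Jackson-based parsing of the progress JSON are assumptions, not the original code:

import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.sql.streaming.SourceProgress;
import org.apache.spark.sql.streaming.StreamingQueryListener;

public class OffsetCommittingListener extends StreamingQueryListener {

    private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
    private final ObjectMapper mapper = new ObjectMapper();

    public OffsetCommittingListener(KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        this.kafkaConsumer = kafkaConsumer;
    }

    @Override
    public void onQueryStarted(QueryStartedEvent event) { }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) { }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        // endOffset() of the Kafka source is a JSON string such as {"topic1":{"0":42,"1":17}}
        Map<TopicPartition, OffsetAndMetadata> topicPartitionMap = new HashMap<>();
        for (SourceProgress source : event.progress().sources()) {
            try {
                JsonNode topics = mapper.readTree(source.endOffset());
                topics.fields().forEachRemaining(topicEntry ->
                    topicEntry.getValue().fields().forEachRemaining(partitionEntry ->
                        topicPartitionMap.put(
                            new TopicPartition(topicEntry.getKey(),
                                               Integer.parseInt(partitionEntry.getKey())),
                            new OffsetAndMetadata(partitionEntry.getValue().asLong()))));
            } catch (Exception e) {
                // ignore sources whose offsets are not in the Kafka JSON format
            }
        }
        kafkaConsumer.commitSync(topicPartitionMap);
    }
}

A listener like this would be registered via sparkSession.streams().addListener(...) before the query is started.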
When the application starts, it retrieves the offset map from Kafka and starts the stream:
Dataset<Row> reader = sparkSession
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", config.getKafkaBootStrapServers())
    .option("subscribe", "topic1")
    // consumer properties need the "kafka." prefix to reach the underlying consumer
    .option("kafka.max.poll.records", 1000)
    .option("failOnDataLoss", false)
    // startingOffsets must be a JSON string (see the helper sketched below)
    .option("startingOffsets", topicPartitionMap)
    .load();
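startingOffsets has to be a JSON string of the form {"topic1":{"0":42,"1":17}}, not a Java map. A minimal sketch of building that string from the offsets fetched from Kafka; the helper name toStartingOffsetsJson is an assumption:

import java.util.Map;
import java.util.StringJoiner;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class StartingOffsets {

    // Builds the JSON the Kafka source expects, e.g. {"topic1":{"0":42,"1":17}}.
    // Assumes all entries in the map belong to the given topic.
    public static String toStartingOffsetsJson(String topic,
                                               Map<TopicPartition, OffsetAndMetadata> offsets) {
        StringJoiner partitions = new StringJoiner(",", "{", "}");
        offsets.forEach((tp, om) ->
            partitions.add("\"" + tp.partition() + "\":" + om.offset()));
        return "{\"" + topic + "\":" + partitions + "}";
    }
}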
I store the topic/partition/offset data in the ORC files.
The data contains multiple duplicates of events with exactly the same topic/partition/offset.
How should the stream be configured to achieve exactly-once processing?
Tags: apache-kafka, apache-spark, spark-streaming, spark-structured-streaming, spark-streaming-kafka
I am using Spark to write files to S3 in ORC format, and I query this data with Athena.
I am using the following partition keys:
s3://bucket/company=1123/date=20190207
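For reference, a minimal sketch of how a layout like that is typically produced with partitionBy (the source path and column names are assumptions); note that the partition values are carried only as text inside the object key names:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PartitionedOrcWriter {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("partitioned-orc").getOrCreate();

        // Source location and schema are assumptions; only the partition layout matters here.
        Dataset<Row> df = spark.read().parquet("s3a://source-bucket/events/");

        // Produces keys such as s3://bucket/company=1123/date=20190207/part-....orc,
        // where company and date values appear only as text in the key names.
        df.write()
          .partitionBy("company", "date")
          .orc("s3a://bucket/");
    }
}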
Once I run a Glue crawler on the bucket, everything works as expected except for the types of the partition keys.
The crawler registers them in the catalog as type string instead of int.
Is there a configuration to define a default type for partition keys?
I know I can change them manually later and set the crawler configuration to Add new columns only.
I am writing data to S3 with Spark using S3A URIs.
I am also using the s3-external-1.amazonaws.com endpoint to avoid the read-after-write eventual-consistency issues in us-east-1.
The following problem occurs when trying to write some data to S3 (it is actually a move operation):
com.amazonaws.services.s3.model.MultiObjectDeleteException: Status Code: 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS Error Message: One or more objects could not be deleted, S3 Extended Request ID: null
at com.amazonaws.services.s3.AmazonS3Client.deleteObjects(AmazonS3Client.java:1745)
at org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:687)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.cleanupJob(FileOutputCommitter.java:381)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:314)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:151)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at …
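For reference, a minimal sketch of setting the endpoint and the S3A batched-delete switch from a Spark job; the application name is an assumption, and disabling fs.s3a.multiobjectdelete.enable is only a debugging aid to surface the per-object error hidden behind the aggregated MultiObjectDeleteException, not a definitive fix:

import org.apache.spark.sql.SparkSession;

public class S3AEndpointConfig {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("s3a-endpoint").getOrCreate();

        // Route S3A through the read-after-write consistent endpoint mentioned above
        spark.sparkContext().hadoopConfiguration()
             .set("fs.s3a.endpoint", "s3-external-1.amazonaws.com");

        // Fall back to one-by-one deletes so a failing key reports its own error
        spark.sparkContext().hadoopConfiguration()
             .set("fs.s3a.multiobjectdelete.enable", "false");
    }
}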