Posts by Ale*_*sky

Spark Structured Streaming exactly-once - not achieved - duplicate events

I'm using Spark Structured Streaming to consume events from Kafka and upload them to S3.

Checkpoints are committed to S3:

DataStreamWriter<Row> writer = input.writeStream()
           .format("orc")
           .trigger(Trigger.ProcessingTime(config.getProcessingTime()))
           .outputMode(OutputMode.Append())
           .option("truncate", false)           
           .option("checkpointLocation", "s3://bucket1")
           .option("compression", "zlib")
           .option("path", "s3://bucket2");

Offsets are committed to Kafka from a StreamingQueryListener as follows:

  kafkaConsumer.commitSync(topicPartitionMap);
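For context, a minimal sketch of how such a listener might look, assuming a pre-built `kafkaConsumer` and a hypothetical `toTopicPartitionMap` helper that parses the source's end offsets out of the progress event:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.spark.sql.streaming.StreamingQueryListener;

    // Registered once on the session; commits the last processed offsets
    // after every micro-batch.
    sparkSession.streams().addListener(new StreamingQueryListener() {
        @Override
        public void onQueryStarted(QueryStartedEvent event) {
            // no-op
        }

        @Override
        public void onQueryProgress(QueryProgressEvent event) {
            // endOffset() is a JSON string such as {"topic1":{"0":1234}};
            // toTopicPartitionMap (hypothetical) parses it into consumer offsets.
            Map<TopicPartition, OffsetAndMetadata> topicPartitionMap =
                    toTopicPartitionMap(event.progress().sources()[0].endOffset());
            kafkaConsumer.commitSync(topicPartitionMap);
        }

        @Override
        public void onQueryTerminated(QueryTerminatedEvent event) {
            // no-op
        }
    });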

After the application starts, it retrieves the offset map from Kafka and starts the stream:

 reader = sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", config.getKafkaBootStrapServers())
            .option("subscribe", "topic1")
            .option("max.poll.records", 1000)
            .option("failOnDataLoss", false)
            .option("startingOffsets", topicPartitionMap)
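Worth noting: the Kafka source documents `startingOffsets` as a JSON string of the form `{"topic1":{"0":23,"1":-2}}`, so a plain offset map has to be rendered into that shape first. A minimal sketch of a hypothetical helper doing that with Jackson (already on Spark's classpath):

    import java.util.HashMap;
    import java.util.Map;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical helper: turns a Map<TopicPartition, Long> into the JSON the
    // Kafka source expects for startingOffsets, e.g. {"topic1":{"0":23,"1":-2}}.
    static String toStartingOffsetsJson(Map<TopicPartition, Long> offsets) throws Exception {
        Map<String, Map<Integer, Long>> byTopic = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> e : offsets.entrySet()) {
            byTopic.computeIfAbsent(e.getKey().topic(), t -> new HashMap<>())
                   .put(e.getKey().partition(), e.getValue());
        }
        return new ObjectMapper().writeValueAsString(byTopic);
    }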

I store the topic/partition/offset data in the ORC files.

The data contains multiple duplicates of events with the exact same topic/partition/offset.
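For context, a minimal sketch of where those columns come from: the Kafka source exposes `topic`, `partition` and `offset` as regular columns next to the payload (the `reader` variable is the one defined above):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Carry the Kafka record metadata through to the ORC sink next to the payload.
    Dataset<Row> input = reader.load()
            .selectExpr("CAST(value AS STRING) AS value", "topic", "partition", "offset");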

How should the stream be configured to achieve exactly-once processing?

apache-kafka apache-spark spark-streaming spark-structured-streaming spark-streaming-kafka

6 votes · 1 answer · 1199 views

AWS Glue crawler - partition key type

I'm using Spark to write files to S3 in ORC format, and I'm querying this data with Athena.

I'm using the following partition keys:

s3://bucket/company=1123/date=20190207
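For reference, this layout is what Spark produces when the write is partitioned by those columns; a minimal sketch, assuming a `Dataset<Row> df` that contains `company` and `date` columns (the partition values always appear as plain strings in the key=value path segments, which is all the crawler gets to see):

    // Writes to s3://bucket/company=.../date=.../part-*.orc
    df.write()
      .partitionBy("company", "date")
      .format("orc")
      .save("s3://bucket/");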

Once I run the Glue crawler on the bucket, everything works as expected except the types of the partition keys.

The crawler configures them in the catalog as type string instead of int.

Is there a configuration to define the default type for partition keys?

I know I can change them manually afterwards and set the crawler configuration to Add new columns only.

amazon-s3 amazon-athena aws-glue aws-glue-data-catalog

5 votes · 1 answer · 392 views

Spark 1.6.1 S3 MultiObjectDeleteException

I'm using Spark to write data to S3 using S3A URIs.
I'm also using the s3-external-1.amazonaws.com endpoint to avoid read-after-write eventual-consistency issues in us-east-1.
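For context, a minimal sketch of how that endpoint is typically wired into the S3A connector through the Hadoop configuration (the `sparkContext` variable is assumed; `fs.s3a.endpoint` is the standard S3A property):

    // Point the S3A filesystem at the us-east-1 external endpoint mentioned above.
    org.apache.hadoop.conf.Configuration hadoopConf = sparkContext.hadoopConfiguration();
    hadoopConf.set("fs.s3a.endpoint", "s3-external-1.amazonaws.com");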

The following problem occurs when trying to write some data to S3 (it's actually a move operation):

  com.amazonaws.services.s3.model.MultiObjectDeleteException: Status Code: 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS Error Message: One or more objects could not be deleted, S3 Extended Request ID: null
    at com.amazonaws.services.s3.AmazonS3Client.deleteObjects(AmazonS3Client.java:1745)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.delete(S3AFileSystem.java:687)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.cleanupJob(FileOutputCommitter.java:381)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:314)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:151)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
    at …

amazon-s3 apache-spark spark-streaming

3 votes · 2 answers · 3658 views