I am using a Spark Structured Streaming query to write Parquet files to S3 with the following code:
ds.writeStream()
    .format("parquet")
    .outputMode(OutputMode.Append())
    .option("queryName", "myStreamingQuery")
    .option("checkpointLocation", "s3a://my-kafka-offset-bucket-name/")
    .option("path", "s3a://my-data-output-bucket-name/")
    .partitionBy("createdat")
    .start();
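I get the desired output in the S3 bucket my-data-output-bucket-name, but along with the output I also get a _spark_metadata folder in it. How can I get rid of it? If I can't get rid of it, how can I change its location to a different S3 bucket?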
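I am trying to understand the Java Spark documentation. There is a section called "Untyped User-Defined Aggregate Functions" that contains some sample code I cannot understand. Here is the code: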
package org.apache.spark.examples.sql;
// $example on:untyped_custom_aggregation$
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// $example off:untyped_custom_aggregation$
public class JavaUserDefinedUntypedAggregation {
  // $example on:untyped_custom_aggregation$
  public static class MyAverage extends UserDefinedAggregateFunction {
    private StructType inputSchema;
    private StructType bufferSchema;

    public MyAverage() {
      List<StructField> inputFields = new ArrayList<>();
      inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
      inputSchema = DataTypes.createStructType(inputFields);
      List<StructField> bufferFields = new ArrayList<>();
      bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
      bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
      bufferSchema …
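
The listing is cut off at this point, but the buffer it declares (a `sum` and a `count`, both of type long) hints at how an average is usually aggregated in stages. The following is a minimal plain-Java sketch of that idea, written without any Spark classes so it can be run on its own. The class `AverageBufferSketch`, its nested `Buffer` type, and the `aggregate` helper are hypothetical names chosen for illustration; only the step names `initialize`, `update`, `merge`, and `evaluate` mirror the methods a `UserDefinedAggregateFunction` is expected to provide. It is not the code from the Spark documentation.

```java
import java.util.Arrays;
import java.util.List;

public class AverageBufferSketch {

    // Hypothetical stand-in for the two-field aggregation buffer (sum, count).
    static final class Buffer {
        long sum;
        long count;
    }

    // Each partition starts from an empty buffer.
    static Buffer initialize() {
        return new Buffer();
    }

    // Fold one input value into the buffer; null inputs are skipped.
    static void update(Buffer buffer, Long input) {
        if (input != null) {
            buffer.sum += input;
            buffer.count += 1;
        }
    }

    // Combine two partial buffers, storing the result in the first one.
    static void merge(Buffer left, Buffer right) {
        left.sum += right.sum;
        left.count += right.count;
    }

    // Turn the fully merged buffer into the final average.
    static double evaluate(Buffer buffer) {
        return (double) buffer.sum / buffer.count;
    }

    // Assumption for illustration: one list plays the role of one partition.
    static Buffer aggregate(List<Long> values) {
        Buffer buffer = initialize();
        for (Long value : values) {
            update(buffer, value);
        }
        return buffer;
    }

    public static void main(String[] args) {
        Buffer first = aggregate(Arrays.asList(3000L, 4500L));
        Buffer second = aggregate(Arrays.asList(3500L, null, 4000L));
        merge(first, second);
        System.out.println(evaluate(first)); // prints 3750.0
    }
}
```

Keeping `sum` and `count` separate until the last step is what allows partial results from different partitions to be merged: two averages cannot be combined correctly without knowing how many values each one covers.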