I am using Structured Streaming to load messages from Kafka, do some aggregation, and then write the result to parquet files. The problem is that just 100 messages sent through Kafka end up creating far too many parquet files (800 of them).
The aggregation part is:
return model
        .withColumn("timeStamp", col("timeStamp").cast("timestamp"))
        .withWatermark("timeStamp", "30 seconds")
        .groupBy(window(col("timeStamp"), "5 minutes"))
        .agg(count("*").alias("total"));
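One thing worth noting (a likely explanation, not something stated in the post): the groupBy above triggers a shuffle, and a streaming aggregation uses spark.sql.shuffle.partitions (200 by default) partitions for its state, so each micro-batch can write up to 200 part-files into the parquet sink. A minimal sketch for lowering that number, assuming a SparkSession named spark and that the setting is applied before the query is started for the first time (it is baked into the checkpoint afterwards):

// Assumption: `spark` is the SparkSession used to build the streaming query.
// Fewer shuffle partitions -> fewer part-files per micro-batch in the file sink.
spark.conf().set("spark.sql.shuffle.partitions", "1");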
The query:
StreamingQuery query = result //.orderBy("window")
        .writeStream()
        .outputMode(OutputMode.Append())
        .format("parquet")
        .option("checkpointLocation", "c:\\bigdata\\checkpoints")
        .start("c:\\bigdata\\parquet");
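Another knob that affects the file count (again an assumption about the setup, not something shown in the post) is how often micro-batches run: with the default trigger a new batch, and therefore a new set of files, is produced as soon as the previous one finishes. A sketch with an explicit processing-time trigger, assuming Spark 2.2+ where Trigger.ProcessingTime is available and a hypothetical one-minute interval:

import org.apache.spark.sql.streaming.Trigger;

StreamingQuery query = result
        .writeStream()
        .outputMode(OutputMode.Append())
        .format("parquet")
        .option("checkpointLocation", "c:\\bigdata\\checkpoints")
        .trigger(Trigger.ProcessingTime("1 minute"))  // hypothetical interval
        .start("c:\\bigdata\\parquet");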
When I load a single parquet file with Spark, it shows up empty:
+------+-----+
|window|total|
+------+-----+
+------+-----+
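A single empty part-file does not necessarily mean nothing was aggregated: in Append mode a window is only emitted once the watermark passes the end of that window, and in the Spark versions from this era each shuffle partition typically still writes a (schema-only) part-file per batch, so most of the 800 files contain no rows. A quick way to check the sink as a whole is to read the whole directory instead of one file (path taken from the query above):

// Reads every part-file under the sink directory and unions them.
spark.read().parquet("c:\\bigdata\\parquet").show();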
How can I save the dataset into just one parquet file? Thanks.
I also ran into a situation where Spark would stream and pick up messages from only one partition of a 2-partition Kafka topic.
My topic:
C:\bigdata\kafka_2.11-0.10.1.1\bin\windows>kafka-topics --create --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --topic test4
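The read side that produces `model` is not shown in the question; for completeness, here is a minimal sketch of how such a stream is usually created from this topic with the Kafka source (the broker address, starting offsets, and column handling are assumptions):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Subscribe to both partitions of the "test4" topic.
Dataset<Row> raw = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "test4")
        .option("startingOffsets", "earliest")  // assumption: start from the beginning
        .load()
        .selectExpr("CAST(value AS STRING) AS value");
// The post's `model` (with its timeStamp column) is presumably derived from this
// payload; that parsing step is not part of the question.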
Kafka producer:
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public class KafkaFileProducer {

    // kafka producer
    Producer<String, String> producer;

    public KafkaFileProducer() {
        // configs
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        //props.put("group.id", "testgroup");
        props.put("batch.size", "16384");
        props.put("auto.commit.interval.ms", "1000");
        props.put("linger.ms", "0");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("block.on.buffer.full", "true");

        // instantiate a producer
        producer = new KafkaProducer<String, String>(props);
    }

    /**
     * @param filePath path of the file whose lines are sent as messages
     */
    public void sendFile(String filePath) {
        FileInputStream fis;
        BufferedReader br = null;
        try { …
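The snippet is cut off here. Purely as a hypothetical illustration (not the author's actual code), the send loop of such a file producer usually looks something like the following, with each line sent as an unkeyed record so the producer spreads records across the topic's two partitions (requires java.io.* and org.apache.kafka.clients.producer.ProducerRecord in addition to the imports above):

// Hypothetical continuation of sendFile(filePath).
try {
    fis = new FileInputStream(filePath);
    br = new BufferedReader(new InputStreamReader(fis));
    String line;
    while ((line = br.readLine()) != null) {
        // no key -> the partitioner distributes records over the topic's partitions
        producer.send(new ProducerRecord<String, String>("test4", line));
    }
    producer.flush();
} catch (IOException e) {
    e.printStackTrace();
} finally {
    try { if (br != null) br.close(); } catch (IOException ignored) { }
}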