小编tan*_*oup的帖子

Spark结构化流写入实木复合地板会创建许多文件

我使用结构化流从kafka加载消息,进行一些聚合,然后将其写入镶木地板文件。问题是,仅从kafka发送的100条消息就创建了太多的实木复合地板文件(800个文件)。

聚合部分是:

return model
            .withColumn("timeStamp", col("timeStamp").cast("timestamp"))
            .withWatermark("timeStamp", "30 seconds")
            .groupBy(window(col("timeStamp"), "5 minutes"))
            .agg(
                count("*").alias("total"));
Run Code Online (Sandbox Code Playgroud)

查询:

StreamingQuery query = result //.orderBy("window")
            .writeStream()
            .outputMode(OutputMode.Append())
            .format("parquet")
            .option("checkpointLocation", "c:\\bigdata\\checkpoints")
            .start("c:\\bigdata\\parquet");
Run Code Online (Sandbox Code Playgroud)

当使用spark加载一个实木复合地板文件时,它显示为空

+------+-----+
|window|total|
+------+-----+
+------+-----+
Run Code Online (Sandbox Code Playgroud)

如何将数据集仅保存到一个实木复合地板文件中?谢谢

apache-spark parquet spark-streaming

5
推荐指数
1
解决办法
893
查看次数

Spark Structured Stream仅从Kafka的一个分区获取消息

我得到了这样的情况,当spark可以流式传输并从Kafka 2-patition主题的一个分区获取消息.

我的主题: C:\bigdata\kafka_2.11-0.10.1.1\bin\windows>kafka-topics --create --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --topic test4

卡夫卡制片人:

public class KafkaFileProducer {

// kafka producer
Producer<String, String> producer;

public KafkaFileProducer() {

    // configs
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    //props.put("group.id", "testgroup");
    props.put("batch.size", "16384");
    props.put("auto.commit.interval.ms", "1000");
    props.put("linger.ms", "0");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("block.on.buffer.full", "true");

    // instantiate a producer
    producer = new KafkaProducer<String, String>(props);
}

/**
 * @param filePath
 */
public void sendFile(String filePath) {
    FileInputStream fis;
    BufferedReader br = null;

    try { …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-spark spark-structured-streaming

2
推荐指数
1
解决办法
1098
查看次数