小编Fai*_*qui的帖子

5个代理中kafka主题的最佳分区数,在1个集群中复制因子= 3

什么是最优的没有.具有5个代理和复制因子= 3的主题的分区?此群集中只有6个主题.

每个代理运行的计算机配置如下.

Memory=16gb
Porcessor= Octacore, Intel(R) Xeon(R) CPU E5-2650 v3 @ 2.30GHz
Run Code Online (Sandbox Code Playgroud)

除了每个主题1000个分区的打开文件以外的任何挑战?

apache-kafka

7
推荐指数
1
解决办法
7681
查看次数

Kafka主题分区和Spark执行程序映射

我正在使用带有kafka主题的火花流。主题创建有5个分区。我的所有消息都使用表名作为键发布到kafka主题。鉴于此,我假设该表的所有消息都应转到同一分区。但我注意到在同一张表的Spark日志消息中,有时会到达执行者的node-1,有时会到达执行者的node-2。

我正在使用以下命令在yarn-cluster模式下运行代码:

spark-submit --name DataProcessor --master yarn-cluster --files /opt/ETL_JAR/executor-log4j-spark.xml,/opt/ETL_JAR/driver-log4j-spark.xml,/opt/ETL_JAR/application.properties --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=driver-log4j-spark.xml" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=executor-log4j-spark.xml" --class com.test.DataProcessor /opt/ETL_JAR/etl-all-1.0.jar
Run Code Online (Sandbox Code Playgroud)

该提交创建了一个驱动程序,可以在node-1上说,在node-1和node-2上可以说两个执行程序。

我不希望节点1和节点2执行程序读取相同的分区。但这正在发生

还尝试了以下配置以指定使用者组,但没有区别。

kafkaParams.put("group.id", "app1");
Run Code Online (Sandbox Code Playgroud)

这就是我们使用createDirectStream方法创建流的方式*不是通过zookeeper。

    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);
    kafkaParams.put("auto.offset.reset", "largest");
    kafkaParams.put("group.id", "app1");

        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc, 
                String.class, 
                String.class,
                StringDecoder.class, 
                StringDecoder.class, 
                kafkaParams, 
                topicsSet
        );
Run Code Online (Sandbox Code Playgroud)

完整代码:

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-spark spark-streaming kafka-consumer-api

5
推荐指数
1
解决办法
4288
查看次数

如何使用直接流在Kafka Spark Streaming中指定使用者组

如何使用直接流API为kafka spark流指定使用者组ID.

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);
kafkaParams.put("auto.offset.reset", "largest");
kafkaParams.put("group.id", "app1");

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc, 
            String.class, 
            String.class,
            StringDecoder.class, 
            StringDecoder.class, 
            kafkaParams, 
            topicsSet
    );
Run Code Online (Sandbox Code Playgroud)

虽然我已经指定配置不确定是否遗漏了什么.使用spark1.3

kafkaParams.put("group.id", "app1");
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-spark spark-streaming kafka-consumer-api

4
推荐指数
2
解决办法
6522
查看次数