什么是最优的没有.具有5个代理和复制因子= 3的主题的分区?此群集中只有6个主题.
每个代理运行的计算机配置如下.
Memory=16gb
Porcessor= Octacore, Intel(R) Xeon(R) CPU E5-2650 v3 @ 2.30GHz
Run Code Online (Sandbox Code Playgroud)
除了每个主题1000个分区的打开文件以外的任何挑战?
我正在使用带有kafka主题的火花流。主题创建有5个分区。我的所有消息都使用表名作为键发布到kafka主题。鉴于此,我假设该表的所有消息都应转到同一分区。但我注意到在同一张表的Spark日志消息中,有时会到达执行者的node-1,有时会到达执行者的node-2。
我正在使用以下命令在yarn-cluster模式下运行代码:
spark-submit --name DataProcessor --master yarn-cluster --files /opt/ETL_JAR/executor-log4j-spark.xml,/opt/ETL_JAR/driver-log4j-spark.xml,/opt/ETL_JAR/application.properties --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=driver-log4j-spark.xml" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=executor-log4j-spark.xml" --class com.test.DataProcessor /opt/ETL_JAR/etl-all-1.0.jar
Run Code Online (Sandbox Code Playgroud)
该提交创建了一个驱动程序,可以在node-1上说,在node-1和node-2上可以说两个执行程序。
我不希望节点1和节点2执行程序读取相同的分区。但这正在发生
还尝试了以下配置以指定使用者组,但没有区别。
kafkaParams.put("group.id", "app1");
Run Code Online (Sandbox Code Playgroud)
这就是我们使用createDirectStream方法创建流的方式*不是通过zookeeper。
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);
kafkaParams.put("auto.offset.reset", "largest");
kafkaParams.put("group.id", "app1");
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
Run Code Online (Sandbox Code Playgroud)
完整代码:
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import …Run Code Online (Sandbox Code Playgroud) java apache-kafka apache-spark spark-streaming kafka-consumer-api
如何使用直接流API为kafka spark流指定使用者组ID.
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);
kafkaParams.put("auto.offset.reset", "largest");
kafkaParams.put("group.id", "app1");
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
Run Code Online (Sandbox Code Playgroud)
虽然我已经指定配置不确定是否遗漏了什么.使用spark1.3
kafkaParams.put("group.id", "app1");
Run Code Online (Sandbox Code Playgroud) java apache-kafka apache-spark spark-streaming kafka-consumer-api