I have a standalone Spark cluster that reads data from a Kafka queue. The Kafka topic has 5 partitions, but Spark only processes the data from one of those partitions. I am using the following:
These are my Maven dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>kafka-custom</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.1.1</version>
    </dependency>
</dependencies>
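
For context, the consumer side with spark-streaming-kafka-0-10 is normally built as a direct stream that subscribes to the topic; with ConsumerStrategies.Subscribe the stream should be assigned all 5 partitions. A minimal sketch of such a consumer (the topic name, broker address, and group id below are placeholders, not the actual values from my setup):

import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class KafkaSparkConsumer {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("kafka-partition-test");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "spark-consumer-group");      // placeholder group id
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", false);

        // Subscribe (rather than Assign) lets the stream read from every partition of the topic
        JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(
                    Collections.singletonList("test-topic"), kafkaParams));

        // Print the partition of each record to see which partitions actually reach Spark
        stream.foreachRDD(rdd ->
            rdd.foreach(record ->
                System.out.println("partition=" + record.partition()
                    + " offset=" + record.offset()
                    + " value=" + record.value())));

        ssc.start();
        ssc.awaitTermination();
    }
}

Printing record.partition() for each record is a quick way to confirm whether messages from all five partitions are actually reaching the stream.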
My Kafka producer is a simple one that just puts 100 messages on the queue:
public void generateMessages() {
    // Define the properties for the Kafka connection
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaBrokerServer); // kafka server
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
    // Create a KafkaProducer using the Kafka Connection properties
    KafkaProducer<String, String> …
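
The method is cut off above; the remainder is just the send loop. A rough sketch of how the rest of such a method might look (the topic name "test-topic" is a placeholder, and it assumes the org.apache.kafka.clients.producer.KafkaProducer and ProducerRecord imports):

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);

    // Send 100 simple string messages; with the default partitioner, distinct
    // string keys are hashed and should spread the records over all 5 partitions
    for (int i = 0; i < 100; i++) {
        producer.send(new ProducerRecord<>("test-topic", Integer.toString(i), "message-" + i));
    }

    // Flush outstanding sends and release the producer's resources
    producer.close();
}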