标签: spark-streaming-kafka

Spark Streaming - 读写Kafka主题

我正在使用Spark Streaming处理两个Kafka队列之间的数据,但我似乎找不到从Spark写Kafka的好方法.我试过这个:

input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)
Run Code Online (Sandbox Code Playgroud)

并且它按预期工作,但是为每个消息实例化一个新的KafkaProducer在真实环境中显然是不可行的,我正在尝试解决它.

我想为每个进程保留一个实例的引用,并在需要发送消息时访问它.如何从Spark Streaming写入Kafka?

scala apache-kafka apache-spark spark-streaming spark-streaming-kafka

33
推荐指数
5
解决办法
4万
查看次数

Spark流媒体+ Kafka vs Just Kafka

为什么以及何时会选择使用Kafka的Spark流媒体?

假设我有一个系统通过Kafka每秒获得数千条消息.我需要对这些消息应用一些实时分析,并将结果存储在数据库中.

我有两个选择:

  1. 创建我自己的工作人员,从Kafka读取消息,运行分析算法并将结果存储在DB中.在Docker时代,只需使用scale命令就可以轻松地在整个集群中扩展此工作程序.我只需要确保我的分区数量等于或大于我的工作者,并且一切都很好并且我有一个真正的并发性.

  2. 使用Kafka流输入创建Spark群集.让Spark集群进行分析计算,然后存储结果.

有没有第二种选择是更好的选择?听起来像是一个额外的开销.

apache-kafka apache-spark spark-streaming spark-streaming-kafka

13
推荐指数
1
解决办法
1862
查看次数

无法找到 LoginModule 类:org.apache.kafka.common.security.plain.PlainLoginModule

环境:Spark 2.3.0、Scala 2.11.12、Kafka(无论最新版本是什么)

\n\n

我有一个安全的 Kafka 系统,我正在尝试将 Spark Streaming Consumer 连接到该系统。以下是我的build.sbt文件:

\n\n
name := "kafka-streaming"\nversion := "1.0"\n\nscalaVersion := "2.11.12"\n\n// still want to be able to run in sbt\n// https://github.com/sbt/sbt-assembly#-provided-configuration\nrun in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))\n\nfork in run := true\njavaOptions in run ++= Seq(\n    "-Dlog4j.debug=true",\n    "-Dlog4j.configuration=log4j.properties")\n\nassemblyMergeStrategy in assembly := {\n    case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat\n    case PathList("META-INF", _*) => MergeStrategy.discard\n    case _ => MergeStrategy.first\n}\n\nlibraryDependencies ++= Seq(\n    "org.apache.spark" %% "spark-core" % "2.3.0",\n    "org.apache.spark" …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-spark spark-structured-streaming spark-streaming-kafka

10
推荐指数
1
解决办法
5526
查看次数

Spark Structured Streaming with Kafka 不支持startingOffset="earliest"

我已经设置了 Spark Structured Streaming (Spark 2.3.2) 来读取 Kafka (2.0.0)。如果消息在 Spark 流作业开始之前进入主题,我将无法从主题的开头开始消费。Spark 流的这种预期行为是否会忽略在 Spark Stream 作业初始运行之前生成的 Kafka 消息(即使使用 .option("stratingOffsets","earliest"))?

重现步骤

  1. 在开始流式作业之前,创建test主题(单个代理、单个分区)并向该主题生成消息(在我的示例中为 3 条消息)。

  2. 使用以下命令启动 spark-shell: spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2.3.1.0.0-78 --repositories http://repo.hortonworks.com/content/repositories/releases/

  3. 执行下面的 spark scala 代码。

// Local
val df = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9097")
  .option("failOnDataLoss","false")
  .option("stratingOffsets","earliest")
  .option("subscribe", "test")
  .load()

// Sink Console
val ds = df.writeStream.format("console").queryName("Write to console")
  .trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("10 second"))
  .start()
Run Code Online (Sandbox Code Playgroud)

预期与实际输出

我希望流从 offset=1 开始。但是,它从 offset=3 开始读取。可以看到kafka客户端实际上是在重置起始偏移量:2019-06-18 21:22:57 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to …

apache-spark spark-streaming spark-structured-streaming spark-streaming-kafka

8
推荐指数
1
解决办法
4083
查看次数

无法找到Kafka Producer - org.apache.kafka.common.serialization.StringSerializer

我创建了一个简单的Kafka Producer&Consumer.我使用的是kafka_2.11-0.9.0.0.这是我的制作人代码,

public class KafkaProducerTest {
public static String topicName = "test-topic-2";
public static void main(String[] args) {
    // TODO Auto-generated method stub
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer",
            StringSerializer.class.getName());
    props.put("value.serializer",
            StringSerializer.class.getName());

    Producer<String, String> producer = new KafkaProducer(props);
    for (int i = 0; i < 100; i++) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
                topicName, Integer.toString(i), Integer.toString(i));
        System.out.println(producerRecord);
        producer.send(producerRecord);
    }

    producer.close();
}

}
Run Code Online (Sandbox Code Playgroud)

在启动捆绑ia时遇到以下错误,

2016-05-20 09:44:57,792 | …
Run Code Online (Sandbox Code Playgroud)

apache-karaf apache-kafka apache-spark spark-streaming-kafka

7
推荐指数
3
解决办法
1万
查看次数

Spark Streaming Kafka流

我试图用kafka读取火花流时遇到一些问题.

我的代码是:

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaIngestor")
val ssc = new StreamingContext(sparkConf, Seconds(2))

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "consumergroup",
  "metadata.broker.list" -> "localhost:9092",
  "zookeeper.connection.timeout.ms" -> "10000"
  //"kafka.auto.offset.reset" -> "smallest"
)

val topics = Set("test")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
Run Code Online (Sandbox Code Playgroud)

我之前在端口2181启动了zookeeper,在端口9092启动了Kafka服务器0.9.0.0.但是我在Spark驱动程序中遇到以下错误:

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:145)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
Run Code Online (Sandbox Code Playgroud)

Zookeeper日志:

[2015-12-08 00:32:08,226] INFO Got user-level KeeperException when processing sessionid:0x1517ec89dfd0000 type:create cxid:0x34 zxid:0x1d3 …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-spark spark-streaming spark-streaming-kafka

6
推荐指数
1
解决办法
7181
查看次数

Spark 结构化流恰好一次 - 未实现 - 重复事件

我正在使用 Spark Structured Streaming 来使用来自 Kafka 的事件并将它们上传到 S3。

检查点在 S3 上提交:

DataFrameWriter<Row> writer = input.writeStream()
           .format("orc")
           .trigger(ProcessingTime(config.getProcessingTime()))
           .outputMode(OutputMode.Append())
           .option("truncate", false)           
           .option("checkpointLocation", "s3://bucket1")
           .option("compression", "zlib")
           .option("path", "s3://bucket2");
Run Code Online (Sandbox Code Playgroud)

偏移量通过StreamingQueryListener以下方式提交给 Kafka :

  kafkaConsumer.commitSync(topicPartitionMap);
Run Code Online (Sandbox Code Playgroud)

应用程序启动后,它会从 Kafka 检索偏移量映射并启动流:

 reader = sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", config.getKafkaBootStrapServers())
            .option("subscribe", "topic1")
            .option("max.poll.records", 1000)
            .option("failOnDataLoss", false)
            .option("startingOffsets", topicPartitionMap)
Run Code Online (Sandbox Code Playgroud)

我将topic/partition/offset数据存储在 ORC 文件中。

数据包含具有精确 的事件的多个重复项topic/partition/offset

应如何配置流以实现恰好一次处理?

apache-kafka apache-spark spark-streaming spark-structured-streaming spark-streaming-kafka

6
推荐指数
1
解决办法
1199
查看次数

Spark Streaming与Kafka 2.0.0依赖项

我正在尝试使用spark-streaming2.0.0来使用kafka 0.8主题,我正在尝试识别我在build.sbt文件中尝试使用这些依赖项所需的依赖项

libraryDependencies += "org.apache.spark" %% "spark-streaming_2.11" % "2.0.0"
Run Code Online (Sandbox Code Playgroud)

当我运行sbt包时,我得到所有这三个罐子的未解决的依赖关系,

但这些罐子确实存在

https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.0.0

请帮忙调试这个问题,我是Scala的新手,所以如果我没有做对,请告诉我

sbt apache-spark spark-streaming spark-streaming-kafka

5
推荐指数
1
解决办法
3150
查看次数

从spark流中的kafka消息中提取时间戳?

尝试从卡夫卡源读取。我想从收到的消息中提取时间戳以进行结构化火花流处理。kafka(版本 0.10.0.0) Spark Streaming(版本 2.0.1)

apache-kafka spark-streaming spark-streaming-kafka

5
推荐指数
1
解决办法
5010
查看次数

增加 Kafka Streams 消费者吞吐量

我有一个并排运行的 Spark Streaming 应用程序和一个 Kafka Streams 应用程序,用于基准测试。两者都使用相同的输入主题并写入不同的目标数据库。输入主题有 15 个分区,spark 流和 kafka 流都有 15 个消费者(1:1 的比例)。此外,事件有效负载约为 2kb。不确定它是否相关,但 Spark Streaming 的 90% 百分位执行时间约为 9 毫秒。卡夫卡流,12 毫秒。每次处理消息时,都会在我的处理器中调用 commit() 方法。

问题依赖于高爆发。Spark Streaming 可以跟上每秒 700 次,而 Kafka Streams 只能跟上每秒 60/70 次。我不能超越那个。见下图:(绿线 - Spark Streaming / 蓝线 - Kafka Streams)

绿线 - Spark Streaming / 蓝线 - Kafka Streams

根据下面的配置,只要每个消费者不超过 1000 个事件,考虑到背压,火花流可以跟上,无论每个分区的字节数如何。至于 Kafka Streams,如果我正确理解了它的配置(请保持诚实),基于下面的相同,我能够每 100 毫秒(poll.ms)获取最多 1000 条记录(max.poll.records),只要每个分区不超过 1MB (max.partition.fetch.bytes) 和每次提取不超过 50MB (fetch.max.bytes)。

我看到相同的结果(每秒停留在 70 个事件上),无论我使用的是 5、10 还是 15 个消费者,这让我认为它与配置有关。我试图通过增加每次获取的记录数和每个分区的最大字节数来调整这些,但我没有得到显着的结果。

我知道这些是不同的技术并用于不同的目的,但我想知道我应该在 Kafka Streams 中使用哪些值以获得更好的吞吐量。

Spark Streaming 配置:

spark.batch.duration=10
spark.streaming.backpressure.enabled=true
spark.streaming.backpressure.initialRate=1000
spark.streaming.kafka.maxRatePerPartition=100
Run Code Online (Sandbox Code Playgroud)

Kafka Streams 配置(所有字节和时间相关)

# …
Run Code Online (Sandbox Code Playgroud)

spark-streaming kafka-consumer-api apache-kafka-streams spark-streaming-kafka

5
推荐指数
1
解决办法
1536
查看次数