标签: apache-kafka-streams

Kafka:Consumer API vs Streams API

我最近开始学习Kafka并最终得到这些问题.

  1. Consumer和Stream有什么区别?对我来说,如果任何工具/应用程序消费来自Kafka的消息是Kafka世界中的消费者.

  2. 流是如何不同的,因为这也消耗或产生消息给卡夫卡?为什么需要它,因为我们可以使用Consumer API编写我们自己的消费者应用程序并根据需要处理它们或将它们从消费者应用程序发送到Spark?

我对此做了谷歌,但没有得到任何好的答案.对不起,如果这个问题太琐碎了.

apache-kafka kafka-consumer-api apache-kafka-streams

65
推荐指数
2
解决办法
2万
查看次数

Akka Stream Kafka vs Kafka Streams

我目前正在与Akka Stream Kafka合作与kafka互动,我很惊讶与Kafka Streams有什么不同.

我知道基于Akka的方法实现了反应性规范并处理了kafka流似乎缺乏的背压和功能.

使用kafka流比akka溪流kafka有什么好处?

scala stream-processing typesafe akka-stream apache-kafka-streams

36
推荐指数
2
解决办法
1万
查看次数

如何发送时间窗口KTable的最终kafka-streams聚合结果?

我想做的是:

  1. 从数字主题(Long的)消费记录
  2. 聚合(计数)每个5秒窗口的值
  3. 将FINAL聚合结果发送到另一个主题

我的代码看起来像这样:

KStream<String, Long> longs = builder.stream(
        Serdes.String(), Serdes.Long(), "longs");

// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts = 
        longs.countByKey(TimeWindows.of("longCounts", 5000L));

// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
        .to("long-counts");
Run Code Online (Sandbox Code Playgroud)

看起来一切都按预期工作,但聚合被发送到每个传入记录的目标主题.我的问题是如何只发送每个窗口的最终聚合结果?

apache-kafka apache-kafka-streams

31
推荐指数
2
解决办法
1万
查看次数

使用Kafka的Streams API处理错误消息

我有一个基本的流处理流程,看起来像

master topic -> my processing in a mapper/filter -> output topics
Run Code Online (Sandbox Code Playgroud)

我想知道处理"坏消息"的最佳方法.这可能是我无法正确反序列化的消息,或者处理/过滤逻辑可能以某种意外的方式失败(我没有外部依赖,所以不应该有这种类型的瞬态错误).

我正在考虑将所有处理/过滤代码包装在try catch中,如果出现异常,则路由到"错误主题".然后我可以研究该消息并对其进行修改或修改我的代码,然后将其重播为master.如果我让任何异常传播,则流似乎被卡住并且不再拾取消息.

  • 这种方法被认为是最佳做法吗?
  • 有没有方便的Kafka溪流来处理这个问题?我不认为有DLQ的概念......
  • 什么是阻止卡夫卡干扰"坏消息"的替代方法?
  • 有哪些替代错误处理方法?

为了完整性,这里是我的代码(伪ish):

class Document {
    // Fields
}

class AnalysedDocument {

    Document document;
    String rawValue;
    Exception exception;
    Analysis analysis;

    // All being well
    AnalysedDocument(Document document, Analysis analysis) {...}

    // Analysis failed
    AnalysedDocument(Document document, Exception exception) {...}

    // Deserialisation failed
    AnalysedDocument(String rawValue, Exception exception) {...}
}

KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder
    .stream(Serdes.String(), Serdes.String(), "master")
    .mapValues(new ValueMapper<String, AnalysedDocument>() { …
Run Code Online (Sandbox Code Playgroud)

error-handling apache-kafka apache-kafka-streams

31
推荐指数
2
解决办法
2万
查看次数

Kafka Streams API:KStream到KTable

我有一个Kafka主题,我发送位置事件(key = user_id,value = user_location).我能够阅读并处理它KStream:

KStreamBuilder builder = new KStreamBuilder();

KStream<String, Location> locations = builder
        .stream("location_topic")
        .map((k, v) -> {
            // some processing here, omitted form clarity
            Location location = new Location(lat, lon);
            return new KeyValue<>(k, location);
        });
Run Code Online (Sandbox Code Playgroud)

这很有效,但我想拥有KTable每个用户的最后一个已知位置.我怎么能这样做?

我能够写一个中间主题并从中读取:

// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");

// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");
Run Code Online (Sandbox Code Playgroud)

有没有一种简单的方法来获得KTable一个KStream?这是我第一个使用Kafka Streams的应用程序,所以我可能会遗漏一些明显的东西.

apache-kafka-streams

31
推荐指数
1
解决办法
1万
查看次数

Kafka Streaming Concurrency?

我有一些基本的Kafka Streaming代码,它从一个主题读取记录,进行一些处理,并将记录输出到另一个主题.

Kafka流如何处理并发?一切都在一个线程中运行吗?我没有在文档中看到这一点.

如果它是单线程的,我希望多线程处理的选项能够处理大量数据.

如果它是多线程的,我需要了解它是如何工作的以及如何处理资源,比如SQL数据库连接应该在不同的处理线程中共享.

相对于其他选项(Spark,Akka,Samza,Storm等),Kafka的内置流API是否不推荐用于高容量场景?

apache-kafka apache-kafka-streams

21
推荐指数
2
解决办法
1万
查看次数

Apache Samza和Apache Kafka Streams之间的区别(专注于并行和通信)

在Samza和Kafka Streams中,数据流处理以序列/图形(在Samza中称为"数据流图"和在Kafka Streams中称为"拓扑")的处理步骤(在Samza中称为"作业"和在Kafka Streams中称为"处理器")执行我将在这个问题的其余部分中将这两个术语称为工作流程工作者.

让我们假设我们有一个非常简单的工作流程,包括一个消耗传感器测量值的工人A,并过滤掉低于50的所有值,然后是工人B接收剩余的测量值并过滤80以上的所有值.

输入(Kakfa主题X) - >(工人A) - >(工人B) - >输出(Kafka主题Y)

如果我明白了

正确地说,Samza和Kafka Streams都使用主题分区概念来复制工作流/工作人员,从而并行处理以实现可伸缩性.

但:

  • Samza将每个工作者(即作业)分别复制到多个任务(输入流中的每个分区一个).也就是说,任务是工作流的工作者的副本.

  • Kafka Streams一次将整个工作流程(即拓扑)复制到多个任务(输入流中的每个分区一个).也就是说,任务是整个工作流程的复制品.

这让我想到了我的问题:

  1. 假设只有一个分区:它是否正确,是不可能在Kafka Streams中的两台不同的机器上部署worker(A)和(B),而在Samza中这是可能的?(换句话说:无论是否有多个分区,Kafka Streams都无法将单个任务(即拓扑副本)拆分为两台机器.)

  2. Kafka Streams拓扑中的两个后续处理器(在同一任务中)如何通信?(我知道在Samza中,两个后续工作人员(即工作)之间的所有通信都是用Kafka主题完成的,但是由于必须在代码中明确地在Kafka Streams中"标记"哪些流必须作为Kafka主题发布,否则这不可能在这里是这样的.)

  3. Samza是否自动发布所有中间流作为Kafka主题(并因此使其可供潜在客户使用)是正确的,而Kafka Streams仅发布明确标记的中间和最终流(addSink在低级API和/ tothroughDSL中) )?

(我知道Samza可以使用其他消息队列而不是Kafka,但这与我的问题无关.)

apache-kafka apache-kafka-streams

18
推荐指数
1
解决办法
6642
查看次数

为什么Apache Kafka Streams使用RocksDB以及如何更改它?

在Apache Kafka 0.9和0.10的新功能调查期间,我们使用了KStreams和KTables.有一个有趣的事实是,Kafka在内部使用RocksDB.请参阅Kafka Streams简介:流处理变得简单.RocksDB不是用JVN兼容语言编写的,因此需要仔细处理部署,因为它需要额外的共享库(取决于操作系统).

这里有一些简单的问题:

  • 为什么Apache Kafka Streams使用RocksDB?
  • 怎么可能改变它?

我试图搜索答案,但我只看到隐含的原因,RocksDB在每秒大约数百万次操作范围内的操作非常快.

另一方面,我看到一些用Java编码的数据库,也许端到端他们可以做到这一点,而且他们不会通过JNI.

java-native-interface in-memory-database key-value-store rocksdb apache-kafka-streams

17
推荐指数
1
解决办法
1万
查看次数

UnsatisfiedLinkError:/tmp/snappy-1.1.4-libsnappyjava.so加载共享库时出错ld-linux-x86-64.so.2:没有这样的文件或目录

我正在尝试在kubernetes中运行Kafka Streams应用程序.当我启动pod时,我得到以下异常:

Exception in thread "streams-pipe-e19c2d9a-d403-4944-8d26-0ef27ed5c057-StreamThread-1"
java.lang.UnsatisfiedLinkError: /tmp/snappy-1.1.4-5cec5405-2ce7-4046-a8bd-922ce96534a0-libsnappyjava.so: 
Error loading shared library ld-linux-x86-64.so.2: No such file or directory 
(needed by /tmp/snappy-1.1.4-5cec5405-2ce7-4046-a8bd-922ce96534a0-libsnappyjava.so)
        at java.lang.ClassLoader$NativeLibrary.load(Native Method)
        at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
        at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
        at java.lang.Runtime.load0(Runtime.java:809)
        at java.lang.System.load(System.java:1086)
        at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:179)
        at org.xerial.snappy.SnappyLoader.loadSnappyApi(SnappyLoader.java:154)
        at org.xerial.snappy.Snappy.<clinit>(Snappy.java:47)
        at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:435)
        at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:466)
        at java.io.DataInputStream.readByte(DataInputStream.java:265)
        at org.apache.kafka.common.utils.ByteUtils.readVarint(ByteUtils.java:168)
        at org.apache.kafka.common.record.DefaultRecord.readFrom(DefaultRecord.java:292)
        at org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(DefaultRecordBatch.java:264)
        at org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:563)
        at org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next(DefaultRecordBatch.java:532)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:1060)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1095)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Run Code Online (Sandbox Code Playgroud)

以前我尝试使用docker容器启动kafka和kafka-streams-app,它们工作得非常好.这是我第一次尝试使用Kubernetes.

这是我的DockerFile StreamsApp …

java apache-kafka snappy apache-kafka-streams

14
推荐指数
3
解决办法
4755
查看次数

如何在单个Kafka Streams应用程序中连接多个集群?

Kafka Streams开发人员指南中,它说:

Kafka Streams应用程序只能与此配置值指定的单个Kafka群集通信.未来版本的Kafka Streams将支持连接到不同的Kafka集群,以读取输入流和编写输出流.

这是否意味着我的整个应用程序只能连接到单个Kafka群集,或者每个KafkaStreams实例只能连接到一个群集?

我可以创建多个具有连接到不同群集的不同属性的KafkaStreams实例吗?

apache-kafka apache-kafka-streams

12
推荐指数
2
解决办法
2510
查看次数