Kafka Streams - Missing source topic

Din*_*dan 3 apache-kafka apache-kafka-streams

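I am working on a Kafka Streams topology. Sometimes, after changing the applicationId and/or clientId properties, I get an error on one specific Kafka stream: "Missing source topic stream.webshop.products.prices.5 durign assignment. Returning error INCOMPLETE_SOURCE_TOPIC_METADATA". I have set the property create.topic=true in server.properties on every Kafka node, but the topic for this stream does not seem to get created.

Here is my Kafka Streams topology: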

package ro.orange.eshop.productindexer.domain

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Printed
import ro.orange.digital.avro.Aggregate
import ro.orange.digital.avro.Key
import ro.orange.digital.avro.Price
import ro.orange.digital.avro.StockQuantity
import ro.orange.eshop.productindexer.infrastructure.configuration.kafka.makeStoreProvider
import java.util.concurrent.CompletableFuture

class SaleProductTopology(
        private val streamNameRepository: IStreamNameRepository,
        private val saleProductMapper: ISaleProductMapper,
        private val productRatingMapper: IProductRatingMapper,
        private val productStockMapper: IProductStockMapper,
        private val lazyKafkaStreams: CompletableFuture<KafkaStreams>
) {
    fun streamsBuilder(): StreamsBuilder {
        val streamsBuilder = StreamsBuilder()
        val productsStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputWebshopProductsTopic)
        val productPricesStream = streamsBuilder.stream<Key, Price>(streamNameRepository.productsPricesStreamTopic)
        val productsRatingsStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputProductRatingsTopic)
        val inputProductsStockStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputProductsStockTopic)

        val productsStockStream = inputProductsStockStream
                .mapValues(productStockMapper::aStockQuantity)
        productsStockStream.to(streamNameRepository.productsStockStreamTopic)

        streamsBuilder.globalTable<Key, StockQuantity>(
                streamNameRepository.productsStockStreamTopic,
                Materialized.`as`(streamNameRepository.productsStockGlobalStoreTopic)
        )

        val quantityProvider = lazyKafkaStreams.makeStoreProvider<StockQuantity>(streamNameRepository.productsStockGlobalStoreTopic)

        val saleProductsTable = productsStream
                .groupByKey()
                .reduce({ _, aggregate -> aggregate }, Materialized.`as`(streamNameRepository.saleProductsStoreTopic))
                .mapValues { aggregate -> saleProductMapper.aSaleProduct(aggregate, quantityProvider) }

        saleProductsTable.toStream().print(Printed.toSysOut())

        val productPricesTable = productPricesStream
                .groupByKey()
                .reduce({ _, price -> price }, Materialized.`as`(streamNameRepository.productsPricesStoreTopic))

        productPricesTable.toStream().print(Printed.toSysOut())

        val productsRatingsTable = productsRatingsStream
                .groupByKey()
                .reduce({ _, aggregate -> aggregate }, Materialized.`as`(streamNameRepository.productsRatingsStoreTopic))
                .mapValues { aggregate -> productRatingMapper.aProductRating(aggregate) }

        productsRatingsTable.toStream().print(Printed.toSysOut())

        val productsStockTable = productsStockStream
                .groupByKey()
                .reduce { _, aggregate -> aggregate }

        saleProductsTable
                .leftJoin(productPricesTable) { saleProduct, price -> saleProductMapper.aPricedSaleProduct(saleProduct, price) }
                .leftJoin(productsRatingsTable) { saleProduct, rating -> saleProductMapper.aRatedSaleProduct(saleProduct, rating) }
                .leftJoin(productsStockTable) { saleProduct, stockQuantity -> saleProductMapper.aQuantifiedSaleProduct(saleProduct, stockQuantity) }
                .mapValues { saleProduct -> AggregateMapper.aSaleProductAggregate(saleProduct) }
                .toStream()
                .to(streamNameRepository.saleProductsTopic)

        return streamsBuilder
    }
}

war*_*iak 5

As @jacek-laskowski wrote:

KafkaStreams won't create it, since it's a source

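This is by design. If one of the source topics were auto-created (with the default number of partitions) while another had been pre-created by the user, their partition counts could differ. When a KStream/KTable is joined with another, both sides must have the same number of partitions. This is a crucial assumption.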
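The co-partitioning requirement can be checked before the application starts. The following is only a minimal sketch: it assumes the partition counts have already been looked up (for example through the Kafka admin client). The helper `findPartitionMismatches` and every topic name other than the one from the error message are hypothetical.

```kotlin
// Hypothetical helper: given topic name -> partition count for the topics
// that take part in a join, return the topics whose partition count
// differs from the expected one.
fun findPartitionMismatches(partitionCounts: Map<String, Int>, expected: Int): Map<String, Int> =
    partitionCounts.filterValues { it != expected }

fun main() {
    // Illustrative numbers only, not taken from the question.
    val counts = mapOf(
        "stream.webshop.products.prices.5" to 1, // e.g. created with a broker default
        "example.webshop.products" to 6,         // hypothetical, pre-created by the user
        "example.products.ratings" to 6          // hypothetical, pre-created by the user
    )
    val mismatches = findPartitionMismatches(counts, expected = 6)
    if (mismatches.isNotEmpty()) {
        println("Topics not co-partitioned with the rest: $mismatches")
    }
}
```

Failing fast on a mismatch like this is easier to diagnose than an error raised later during partition assignment.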

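The user has to create the topics deliberately, with an appropriate number of partitions. The partition count limits how many stream processing threads can work in parallel, which is one of the ways to control the performance of a Kafka Streams application.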
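One way to pre-create the source topics is the Kafka admin client. This is a sketch under assumptions, not the asker's actual setup: the broker address `localhost:9092`, the partition count, the replication factor, the helper `sourceTopics`, and the second topic name are all made up for illustration. It needs the `kafka-clients` library on the classpath.

```kotlin
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.common.errors.TopicExistsException
import java.util.Properties
import java.util.concurrent.ExecutionException

// Hypothetical helper: build topic definitions that all share one partition
// count, so that streams/tables joined later stay co-partitioned.
fun sourceTopics(names: List<String>, partitions: Int, replicationFactor: Short): List<NewTopic> =
    names.map { NewTopic(it, partitions, replicationFactor) }

fun main() {
    val props = Properties().apply {
        // Assumption: a broker is reachable at this address.
        put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    }
    val topics = sourceTopics(
        // Only the first name comes from the error message; the second is hypothetical.
        listOf("stream.webshop.products.prices.5", "example.webshop.products"),
        partitions = 6,                   // assumption: sized for the intended parallelism
        replicationFactor = 1.toShort()   // assumption: single-broker development cluster
    )
    AdminClient.create(props).use { admin ->
        try {
            // Blocks until the broker has answered for every topic.
            admin.createTopics(topics).all().get()
        } catch (e: ExecutionException) {
            // An already existing topic is acceptable here; rethrow anything else.
            if (e.cause !is TopicExistsException) throw e
        }
    }
}
```

Note that an existing topic keeps whatever partition count it already has, so a partition check is still worth running after this step.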

See Managing Streams Application Topics.