标签: spring-cloud-stream

Spring启动测试失败说,由于缺少ServletWebServerFactory bean,无法启动ServletWebServerApplicationContext

测试类: -

@RunWith(SpringRunner.class)
@SpringBootTest(classes = { WebsocketSourceConfiguration.class,
        WebSocketSourceIntegrationTests.class }, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = {
                "websocket.path=/some_websocket_path", "websocket.allowedOrigins=*",
                "spring.cloud.stream.default-binder=kafka" })
public class WebSocketSourceIntegrationTests {

    private String port = "8080";

    @Test
    public void testWebSocketStreamSource() throws IOException, InterruptedException {
        StandardWebSocketClient webSocketClient = new StandardWebSocketClient();
        ClientWebSocketContainer clientWebSocketContainer = new ClientWebSocketContainer(webSocketClient,
                "ws://localhost:" + port + "/some_websocket_path");
        clientWebSocketContainer.start();
        WebSocketSession session = clientWebSocketContainer.getSession(null);
        session.sendMessage(new TextMessage("foo"));
        System.out.println("Done****************************************************");
    }

}
Run Code Online (Sandbox Code Playgroud)

在这里看到了同样的问题,但没有任何帮助.我可以知道我错过了什么吗?

spring-boot-starter-tomcat在依赖关系层次结构中具有编译时依赖性.

java spring spring-boot spring-cloud-stream

19
推荐指数
1
解决办法
4万
查看次数

Spring-Kafka vs. Spring-Cloud-Stream(Kafka)

使用Kafka作为微服务架构中的消息传递系统,使用spring-kafka与spring-cloud-stream + spring-cloud-starter-stream-kafka有什么好处?

Spring云流框架支持更多的消息传递系统,因此具有更多的模块化设计.但功能呢?spring-kafka和spring-cloud-stream + spring-cloud-starter-stream-kafka的功能之间是否存在差距?哪个API设计得更好?

期待阅读您的意见

spring spring-integration spring-cloud-stream spring-kafka

16
推荐指数
2
解决办法
4403
查看次数

发送到 kafka 主题时序列化消息时出错

我需要测试包含标题的消息,所以我需要使用 MessageBuilder,但我无法序列化。

我尝试在生产者道具上添加序列化设置,但没有奏效。

有人能帮我吗?

这个错误:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Run Code Online (Sandbox Code Playgroud)

我的测试班:

public class TransactionMastercardAdapterTest extends AbstractTest{

@Autowired
private KafkaTemplate<String, Message<String>> template;

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);

@BeforeClass
public static void setUp() {
    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}

@Test
public void sendTransactionCommandTest(){

    String payload = "{\"o2oTransactionId\" : \"" + UUID.randomUUID().toString().toUpperCase() + "\","
            + "\"cardId\" : \"11\","
            + "\"transactionId\" : \"20110405123456\","
            + "\"amount\" : 200.59,"
            + "\"partnerId\" : \"11\"}";

    Map<String, Object> …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-producer-api spring-cloud-stream spring-kafka

10
推荐指数
1
解决办法
3万
查看次数

卡夫卡INVALID_FETCH_SESSION_EPOCH

我们正在将kafka代理设置与使用Spring云流kafka运行的kafka stream应用程序结合使用。尽管看起来运行良好,但在日志中确实出现了以下错误语句:

2019-02-21 22:37:20,253 INFO kafka-coordinator-heartbeat-thread | anomaly-timeline org.apache.kafka.clients.FetchSessionHandler [Consumer clientId=anomaly-timeline-56dc4481-3086-4359-a8e8-d2dae12272a2-StreamThread-1-consumer, groupId=anomaly-timeline] Node 2 was unable to process the fetch request with (sessionId=1290440723, epoch=2089): INVALID_FETCH_SESSION_EPOCH. 
Run Code Online (Sandbox Code Playgroud)

我搜索了互联网,但是关于此错误的信息不多。我猜想这可能与经纪人和使用者之间的时间设置有所不同,但是两台机器的时间服务器设置相同。

知道如何解决吗?

apache-kafka spring-cloud-stream apache-kafka-streams

10
推荐指数
2
解决办法
1万
查看次数

无法检索请求的元数据 /latest/meta-data/public-hostname

我正在为 AWS Kinesis 部署带有 spring-cloud-stream 和 binder 的 springboot 应用程序。如果 beanstalk 配置了公共 ip,则该应用程序在部署在 aws elastic beanstalk 上时工作正常。当我们将 beanstalk 设置为私有 ip 时,应用程序会在部署时抛出警告,因为无法从 aws 获取一些元数据。

为什么要尝试获取这些资源?有一些解决方法可以删除此警告吗?

这些是在部署应用程序并且没有在 ec2 上设置公共 ip 时抛出的警告。

 com.amazonaws.util.EC2MetadataUtils      : Unable to retrieve the requested metadata (/latest/meta-data/public-ipv4). The requested metadata is not found at http://169.254.169.254/latest/meta-data/public-ipv4

com.amazonaws.SdkClientException: The requested metadata is not found at http://169.254.169.254/latest/meta-data/public-ipv4
        at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:122)
        at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:82)
        at com.amazonaws.util.EC2MetadataUtils.getItems(EC2MetadataUtils.java:400)
        at com.amazonaws.util.EC2MetadataUtils.getData(EC2MetadataUtils.java:369)
        at com.amazonaws.util.EC2MetadataUtils.getData(EC2MetadataUtils.java:365)
        at org.springframework.cloud.aws.core.env.ec2.AmazonEc2InstanceDataPropertySource.getProperty(AmazonEc2InstanceDataPropertySource.java:89)
        at org.springframework.cloud.bootstrap.encrypt.EnvironmentDecryptApplicationInitializer.collectEncryptedProperties(EnvironmentDecryptApplicationInitializer.java:199)
        at org.springframework.cloud.bootstrap.encrypt.EnvironmentDecryptApplicationInitializer.decrypt(EnvironmentDecryptApplicationInitializer.java:166)
        at org.springframework.cloud.bootstrap.encrypt.EnvironmentDecryptApplicationInitializer.initialize(EnvironmentDecryptApplicationInitializer.java:96)
        at org.springframework.boot.SpringApplication.applyInitializers(SpringApplication.java:649)
        at org.springframework.boot.SpringApplication.prepareContext(SpringApplication.java:373)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:314)
        at …
Run Code Online (Sandbox Code Playgroud)

spring-boot amazon-elastic-beanstalk spring-cloud-stream spring-cloud-aws

9
推荐指数
1
解决办法
7825
查看次数

spring-cloud-stream kafka 消费者并发

使用 spring-cloud-stream 的 kafka binder,如何配置并发消息消费者(在单个消费者 jvm 中)?如果我理解正确,在使用 kafka 时并发消息消耗需要分区,但scs 文档表明要使用分区,您需要通过 partitionKeyExpression 或 partitionKeyExtractorClass 在生产者中指定分区选择。Kafka 文档提到了循环分区。

scs 文档根本没有提到 spring.cloud.stream.bindings.*.concurrency ,尽管这在我上面描述的用例中似乎很重要。使用生产者配置

spring:
  cloud:
    stream:
      bindings:
        customer-save:
          destination: customer-save
          group: customer-save
          content-type: application/json
          partitionCount: 3
Run Code Online (Sandbox Code Playgroud)

和消费者配置

spring:
  cloud:
    stream:
      bindings:
        customer-save: 
          destination: customer-save
          group: customer-save
          content-type: application/x-java-object;type=foo.Customer
          partitioned: true
          concurrency: 3
Run Code Online (Sandbox Code Playgroud)

我似乎得到了我想要的行为(至少在某种程度上)。我可以看到有时有 3 个消费者线程处于活动状态,尽管似乎确实存在一些除循环之外的分区在起作用,因为有些消息似乎在等待繁忙的消费者线程并在该线程完成后被消耗。我认为这是因为消息被发送到同一个分区。

当我没有指定 partitionKeyExpression 或 partitionKeyExtractorClass 时,是否有一些默认的密钥提取和分区策略在生产者上使用?这是使用 kafka 设置 scs 使用者的合适方法,您希望多个线程使用消息以增加使用者吞吐量?

spring-cloud-stream

7
推荐指数
1
解决办法
8199
查看次数

Spring Cloud Kafka Stream无法创建Producer配置错误

我有两个带有Kafka-stream依赖项的Spring启动项目,它们在gradle和完全相同的配置中具有完全相同的依赖关系,但是启动时项目之一的日志错误如下所示

11:35:37.974 [restartedMain] INFO  o.a.k.c.admin.AdminClientConfig - AdminClientConfig values: 
    bootstrap.servers = [192.169.0.109:6667]
    client.id = client
    connections.max.idle.ms = 300000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 120000
    retries = 5
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null …
Run Code Online (Sandbox Code Playgroud)

gradle kotlin spring-boot spring-cloud-stream spring-kafka

7
推荐指数
1
解决办法
3237
查看次数

我们如何为Spring Cloud Stream Kafka生产者、消费者和KStreams中的模式配置value.subject.name.strategy?

我想自定义 Spring Cloud Stream Producers、Consumers 和 KStreams 中 Avro 模式主题的命名策略。

key.subject.name.strategy这将在 Kafka 中通过以下属性完成value.subject.name.strategy-> https://docs.confluence.io/current/schema-registry/serializer-formatter.html#subject-name-strategy

在原生 Kafka Producer 中,这是有效的:


private val producer: KafkaProducer<Int, Customer>

    init {
        val props = Properties()
        ...
        props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
        props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
        producer = KafkaProducer(props)
    }

    fun sendCustomerEvent(customer: Customer) {
        val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
        producer.send(record)
    }
Run Code Online (Sandbox Code Playgroud)

但是我找不到如何在 Spring Cloud Stream 中执行此操作。到目前为止,我已经在制作人中尝试过了:

spring:
  application:
    name: spring-boot-customer-service
  cloud:
    stream:
      kafka:
        bindings:
          output:
            producer:
              configuration:
                key:
                  serializer: org.apache.kafka.common.serialization.IntegerSerializer
                value:
                  subject:
                    name:
                      strategy: …
Run Code Online (Sandbox Code Playgroud)

avro apache-kafka spring-cloud-stream apache-kafka-streams confluent-schema-registry

7
推荐指数
1
解决办法
1万
查看次数

Kafka 消费者指标从 Spring Boot 2.2.2 升级到 2.3.0

问题:

我们将 Spring Boot 版本从 2.2.2 升级到 2.3.0,2.2.2kafka_consumer_*中 Prometheus 端点中看到的所有指标在 2.3.0 中都看不到。

例如,缺少以下所有内容:

  • kafka_consumer_records_consumed_total_records_total
  • kafka_consumer_records_lag_records
  • kafka_consumer_fetch_latency_max_seconds
  • kafka_consumer_bytes_consumed_total_bytes_total

不确定我们是否缺少某种配置或文档中隐藏的东西......

已经尝试过的:

  • 梳理了 Spring Boot 2.3.0 发行说明、更新的千分尺文档和更新的 spring-kafka 文档,了解为什么会发生这种情况
  • 谷歌搜索到感觉就像地球的尽头
  • 尝试升级到 Spring Boot 2.2.7 并且 kafka 指标仍然存在,只有升级到 2.3.0 似乎会导致问题
  • 删除了我们项目代码中的任何不需要的依赖项/自定义,并且裸机只是连接到本地主机上的 kafka 容器,并且指标仍然没有出现

相关代码/详情:

  • 我们将 Red Hat AMQ Streams 用于我们的 kafka 代理(kafka 版本 2.1.1)
  • 我们在环境中唯一改变的是 Spring Boot 版本(以及自动拉入/更新的依赖项)以重新创建此问题

下面是我们build.gradle.kts改动前的:

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    id("org.springframework.boot") version "2.2.2.RELEASE"
    id("io.spring.dependency-management") version "1.0.9.RELEASE"
    kotlin("jvm") version "1.3.72"
    kotlin("plugin.spring") version "1.3.72"
}

group = "ourGroup"
version …
Run Code Online (Sandbox Code Playgroud)

spring-boot spring-boot-actuator prometheus spring-cloud-stream spring-kafka

7
推荐指数
1
解决办法
2360
查看次数

当主题有多个分区时,KTable-KTable 外键连接不会产生所有消息

请参阅下面的更新以显示潜在的解决方法

我们的应用程序使用 2 个主题作为 KTables,执行左连接,并输出到一个主题。在测试期间,我们发现当我们的输出主题只有 1 个分区时,这可以正常工作。当我们增加分区数量时,我们注意到生成到输出主题的消息数量减少了。

在启动应用程序之前,我们使用多个分区配置测试了这一理论。使用 1 个分区,我们可以看到 100% 的消息。使用 2,我们会看到一些消息(少于 50%)。对于 10,我们几乎看不到任何东西(少于 10%)。

因为我们离开了,所以从主题 1 消费的每条消息都应该写入我们的输出主题,但我们发现这并没有发生。消息似乎卡在从 Ktables 的外键连接创建的“中间”主题中,但没有错误消息。

任何帮助将不胜感激!

服务.java

@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {

    return (topicOne, topicTwo) ->
            topicOne
                    .leftJoin(topicTwo,
                            value -> MyOtherKey.newBuilder()
                                    .setFieldA(value.getFieldA())
                                    .setFieldB(value.getFieldB())
                                    .build(),
                            this::enrich)
                    .toStream();
}
Run Code Online (Sandbox Code Playgroud)

构建.gradle

plugins {
    id 'org.springframework.boot' version '2.3.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}

...

ext {
    set('springCloudVersion', "Hoxton.SR6")
}

...

implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'
Run Code Online (Sandbox Code Playgroud)

注意:由于 spring-cloud-stream 中包含的版本中存在错误,我们排除了 org.apache.kafka 依赖项 …

java apache-kafka spring-boot spring-cloud-stream apache-kafka-streams

7
推荐指数
1
解决办法
970
查看次数