Posts by bil*_*ydh

Spring Cloud Stream Kafka Streams Binder KafkaException: Could not start stream: 'listener' cannot be null

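I'm new to Kafka Streams and Spring Cloud Stream, but I've read good things about them in terms of moving integration-related code into properties files, so that developers can focus mainly on the business-logic side of things.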

Here is my simple application class.

package com.some.events.consumer

import com.some.events.SomeEvent
import org.apache.kafka.streams.kstream.KStream
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import java.util.function.Consumer

@SpringBootApplication
class ConsumerApplication {
    @Bean
    fun consume(): Consumer<KStream<String, SomeEvent>> {
        return Consumer { input -> input.foreach { key, value -> println("Key: $key, value: $value") } }
    }
}

fun main(args: Array<String>) {
    runApplication<ConsumerApplication>(*args)
}


My application.yml file is as follows.

spring:
  cloud:
    function:
      definition: consume
    stream:
      bindings:
        consume-in-0:
          destination: "some-event"
          group: "some-event"

My dependencies in build.gradle.kts are defined as follows (only the relevant ones are included here).

extra["springCloudVersion"] = "2020.0.2"

dependencies { …

spring apache-kafka spring-boot spring-cloud-stream apache-kafka-streams

3 votes · 1 answer · 1209 views

How do I set up Google Firebase credentials from a Python dict instead of a JSON file?

I've spent hours trying to set up my Google Firebase credentials without using the JSON file path recommended in the setup guide, with no success.

What I want to do

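I store the contents of the service account JSON file in the AWS SSM Parameter Store, encrypted as a SecureString. At runtime, my application fetches the JSON string and decrypts it. I then load it into a Python dict and pass it to credentials.Certificate(service_acc_dict).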

The problem

The above doesn't work. Here is my code.

# `secret` is the Json string I get back after decrypting the google service account credentials from aws ssm

secret = '{\\n  \"type\": \"service_account\",\\n  \"project_id\": \"some-project\",\\n  \"private_key_id\": \"11z11111z1z111111b1111111z11z11z1111111z\",\\n  \"private_key\": \"-----BEGIN PRIVATE KEY-----\\\\nZZZZzZZZZZZZZzzzzzzZ1z1ZZZZZZZZZZZzzzzZzZzEAAoIBAQC7+0fFwMo6Pbz7\\\\nyk3Fxae5/Ebq56KSzezKk30+wKPymSW/uXBlIZZXlFJdKZNTFI5UdbPsKSypp+cW\\\\nNoAq06ojJ727j25ygMAOILeFJD1fb6c0TrDHsBiw0ECmPT9EOHddjHfF8Oj/gbg+\\\\n2EyRPZiT8238QfbZHnbZ35RpsnasNfk0n0qdB5///w1iFjzfZZbf+9UX6wE6ht7q\\\\nJlBOZan104saXi4UbmAmnz3fX/RVJ4ubO9XE4iDzQbljNONBNJvSbX9GuOgiTmCw\\\\nCK/x23rihABCZ6c9Q/3rkLsJEVqHYZkVHwaGcBF4V44qUrsd3GrHryCHLawhhz8l\\\\nDd3rBDIZAgMBAAECggEAFGgebgLUUUdDfUgEclxXNGAVTdMojGxLcOBa/9V01tC2\\\\nTt5oK6peQkqpOFDbm/DG1LdkXVZI8W/3P6uR9VQ+C4v0ZmiXMln0v3PgyFTbTsF1\\\\nstF6Emt0+rjY09MhS5wfpSmrFAQcd2oasMPVaAz6Q9Fw1qoojIBooZVKbMEBbgdM\\\\nUcs9tuCnYAOggNwgYoGsldAlkjrAOx1iopyHVhBo+cHbYW03Bgncvpq7fLLL056H\\\\aaaaAAAmVDoBFnvfS2SfT9DBPLeFC0JSgc6U6b1v9lzjzRWNdG6OfBFYJIwTZ8sCd\\\\n6wT7twniL0gBPN/y3TM5Skbo0c7IYo2LVyWcFy6j4wKBgQDb85FWMchXkRWLQDRJ\\\\npJm2JcFT3TATX7RMcGD4XRk/YXDD3pgwDi6zjqYcCFtYLHGTScc2wFAvkX4xUayX\\\\zz1ZZzzzZ939DQ/S2rPkJmUJpwY3hHd3mkAMV0CzzAAf1wvLeCO0g8N1AsX2YjhJb\\\\n81yLbV44EpERTOsbMkPpEXrsbwKBgQDaylzmnYCCwWKBQoTV7x07jZmIa2vCWCol\\\\nKO3zuhwk1r6HkdKtk1wN8kp8auMGf72REU4KRvsUQ6b+IzhP7kFcDYegNwv+JdB1\\\\n2FM0ZzpFmiIIHfgHGJhb2O1z58n5m01CrhpyD39y36MDTmC6zxmS1dHry0puV/fG\\\\nS8/wZTad9wKBgQCZw9U+5N6iGRNunhvvv9qVtB9Leb46TRXGummQN8WGwaALznmm\\\\nXsPXU0pdHpp9MdTUmydh52AnYRdPc0GtWzCrxvOZMGUPqVpSuun/A92iAWYmkrKm\\\\nlNcTUM2bs9HcxcTz5FmiRcDnkS20kN3U2nVAI91SZeh0p8lU4fcH4OiGkQKBgD7b\\\\nHE10ulLWVAJmpdsAUxmk2JMEqXSv94utco8uzJ8YwqwYDLqpNy0aiqOr4YUgdcmT\\\\neyQguEleFj+0xpzQCh70FB7HMb7WBkmU2HKZpXgRi+1hDrybKEpay/0cfj4ji9K4\\\\nSgiywx6xeReeENQaY3J301M2mC+TPi/N3/NkYIiJAoGAfBwdbptV/U7hXXgTnFps\\\\nZHdpAH4SddVBe2Ki6DLIiZPliXUSKcClrhy4evl2f4mA4sy7ovlBiXRA+IoNfkTT\\\\nH1Resx2kTlrkpT5+gsmDiY5HMNBHWuPjPIWNPwxzBSU3KK64TwPLFD0FBdfC6maS\\\\nVqyeIaHUW59ExHN5+FOfmWY=\\\\n-----END PRIVATE 
KEY-----\\\\n\",\\n  \"client_email\": \"firebase-adminsdk-zz1zz@some-project.iam.gserviceaccount.com\",\\n  \"client_id\": \"111111111111111111111\",\\n  \"auth_uri\": \"https://accounts.google.com/o/oauth2/auth\",\\n  \"token_uri\": \"https://oauth2.googleapis.com/token\",\\n  \"auth_provider_x509_cert_url\": \"https://www.googleapis.com/oauth2/v1/certs\",\\n  \"client_x509_cert_url\": \"https://www.googleapis.com/robot/v1/metadata/x509/firebase-adminsdk-zz1zz%40some-project.iam.gserviceaccount.com\"\\n}\\n'

js = secret.replace('\\n  ', '').replace('\\\\n', '').replace('\\n', '')
js_dict = json.loads(js) …
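The flow described above (take the decrypted JSON string, turn it into a dict, hand the dict to credentials.Certificate) can be sketched roughly as follows. This is only a minimal illustration, not a confirmed fix: SAMPLE_SECRET is a hypothetical, shortened stand-in for the decrypted SSM value, load_service_account is an illustrative helper name, and the sketch assumes the secret arrives as valid JSON text so that no manual replace() calls are needed.

```python
import json

# Hypothetical, shortened stand-in for the decrypted SSM value (not a real key).
# Inside the JSON text, the private key carries "\n" escape sequences.
SAMPLE_SECRET = (
    '{\n'
    '  "type": "service_account",\n'
    '  "project_id": "some-project",\n'
    '  "private_key": "-----BEGIN PRIVATE KEY-----\\nAAAA\\nBBBB\\n-----END PRIVATE KEY-----\\n"\n'
    '}\n'
)


def load_service_account(secret: str) -> dict:
    """Parse the service-account JSON text into a dict without editing it first."""
    info = json.loads(secret)
    if not isinstance(info, dict):
        raise ValueError("expected a JSON object at the top level")
    return info


service_acc_dict = load_service_account(SAMPLE_SECRET)

# json.loads converts the "\n" escapes into real line breaks, so the
# PEM header ends up on its own line inside the parsed private key.
first_line = service_acc_dict["private_key"].splitlines()[0]
print(first_line)
```

The resulting service_acc_dict is the kind of value the question passes to credentials.Certificate(service_acc_dict); firebase_admin is deliberately not imported here so the sketch runs on its own. Stripping the \n escapes from private_key before parsing would likely leave the PEM block on a single line, which may be worth checking against the error being seen.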

python firebase google-cloud-platform firebase-authentication

1 vote · 1 answer · 1042 views

How do I set up Spring Kafka tests with EmbeddedKafkaRule/EmbeddedKafka to fix intermittent TopicExistsException errors?

I'm having trouble testing my Kafka consumers and producers. The integration tests fail intermittently with TopicExistsException.

This is what my current test class, UserEventListenerTest, looks like for one of the consumers:

@SpringBootTest(properties = ["application.kafka.user-event-topic=user-event-topic-UserEventListenerTest",
    "application.kafka.bootstrap=localhost:2345"])
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class UserEventListenerTest {
    private val logger: Logger = LoggerFactory.getLogger(javaClass)

    @Value("\${application.kafka.user-event-topic}")
    private lateinit var userEventTopic: String

    @Autowired
    private lateinit var kafkaConfigProperties: KafkaConfigProperties

    private lateinit var embeddedKafka: EmbeddedKafkaRule
    private lateinit var sender: KafkaSender<String, UserEvent>
    private lateinit var receiver: KafkaReceiver<String, UserEvent>

    @BeforeAll
    fun setup() {
        embeddedKafka = EmbeddedKafkaRule(1, false, userEventTopic)
        embeddedKafka.kafkaPorts(kafkaConfigProperties.bootstrap.substringAfterLast(":").toInt())
        embeddedKafka.before()

        val producerProps: HashMap<String, Any> = hashMapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "com.project.userservice.config.MockAvroSerializer"
        )
        val …

kotlin apache-kafka spring-boot spring-kafka spring-kafka-test

1 vote · 1 answer · 9909 views