我是 Kafka Streams 和 Spring Cloud Stream 的新手,但在将集成相关代码移动到属性文件方面阅读了一些关于它的好东西,因此开发人员可以主要关注事物的业务逻辑方面。
在这里,我有我的简单应用程序类。
package com.some.events.consumer
import com.some.events.SomeEvent
import org.apache.kafka.streams.kstream.KStream
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import java.util.function.Consumer
@SpringBootApplication
class ConsumerApplication {
@Bean
fun consume(): Consumer<KStream<String, SomeEvent>> {
return Consumer { input -> input.foreach { key, value -> println("Key: $key, value: $value") } }
}
}
fun main(args: Array<String>) {
runApplication<ConsumerApplication>(*args)
}
Run Code Online (Sandbox Code Playgroud)
我的application.yml文件如下。
spring:
cloud:
function:
definition: consume
stream:
bindings:
consume-in-0:
destination: "some-event"
group: "some-event"
Run Code Online (Sandbox Code Playgroud)
我的依赖项build.gradle.kts定义如下(这里只包含相关的)。
extra["springCloudVersion"] = "2020.0.2"
dependencies { …Run Code Online (Sandbox Code Playgroud) spring apache-kafka spring-boot spring-cloud-stream apache-kafka-streams
我花了几个小时试图设置我的 Google Firebase 凭据,而不使用设置指南中推荐的 Json 文件路径,但没有成功。
我将服务帐户 Json 文件的内容存储在 AWS SSM 参数存储中,加密为SecureString. 在运行时,我的应用程序将获取 Json 字符串并对其进行解密。然后,我将其放入 Python 字典并将其传递给credentials.Certificate(service_acc_dict).
以上是行不通的。这是我的代码。
# `secret` is the Json string I get back after decrypting the google service account credentials from aws ssm
secret = '{\\n \"type\": \"service_account\",\\n \"project_id\": \"some-project\",\\n \"private_key_id\": \"11z11111z1z111111b1111111z11z11z1111111z\",\\n \"private_key\": \"-----BEGIN PRIVATE KEY-----\\\\nZZZZzZZZZZZZZzzzzzzZ1z1ZZZZZZZZZZZzzzzZzZzEAAoIBAQC7+0fFwMo6Pbz7\\\\nyk3Fxae5/Ebq56KSzezKk30+wKPymSW/uXBlIZZXlFJdKZNTFI5UdbPsKSypp+cW\\\\nNoAq06ojJ727j25ygMAOILeFJD1fb6c0TrDHsBiw0ECmPT9EOHddjHfF8Oj/gbg+\\\\n2EyRPZiT8238QfbZHnbZ35RpsnasNfk0n0qdB5///w1iFjzfZZbf+9UX6wE6ht7q\\\\nJlBOZan104saXi4UbmAmnz3fX/RVJ4ubO9XE4iDzQbljNONBNJvSbX9GuOgiTmCw\\\\nCK/x23rihABCZ6c9Q/3rkLsJEVqHYZkVHwaGcBF4V44qUrsd3GrHryCHLawhhz8l\\\\nDd3rBDIZAgMBAAECggEAFGgebgLUUUdDfUgEclxXNGAVTdMojGxLcOBa/9V01tC2\\\\nTt5oK6peQkqpOFDbm/DG1LdkXVZI8W/3P6uR9VQ+C4v0ZmiXMln0v3PgyFTbTsF1\\\\nstF6Emt0+rjY09MhS5wfpSmrFAQcd2oasMPVaAz6Q9Fw1qoojIBooZVKbMEBbgdM\\\\nUcs9tuCnYAOggNwgYoGsldAlkjrAOx1iopyHVhBo+cHbYW03Bgncvpq7fLLL056H\\\\aaaaAAAmVDoBFnvfS2SfT9DBPLeFC0JSgc6U6b1v9lzjzRWNdG6OfBFYJIwTZ8sCd\\\\n6wT7twniL0gBPN/y3TM5Skbo0c7IYo2LVyWcFy6j4wKBgQDb85FWMchXkRWLQDRJ\\\\npJm2JcFT3TATX7RMcGD4XRk/YXDD3pgwDi6zjqYcCFtYLHGTScc2wFAvkX4xUayX\\\\zz1ZZzzzZ939DQ/S2rPkJmUJpwY3hHd3mkAMV0CzzAAf1wvLeCO0g8N1AsX2YjhJb\\\\n81yLbV44EpERTOsbMkPpEXrsbwKBgQDaylzmnYCCwWKBQoTV7x07jZmIa2vCWCol\\\\nKO3zuhwk1r6HkdKtk1wN8kp8auMGf72REU4KRvsUQ6b+IzhP7kFcDYegNwv+JdB1\\\\n2FM0ZzpFmiIIHfgHGJhb2O1z58n5m01CrhpyD39y36MDTmC6zxmS1dHry0puV/fG\\\\nS8/wZTad9wKBgQCZw9U+5N6iGRNunhvvv9qVtB9Leb46TRXGummQN8WGwaALznmm\\\\nXsPXU0pdHpp9MdTUmydh52AnYRdPc0GtWzCrxvOZMGUPqVpSuun/A92iAWYmkrKm\\\\nlNcTUM2bs9HcxcTz5FmiRcDnkS20kN3U2nVAI91SZeh0p8lU4fcH4OiGkQKBgD7b\\\\nHE10ulLWVAJmpdsAUxmk2JMEqXSv94utco8uzJ8YwqwYDLqpNy0aiqOr4YUgdcmT\\\\neyQguEleFj+0xpzQCh70FB7HMb7WBkmU2HKZpXgRi+1hDrybKEpay/0cfj4ji9K4\\\\nSgiywx6xeReeENQaY3J301M2mC+TPi/N3/NkYIiJAoGAfBwdbptV/U7hXXgTnFps\\\\nZHdpAH4SddVBe2Ki6DLIiZPliXUSKcClrhy4evl2f4mA4sy7ovlBiXRA+IoNfkTT\\\\nH1Resx2kTlrkpT5+gsmDiY5HMNBHWuPjPIWNPwxzBSU3KK64TwPLFD0FBdfC6maS\\\\nVqyeIaHUW59ExHN5+FOfmWY=\\\\n-----END PRIVATE KEY-----\\\\n\",\\n \"client_email\": \"firebase-adminsdk-zz1zz@some-project.iam.gserviceaccount.com\",\\n \"client_id\": \"111111111111111111111\",\\n \"auth_uri\": \"https://accounts.google.com/o/oauth2/auth\",\\n \"token_uri\": \"https://oauth2.googleapis.com/token\",\\n \"auth_provider_x509_cert_url\": \"https://www.googleapis.com/oauth2/v1/certs\",\\n \"client_x509_cert_url\": \"https://www.googleapis.com/robot/v1/metadata/x509/firebase-adminsdk-zz1zz%40some-project.iam.gserviceaccount.com\"\\n}\\n'
js = secret.replace('\\n ', '').replace('\\\\n', '').replace('\\n', '')
js_dict = json.loads(js) …Run Code Online (Sandbox Code Playgroud) python firebase google-cloud-platform firebase-authentication
我在测试 Kafka 消费者和生产者时遇到了问题。集成测试间歇性失败,并显示TopicExistsException.
这就是我当前的测试类 -UserEventListenerTest对于其中一位消费者来说是这样的:
@SpringBootTest(properties = ["application.kafka.user-event-topic=user-event-topic-UserEventListenerTest",
"application.kafka.bootstrap=localhost:2345"])
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class UserEventListenerTest {
private val logger: Logger = LoggerFactory.getLogger(javaClass)
@Value("\${application.kafka.user-event-topic}")
private lateinit var userEventTopic: String
@Autowired
private lateinit var kafkaConfigProperties: KafkaConfigProperties
private lateinit var embeddedKafka: EmbeddedKafkaRule
private lateinit var sender: KafkaSender<String, UserEvent>
private lateinit var receiver: KafkaReceiver<String, UserEvent>
@BeforeAll
fun setup() {
embeddedKafka = EmbeddedKafkaRule(1, false, userEventTopic)
embeddedKafka.kafkaPorts(kafkaConfigProperties.bootstrap.substringAfterLast(":").toInt())
embeddedKafka.before()
val producerProps: HashMap<String, Any> = hashMapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaConfigProperties.bootstrap,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to "com.project.userservice.config.MockAvroSerializer"
)
val …Run Code Online (Sandbox Code Playgroud) kotlin apache-kafka spring-boot spring-kafka spring-kafka-test