测试类: -
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { WebsocketSourceConfiguration.class,
WebSocketSourceIntegrationTests.class }, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = {
"websocket.path=/some_websocket_path", "websocket.allowedOrigins=*",
"spring.cloud.stream.default-binder=kafka" })
public class WebSocketSourceIntegrationTests {
private String port = "8080";
@Test
public void testWebSocketStreamSource() throws IOException, InterruptedException {
StandardWebSocketClient webSocketClient = new StandardWebSocketClient();
ClientWebSocketContainer clientWebSocketContainer = new ClientWebSocketContainer(webSocketClient,
"ws://localhost:" + port + "/some_websocket_path");
clientWebSocketContainer.start();
WebSocketSession session = clientWebSocketContainer.getSession(null);
session.sendMessage(new TextMessage("foo"));
System.out.println("Done****************************************************");
}
}
Run Code Online (Sandbox Code Playgroud)
我在这里看到了同样的问题,但没有任何帮助.我可以知道我错过了什么吗?
我spring-boot-starter-tomcat在依赖关系层次结构中具有编译时依赖性.
使用Kafka作为微服务架构中的消息传递系统,使用spring-kafka与spring-cloud-stream + spring-cloud-starter-stream-kafka有什么好处?
Spring云流框架支持更多的消息传递系统,因此具有更多的模块化设计.但功能呢?spring-kafka和spring-cloud-stream + spring-cloud-starter-stream-kafka的功能之间是否存在差距?哪个API设计得更好?
期待阅读您的意见
我需要测试包含标题的消息,所以我需要使用 MessageBuilder,但我无法序列化。
我尝试在生产者道具上添加序列化设置,但没有奏效。
有人能帮我吗?
这个错误:
org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Run Code Online (Sandbox Code Playgroud)
我的测试班:
public class TransactionMastercardAdapterTest extends AbstractTest{
@Autowired
private KafkaTemplate<String, Message<String>> template;
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);
@BeforeClass
public static void setUp() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}
@Test
public void sendTransactionCommandTest(){
String payload = "{\"o2oTransactionId\" : \"" + UUID.randomUUID().toString().toUpperCase() + "\","
+ "\"cardId\" : \"11\","
+ "\"transactionId\" : \"20110405123456\","
+ "\"amount\" : 200.59,"
+ "\"partnerId\" : \"11\"}";
Map<String, Object> …Run Code Online (Sandbox Code Playgroud) apache-kafka kafka-producer-api spring-cloud-stream spring-kafka
我们正在将kafka代理设置与使用Spring云流kafka运行的kafka stream应用程序结合使用。尽管看起来运行良好,但在日志中确实出现了以下错误语句:
2019-02-21 22:37:20,253 INFO kafka-coordinator-heartbeat-thread | anomaly-timeline org.apache.kafka.clients.FetchSessionHandler [Consumer clientId=anomaly-timeline-56dc4481-3086-4359-a8e8-d2dae12272a2-StreamThread-1-consumer, groupId=anomaly-timeline] Node 2 was unable to process the fetch request with (sessionId=1290440723, epoch=2089): INVALID_FETCH_SESSION_EPOCH.
Run Code Online (Sandbox Code Playgroud)
我搜索了互联网,但是关于此错误的信息不多。我猜想这可能与经纪人和使用者之间的时间设置有所不同,但是两台机器的时间服务器设置相同。
知道如何解决吗?
我正在为 AWS Kinesis 部署带有 spring-cloud-stream 和 binder 的 springboot 应用程序。如果 beanstalk 配置了公共 ip,则该应用程序在部署在 aws elastic beanstalk 上时工作正常。当我们将 beanstalk 设置为私有 ip 时,应用程序会在部署时抛出警告,因为无法从 aws 获取一些元数据。
为什么要尝试获取这些资源?有一些解决方法可以删除此警告吗?
这些是在部署应用程序并且没有在 ec2 上设置公共 ip 时抛出的警告。
com.amazonaws.util.EC2MetadataUtils : Unable to retrieve the requested metadata (/latest/meta-data/public-ipv4). The requested metadata is not found at http://169.254.169.254/latest/meta-data/public-ipv4
com.amazonaws.SdkClientException: The requested metadata is not found at http://169.254.169.254/latest/meta-data/public-ipv4
at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:122)
at com.amazonaws.internal.EC2CredentialsUtils.readResource(EC2CredentialsUtils.java:82)
at com.amazonaws.util.EC2MetadataUtils.getItems(EC2MetadataUtils.java:400)
at com.amazonaws.util.EC2MetadataUtils.getData(EC2MetadataUtils.java:369)
at com.amazonaws.util.EC2MetadataUtils.getData(EC2MetadataUtils.java:365)
at org.springframework.cloud.aws.core.env.ec2.AmazonEc2InstanceDataPropertySource.getProperty(AmazonEc2InstanceDataPropertySource.java:89)
at org.springframework.cloud.bootstrap.encrypt.EnvironmentDecryptApplicationInitializer.collectEncryptedProperties(EnvironmentDecryptApplicationInitializer.java:199)
at org.springframework.cloud.bootstrap.encrypt.EnvironmentDecryptApplicationInitializer.decrypt(EnvironmentDecryptApplicationInitializer.java:166)
at org.springframework.cloud.bootstrap.encrypt.EnvironmentDecryptApplicationInitializer.initialize(EnvironmentDecryptApplicationInitializer.java:96)
at org.springframework.boot.SpringApplication.applyInitializers(SpringApplication.java:649)
at org.springframework.boot.SpringApplication.prepareContext(SpringApplication.java:373)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:314)
at …Run Code Online (Sandbox Code Playgroud) spring-boot amazon-elastic-beanstalk spring-cloud-stream spring-cloud-aws
使用 spring-cloud-stream 的 kafka binder,如何配置并发消息消费者(在单个消费者 jvm 中)?如果我理解正确,在使用 kafka 时并发消息消耗需要分区,但scs 文档表明要使用分区,您需要通过 partitionKeyExpression 或 partitionKeyExtractorClass 在生产者中指定分区选择。Kafka 文档提到了循环分区。
scs 文档根本没有提到 spring.cloud.stream.bindings.*.concurrency ,尽管这在我上面描述的用例中似乎很重要。使用生产者配置
spring:
cloud:
stream:
bindings:
customer-save:
destination: customer-save
group: customer-save
content-type: application/json
partitionCount: 3
Run Code Online (Sandbox Code Playgroud)
和消费者配置
spring:
cloud:
stream:
bindings:
customer-save:
destination: customer-save
group: customer-save
content-type: application/x-java-object;type=foo.Customer
partitioned: true
concurrency: 3
Run Code Online (Sandbox Code Playgroud)
我似乎得到了我想要的行为(至少在某种程度上)。我可以看到有时有 3 个消费者线程处于活动状态,尽管似乎确实存在一些除循环之外的分区在起作用,因为有些消息似乎在等待繁忙的消费者线程并在该线程完成后被消耗。我认为这是因为消息被发送到同一个分区。
当我没有指定 partitionKeyExpression 或 partitionKeyExtractorClass 时,是否有一些默认的密钥提取和分区策略在生产者上使用?这是使用 kafka 设置 scs 使用者的合适方法,您希望多个线程使用消息以增加使用者吞吐量?
我有两个带有Kafka-stream依赖项的Spring启动项目,它们在gradle和完全相同的配置中具有完全相同的依赖关系,但是启动时项目之一的日志错误如下所示
11:35:37.974 [restartedMain] INFO o.a.k.c.admin.AdminClientConfig - AdminClientConfig values:
bootstrap.servers = [192.169.0.109:6667]
client.id = client
connections.max.idle.ms = 300000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 120000
retries = 5
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null …Run Code Online (Sandbox Code Playgroud) 我想自定义 Spring Cloud Stream Producers、Consumers 和 KStreams 中 Avro 模式主题的命名策略。
key.subject.name.strategy这将在 Kafka 中通过以下属性完成value.subject.name.strategy-> https://docs.confluence.io/current/schema-registry/serializer-formatter.html#subject-name-strategy
在原生 Kafka Producer 中,这是有效的:
private val producer: KafkaProducer<Int, Customer>
init {
val props = Properties()
...
props[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
props[AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java.name
producer = KafkaProducer(props)
}
fun sendCustomerEvent(customer: Customer) {
val record: ProducerRecord<Int, Customer> = ProducerRecord("customer", customer.id, customer)
producer.send(record)
}
Run Code Online (Sandbox Code Playgroud)
但是我找不到如何在 Spring Cloud Stream 中执行此操作。到目前为止,我已经在制作人中尝试过了:
spring:
application:
name: spring-boot-customer-service
cloud:
stream:
kafka:
bindings:
output:
producer:
configuration:
key:
serializer: org.apache.kafka.common.serialization.IntegerSerializer
value:
subject:
name:
strategy: …Run Code Online (Sandbox Code Playgroud) avro apache-kafka spring-cloud-stream apache-kafka-streams confluent-schema-registry
我们将 Spring Boot 版本从 2.2.2 升级到 2.3.0,2.2.2kafka_consumer_*中 Prometheus 端点中看到的所有指标在 2.3.0 中都看不到。
例如,缺少以下所有内容:
kafka_consumer_records_consumed_total_records_totalkafka_consumer_records_lag_recordskafka_consumer_fetch_latency_max_secondskafka_consumer_bytes_consumed_total_bytes_total不确定我们是否缺少某种配置或文档中隐藏的东西......
下面是我们build.gradle.kts改动前的:
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
id("org.springframework.boot") version "2.2.2.RELEASE"
id("io.spring.dependency-management") version "1.0.9.RELEASE"
kotlin("jvm") version "1.3.72"
kotlin("plugin.spring") version "1.3.72"
}
group = "ourGroup"
version …Run Code Online (Sandbox Code Playgroud) spring-boot spring-boot-actuator prometheus spring-cloud-stream spring-kafka
请参阅下面的更新以显示潜在的解决方法
我们的应用程序使用 2 个主题作为 KTables,执行左连接,并输出到一个主题。在测试期间,我们发现当我们的输出主题只有 1 个分区时,这可以正常工作。当我们增加分区数量时,我们注意到生成到输出主题的消息数量减少了。
在启动应用程序之前,我们使用多个分区配置测试了这一理论。使用 1 个分区,我们可以看到 100% 的消息。使用 2,我们会看到一些消息(少于 50%)。对于 10,我们几乎看不到任何东西(少于 10%)。
因为我们离开了,所以从主题 1 消费的每条消息都应该写入我们的输出主题,但我们发现这并没有发生。消息似乎卡在从 Ktables 的外键连接创建的“中间”主题中,但没有错误消息。
任何帮助将不胜感激!
服务.java
@Bean
public BiFunction<KTable<MyKey, MyValue>, KTable<MyOtherKey, MyOtherValue>, KStream<MyKey, MyEnrichedValue>> process() {
return (topicOne, topicTwo) ->
topicOne
.leftJoin(topicTwo,
value -> MyOtherKey.newBuilder()
.setFieldA(value.getFieldA())
.setFieldB(value.getFieldB())
.build(),
this::enrich)
.toStream();
}
Run Code Online (Sandbox Code Playgroud)
构建.gradle
plugins {
id 'org.springframework.boot' version '2.3.1.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'com.commercehub.gradle.plugin.avro' version '0.9.1'
}
...
ext {
set('springCloudVersion', "Hoxton.SR6")
}
...
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
implementation 'io.confluent:kafka-streams-avro-serde:5.5.1'
Run Code Online (Sandbox Code Playgroud)
注意:由于 spring-cloud-stream 中包含的版本中存在错误,我们排除了 org.apache.kafka 依赖项 …
java apache-kafka spring-boot spring-cloud-stream apache-kafka-streams
spring-boot ×5
apache-kafka ×4
spring-kafka ×4
java ×2
spring ×2
avro ×1
gradle ×1
kotlin ×1
prometheus ×1