我可以使用 Spring Cloud Steam 实现手动 Kafka 偏移管理,如下所示:
我正在使用 Spring 云流,我想保存消息并在 Kafka 服务器消失时重试在该主题上发布它们,但即使 Kafka/Zookeeper 服务器停止,MessageChannel send() 方法也始终返回 true。
有人可以帮忙吗?
更新 application.yml 内容:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost
zk-nodes: localhost
mode: raw
bindings:
output:
producer:
sync: true
bindings:
output:
destination: topic-notification
content-type: application/json
Run Code Online (Sandbox Code Playgroud)
代码 :
@Service
public class SendToKafka {
private Logger log = LoggerFactory.getLogger(SendToKafka.class);
@Autowired
Source source;
@Autowired
NotificationFileService notificationFileService;
public void send(NotificationToResendDTO notification){
try {
CompletableFuture.supplyAsync(() -> notification)
.thenAcceptAsync(notif -> {
boolean resp = source.output().send(MessageBuilder.withPayload(notif).build());
log.info(" ======== kafka server response === " + resp);
if …Run Code Online (Sandbox Code Playgroud) 将我们的一项服务升级到 2.0.0.RC3 时,我们在尝试使用由使用旧版本Ditmars.RELEASEspring-cloud-stream的服务生成的消息时遇到了异常:spring-cloud-stream
错误31241 --- [container-4-C-1] osintegration.handler.LoggingHandler:org.springframework.messaging.converter.MessageConversionException:无法从[[B]转换为[com.watercorp.messaging.types.incoming.UsersDeletedMessage ] 对于 GenericMessage [payload=byte[371], headers={kafka_offset=1,kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@62029d0d,kafka_timestampType=CREATE_TIME,message_id=1645508761,id=f4e947de-22e6-b629- 229b-4fa961c73f2d,类型=USERS_DELETED,kafka_receivedPartitionId=4,contentType=text/plain,kafka_receivedTopic=用户,kafka_receivedTimestamp=1521641760698,时间戳=1521641772477}],failedMessage=GenericMessage [有效负载=字节[371],标头s={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@62029d0d,kafka_timestampType=CREATE_TIME,message_id=1645508761,id=f4e947de-22e6-b629-229b-4fa961c73f2d,类型=USERS_DELETED,kafka_receivedPartition Id=4,contentType=文本/纯文本, kafka_receivedTopic =用户,kafka_receivedTimestamp = 1521641760698,时间戳= 1521641772477}]在org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144)在org.springframework.messaging.handler.in vocal.HandlerMethodArgumentResolverComposite.resolveArgument (HandlerMethodArgumentResolverComposite.java:116) 在 org.springframework.messaging.handler.invocable.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137) 在 org.springframework.messaging.handler.inspiration.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109) org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)在org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)在org.springframework.integration.handler.AbstractMessageHandler。 handleMessage(AbstractMessageHandler.java:164)在org.springframework.cloud.stream.binding.DispatchingStreamListenerMessageHandler.handleRequestMessage(DispatchingStreamListenerMessageHandler.java:87)在org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)在org .springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157)在org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)在org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher) .java:132)在org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)在org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)在org.springframework.integration。通道.AbstractMessageChannel。发送(AbstractMessageChannel.java:463)在org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407)在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)在org.springframework .messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) 在 org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java :108)在org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)在org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:70)在org.springframework.integration .kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387) 在 org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364) 在 org.springframework.kafka.listener.KafkaMessageListenerContainer $ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001) 在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981) 在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java) :932)在org.springframework.kafka.listener.KafkaMessageListenerContainer $ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801)在org.springframework.kafka.listener.KafkaMessageListenerContainer $ListenerConsumer.run(KafkaMessageListenerContainer.java:689)在java.util。并发.Executors$RunnableAdapter.call(Executors.java:511) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.lang.Thread.run(Thread.java:745)70)在org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387)在org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364)在org .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001) 在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981) 在 org.springframework.kafka.listener.KafkaMessageListenerContainer $ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932) 在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801) 在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java :689) 在 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 在 java.util.concurrent.FutureTask.run(FutureTask.java:266) 在 java.lang.Thread.run(Thread.爪哇:745)70)在org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387)在org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364)在org .springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001) 在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981) 在 org.springframework.kafka.listener.KafkaMessageListenerContainer $ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932) …
我正在尝试实现一个 spring boot aws kinesis 消费者,它能够自动缩放,以便与原始实例共享负载(分割处理碎片)。
我能够做的:使用Kinesis binder 文档中提供的定义良好的自述文件和示例,我已经能够启动多个消费者,这些消费者实际上通过提供这些属性来划分分片以进行处理。
在生产者上,我通过应用程序属性提供partitionCount: 2。对于消费者,我提供了instanceIndex 和instanceCount。
在消费者1上我有instanceIndex = 0和instantCount = 2,在消费者2上我有instanceIndex = 1和instantCount = 2
这工作正常,我有两个处理其特定分片的 Spring Boot 应用程序。但在这种情况下,我必须为每个启动应用程序提供一个预配置的属性文件,该文件需要在加载时可用,以便它们分割负载。如果我只启动第一个消费者(非自动缩放),我只处理特定于索引 0 的分片,而不处理其他分片。
我想做但不确定是否可以部署一个消费者(处理所有分片)。如果我部署另一个实例,我希望该实例重新体验某些负载的第一个消费者,换句话说,如果我有 2 个分片和一个消费者,它将处理这两个实例,如果我随后部署另一个应用程序,我希望第一个消费者到目前为止,仅处理单个分片,将第二个分片留给第二个消费者。
我尝试通过不在消费者上指定instanceIndex或instanceCount而仅提供组名称来做到这一点,但这使得第二个消费者处于空闲状态,直到第一个消费者关闭。仅供参考,我还创建了自己的元数据和锁定表,以防止活页夹创建默认的元数据和锁定表。
配置:生产者-----------------
originator: KinesisProducer
server:
port: 8090
spring:
cloud:
stream:
bindings:
output:
destination: <stream-name>
content-type: application/json
producer:
headerMode: none
partitionKeyExpression: headers.type
Run Code Online (Sandbox Code Playgroud)
消费者-------------------------------------
originator: KinesisSink
server:
port: 8091
spring:
cloud:
stream:
kinesis:
bindings:
input:
consumer:
listenerMode: batch
recordsLimit: 10
shardIteratorType: TRIM_HORIZON
binder:
checkpoint:
table: <checkpoint-table>
locks:
table: <locking-table
bindings: …Run Code Online (Sandbox Code Playgroud) spring spring-integration amazon-web-services spring-cloud-stream
我无法http | log按照入门流指南部署和执行基本流
:
wget https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/2.1.0.RELEASE/spring-cloud-dataflow-server/docker-compose.yml;
export DATAFLOW_VERSION=2.1.0.RELEASE;
export SKIPPER_VERSION=2.0.2.RELEASE;
docker-compose up;
open http://localhost:9393/dashboard/#/streams/create;
echo "Then, create a stream via text input: `http | log`"
echo "Then, deploy the stream. The deployment fails with exit code 137."
Run Code Online (Sandbox Code Playgroud)
我得到绿色的“成功部署的流定义”。状态显示为“正在部署”,但从未完全部署。ERROR日志、UI 或网络请求中没有消息。
这是docker-compose up我部署流后的控制台输出。
skipper | 2019-05-24 22:31:53.363 INFO 1 --- [io-7577-exec-10] o.s.s.support.LifecycleObjectSupport : started UPGRADE UPGRADE_DEPLOY_TARGET_APPS_SUCCEED UPGRADE_DEPLOY_TARGET_APPS UPGRADE_START UPGRADE_DELETE_SOURCE_APPS UPGRADE_CHECK_TARGET_APPS UPGRADE_WAIT_TARGET_APPS UPGRADE_CANCEL UPGRADE_DEPLOY_TARGET_APPS_FAILED UPGRADE_CHECK_CHOICE UPGRADE_EXIT INSTALL INSTALL_INSTALL INSTALL_EXIT ERROR DELETE DELETE_DELETE DELETE_EXIT ROLLBACK ROLLBACK_START …Run Code Online (Sandbox Code Playgroud) 实现发件箱模式的通常方法是将消息有效负载存储在发件箱表中,并有一个单独的进程(消息中继)查询待处理的消息,并将它们发布到消息代理中,在我的例子中是 Kafka。
发件箱表的状态可能如下所示。
OUTBOX TABLE
---------------------------------
|ID | STATE | TOPIC | PAYLOAD |
---------------------------------
| 1 | PROCESSED | user |
| 2 | PENDING | user |
| 3 | PENDING | billing |
----------------------------------
Run Code Online (Sandbox Code Playgroud)
My Message Relay 是一个 Spring Boot/Cloud Stream 应用程序,它定期 ( @Scheduled) 查找 PENDING 记录,将它们发布到 Kafka 并将记录更新为 PROCESSED 状态。
第一个问题是:如果我启动 Message Relay 的多个实例,所有实例都会查询 Outbox 表,并且可能在某些时候不同的实例将获得相同的 PENDING 注册表以发布到 Kafka,从而生成重复的消息。我怎样才能防止这种情况?
另一种情况:假设只有一个消息中继。它获取一个 PENDING 记录,将其发布到主题,但在将记录更新为 PROCESSED 之前崩溃。当它再次启动时,它会找到相同的 PENDING 记录并再次发布它。有没有办法避免这种重复,或者唯一的方法是设计一个幂等系统。
我有一个使用 Spring Integration DSL 流和 Kafka 绑定器的简单 Spring Cloud Stream 项目。一切正常,但来自 Kafka 的消息头值以byte[].
这意味着我的 SI@Header参数需要是 类型byte[]。哪个有效,但最好将它们作为字符串(我关心的所有入站标头都是字符串值)。
我已经将 Kafka 客户端配置为使用 StringSerializer/StringDeserializer。我假设我还需要以某种方式告诉 Spring Kafka 哪些标头映射为字符串以及使用什么字符编码。
我显然在这里遗漏了一些东西。有小费吗?
spring spring-integration spring-cloud-stream spring-kafka spring-integration-dsl
我目前正在构建一个应用程序,该应用程序写入卡夫卡主题并监听同一主题以ktable从中生成 并将其具体化到商店中。我正在运行的代码基于以下示例。我几乎复制了其中的大部分内容(除了 PageViewEventSource)并将名称重构为我的用例。我还application.properties使用示例中使用的密钥更新了我的。
运行应用程序时,我收到以下错误:
2020-02-12 17:54:31.982 ERROR 69005 --- [-StreamThread-1] o.a.k.s.p.i.InternalTopicManager : stream-thread [restartedMain] Unexpected error during topic creation for pairing-events-pcmv-changelog.
Error message was: org.apache.kafka.common.errors.PolicyViolationException: Topic replication factor must be 3
2020-02-12 17:54:31.986 ERROR 69005 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [pairing-events-2d949c85-df82-4961-a849-6c6e079edf9c-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: Could not create topic pairing-events-pcmv-changelog.
at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:148) ~[kafka-streams-2.3.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1049) ~[kafka-streams-2.3.1.jar:na]
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:635) ~[kafka-streams-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:424) ~[kafka-clients-2.3.1.jar:na]
at …Run Code Online (Sandbox Code Playgroud) apache-kafka spring-cloud-stream apache-kafka-streams spring-kafka
I have a spring boot application which has two functionalities Http requests and kafka Messages handling. I want this application to run in mode which is enabled from application.yml i.e if the user wants to enable it only for http requests then kafka should not be connected.
I could achieve this using normal spring boot kafka plugin by disabling auto configure using the following property at @KafkaListener,
autoStartup="${module.put:false}"
现在我们正在尝试转向云流,我发现通过删除云流和活页夹库来禁用它的唯一方法。有没有更好的方法使用自动配置模式的属性来禁用它,或者是否有任何手动配置选项可用?
dynamic-loading apache-kafka spring-cloud-stream spring-cloud-stream-binder-kafka
我已经实现了kafka流应用程序。假设流当前正在处理的对象字段之一包含数字而不是字符串值。目前,当处理逻辑中抛出异常时,例如。.transform()方法,整个流被终止,我的应用程序停止处理数据。
我想跳过此类无效记录并继续处理输入主题上可用的下一条记录。此外,我不想在流处理代码中实现任何 try-catch 语句。
为了实现这一点,我实现了StreamsUncaughtExceptionHandler它返回StreamThreadExceptionResponse.REPLACE_THREAD枚举,以便生成新线程并继续处理等待输入主题的下一条记录。然而,事实证明流消费者偏移量没有提交,当新的线程启动时,它会获取刚刚杀死前一个流线程的旧记录......由于逻辑是相同的,新线程也将无法处理错误记录并再次失败。某种循环会产生新线程,但每次都会在同一条记录上失败。
是否有任何干净的方法可以跳过失败的记录并保持流处理下一条记录?
请注意,我不是在问DeserializationExceptionHandler或ProductionExceptionHandler。
error-handling exception apache-kafka spring-cloud-stream apache-kafka-streams
apache-kafka ×5
spring ×5
spring-kafka ×3
exception ×1
java ×1
spring-boot ×1
spring-cloud ×1