我开始使用Spring Dataflow,但遇到一些我无法回答的问题,请阅读文档并进行一些测试。任何澄清都将受到欢迎(如果您不能一次回答所有问题,请回答所有问题,如果需要,我将合并完整的回答)
Spring Dataflow旨在编写适用的工作流程,例如:的输出app A是的输入app B,依此类推。工作流程不需要是线性的,如的输出app A可以是两者的输入app B和app C。这样准确吗?
众所周知,流管道中的应用程序以“消息驱动”的方式进行通信。App A发送消息给代理(例如RabbitMQ或Kafka),并app B从中使用消息。我们的流程中可以有多个不同的经纪人。但是消息传递是在应用程序之间发送信息的唯一方法吗?例如,是否可以通过HTTP REST请求进行app A调用app B?如果是这样,怎么办?
由于应用程序依赖异步消息传递(请参见上面的问题),因此Dataflow的附加值是什么?我的意思是,如果您配置app A为向该foo主题发送消息,并app B使用来自同一主题的消息,则可以将它们分别部署(不使用Dataflow),并且它将起作用。据我了解,Dataflow仅提供一种一次性部署和取消部署所有组件的方法,而不是一个接一个地部署。那是对的吗 ?
像上一个问题一样,异步消息传递使您从定义流顺序中抽象出来(即,您可以在开始app B之前app A)。整个系统仅在两个应用程序都启动时才能工作,但它们甚至不需要彼此了解。唯一需要做的是,他们使用相同的代理和主题,一个用于发送消息,另一个用于获取消息。那么,为什么您绝对需要在Spring Cloud Dataflow中将一个应用程序的输出链接到另一个应用程序的输入?这是一种强制两个应用程序使用相同主题的方法,但这是全部吗?
这似乎是一个愚蠢的问题,但我们正在尝试从SCDF shell应用程序定义数据流,但我们遇到引用问题.假设我们想要定义一个带有SpEL表达式的过滤器,只过滤掉JSON输入中没有John Doe名称的任何内容.SpEL将是:
payload.name != 'John Doe'
Run Code Online (Sandbox Code Playgroud)
要么
payload.name ne 'John Doe'
Run Code Online (Sandbox Code Playgroud)
流定义将是:
stream create --name testflow --definition "http | filter --expression=<expression> | log"
Run Code Online (Sandbox Code Playgroud)
我不能用单引号括起<expression>,因为表达式本身包含单引号,我不能使用双引号,因为整个流定义使用那些.我不能只留下引号,因为shell的解析器会被空格和!混淆.从shell应用程序中定义这个流是不可能的?可以从浏览器应用程序执行,但其他数据流defs在那里有解析问题.
以下属性之间的主要区别是什么:
1)spring.rabbitmq.listener.direct.prefetch= # 单个请求中要处理的消息数。它应该大于或等于事务大小(如果使用)。
2)spring.rabbitmq.listener.simple.prefetch= # 单个请求中要处理的消息数。它应该大于或等于事务大小(如果使用)。
我创建了一个自定义 Spring Cloud 流处理器应用程序,并将其部署为 Source|Processor|Sink 流中的处理器步骤。一切似乎都运行良好,但我的自定义应用程序在数据流 UI 中显示“正在部署”。如果这会影响任何事情,我会将它部署为来自 mavenLocal 的 SNAPSHOT。我是否遗漏了什么让 SCDF 知道部署成功?
我很难理解 GCP Dataflow/Apache Beam 和 Spring Cloud Dataflow 之间的差异。我想要做的是转向更原生的流数据处理解决方案,因此我们的开发人员可以更专注于开发核心逻辑而不是管理基础设施。
我们有一个现有的流解决方案,它由 Spring Cloud 数据流“模块”组成,我们可以独立迭代和部署,就像微服务一样,效果很好,但我们希望迁移到我们业务提供的 GCP 中的现有平台,需要我们使用 GCP Dataflow。在高层次上,解决方案很简单:
流 1:
Kafka Source (S0) -> Module A1 (Ingest) -> Module B1 (Map) -> Module C1 (Enrich) -> Module D1 (Split) -> Module E1 (Send output to Sink S1)
Run Code Online (Sandbox Code Playgroud)
流 2:
Kafka Source (S1) -> Module A2 (Ingest) -> Module B2 (Persist to DB) -> Module B3 (Send Notifications through various channels)
Run Code Online (Sandbox Code Playgroud)
根据我的理解,我们想要采用的解决方案应该是相同的,但是模块将成为 GCP Dataflow 模块,源/接收器将成为 GCP Pub/Sub 而不是 kafka。
我遇到的大多数文档都没有将 SCDF 和 …
google-cloud-platform spring-cloud data-science spring-cloud-dataflow apache-beam
我使用 Spring Cloud Data Flow 设置一个读取 CSV 文件的流,使用自定义处理器对其进行转换并记录它:
stream create --name testsourcecsv --definition "file --mode=lines --directory=D:/toto/ --file.filename-pattern=adresses-28.csv --maxMessages=1000 | csvToMap --spring.cloud.stream.bindings.output.content-type=application/json | log --spring.cloud.stream.bindings.input.content-type=application/json" --deploy
Run Code Online (Sandbox Code Playgroud)
文件和 csvToMap 应用程序工作正常,但在日志应用程序中,对于每条记录,我都看到这种异常:
2019-12-03 11:32:46.500 ERROR 1328 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$5 : Could not decode json type: adresses-28.csv for key: file_name
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'adresses': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"adresses-28.csv"; line: 1, column: 10]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.9.jar!/:2.9.9]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:703) ~[jackson-core-2.9.9.jar!/:2.9.9]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3532) ~[jackson-core-2.9.9.jar!/:2.9.9]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2627) ~[jackson-core-2.9.9.jar!/:2.9.9]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:832) ~[jackson-core-2.9.9.jar!/:2.9.9]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:729) ~[jackson-core-2.9.9.jar!/:2.9.9] …Run Code Online (Sandbox Code Playgroud) spring-cloud spring-cloud-stream spring-cloud-dataflow spring-cloud-stream-binder-kafka
spring-cloud ×2
apache-beam ×1
data-science ×1
rabbitmq ×1
spring ×1
spring-boot ×1