JsonPath 在 Spring Cloud Data Flow 过滤器组件中无法正常工作

Ján*_*ček 13 java spring etl jsonpath spring-cloud-dataflow

我正在尝试编写一个简单的 SCDF 流,该流从 Kafka 读取,通过特定值的存在过滤消息并将数据推送到 Mongo。作为其中的一部分,我必须按照#jsonPath 编写

 #jsonPath(payload,'$[?(@.metadata!=null)].metadata[?(@.trigger-routing!=
 null)].trigger-routing') == {'1'}
Run Code Online (Sandbox Code Playgroud)

我编写了一个示例测试,它将运行 SPeL 并验证它返回的内容(注意:我有意使用 @EnableIntegration 来连接与 SCDF 相同的 SPeL 功能配置文件,至少这是我的理论)

@SpringBootTest(classes = SpelTst.TestConfiguration.class)
public class SpelTst {

    @Configuration
    @EnableIntegration
    public static class TestConfiguration {

    }

    @Autowired
    IntegrationEvaluationContextFactoryBean factory;

    @Test
    public void test() throws JsonProcessingException {
        final StandardEvaluationContext context = factory.getObject();
        ExpressionParser parser = new SpelExpressionParser();
        Expression exp = parser.parseExpression("#jsonPath(payload,'$[?(@.metadata!=null)].metadata[?(@.trigger-routing!= null)].trigger-routing') == {'1'}");

        final PixelHitMessage sampleOne = new PixelHitMessage()
                .setMetadata(ImmutableMap.of("trigger-routing", "1"))
                .toChildType();

        final PixelHitMessage sampleTwo = new PixelHitMessage()
                .setMetadata(ImmutableMap.of("trigger-routing", ""))
                .toChildType();

        final PixelHitMessage sampleThree = new PixelHitMessage()
                .setMetadata(Collections.emptyMap())
                .toChildType();

        final PixelHitMessage sampleFour = new PixelHitMessage()
                .toChildType();


        System.out.println(resolve(context, exp, sampleOne));
        System.out.println(resolve(context, exp, sampleTwo));
        System.out.println(resolve(context, exp, sampleThree));
        System.out.println(resolve(context, exp, sampleFour));
    }

    private static Object resolve(StandardEvaluationContext context, Expression exp, PixelHitMessage sampleOne) throws JsonProcessingException {
        final ObjectMapper mapper = new ObjectMapper();
        final String payload = mapper.writerFor(PixelHitMessage.class).writeValueAsString(sampleOne);
        System.out.println(payload);

        final Message<String> kafkaMessage = MessageBuilder.withPayload(payload).build();

        context.setRootObject(kafkaMessage);

        return exp.getValue(context, Object.class);
    }

}
Run Code Online (Sandbox Code Playgroud)

当我运行它时,我得到以下输出

{"timestamp":"2020-06-26T19:31:38.013Z","level":"INFO","thread":"main","logger":"SpelTst","message":"Started SpelTst in 1.706 seconds (JVM running for 4.352)","context":"default"}

{"eventId":null,"postTime":null,"headers":null,"metadata":{"trigger-routing":"1"}}
true
{"eventId":null,"postTime":null,"headers":null,"metadata":{"trigger-routing":""}}
false
{"eventId":null,"postTime":null,"headers":null,"metadata":{}}
false
{"eventId":null,"postTime":null,"headers":null,"metadata":null}
false
Run Code Online (Sandbox Code Playgroud)

以上是我正在寻求实现的确切行为。

但是当我在 SCDF 的过滤器组件中使用相同的 SPeL 时,出现以下异常

Caused by: com.jayway.jsonpath.PathNotFoundException: No results for path: $['metadata']['trigger-routing']
Run Code Online (Sandbox Code Playgroud)

应返回 false 的消息示例

{"eventId":"acb0afce-7782-4dc6-af09-4d6878fa8fd3","postTime":1593201189799,"headers":{"accept":"*/*","host":"localhost:7071","user-agent":"insomnia/2020.2.2"},"metadata":{}}
Run Code Online (Sandbox Code Playgroud)

应返回 true 的消息示例

{"eventId":"045698d4-d4dc-41b0-8bab-7c07ad58970a","postTime":1593201492866,"headers":{"accept":"*/*","host":"localhost:7071","user-agent":"insomnia/2020.2.2"},"metadata":{"trigger-routing":"1"}}
Run Code Online (Sandbox Code Playgroud)

在 SCDF 中,SPeL 仅适用于积极情况,路径上缺少任何数据会导致上述异常。我正在考虑将 Option.DEFAULT_PATH_LEAF_TO_NULL 用于 JsonPath,但据我所知,无法通过 Spring 属性指定它(我检查了 JsonPathUtils 的代码,他们正在调用使用默认值的 JsonPath 逻辑版本没有默认(空)配置的上下文。

我还验证了过滤器表达式是否正确部署(运行过滤器应用程序的 pod 的 K8 配置屏幕),它似乎是正确的。

K8 配置截图

San*_*anu 0

可以采用这种解决方案。

    <dependency>
        <groupId>com.jayway.jsonpath</groupId>
        <artifactId>json-path</artifactId>
        <version>2.7.0</version>
    </dependency>
Run Code Online (Sandbox Code Playgroud)

那么验证可以是

    DocumentContext documentContext = JsonPath.parse(messagePayload);
    Map<String, Object> metadata = documentContext.read("$.metadata");
Run Code Online (Sandbox Code Playgroud)

进口是,为DocumentContextJsonPath

import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
Run Code Online (Sandbox Code Playgroud)

现在来说说真实场景

System.out.println(metadata);
Run Code Online (Sandbox Code Playgroud)

输出将是:{trigger-routing=1}

对于错误的场景

System.out.println(metadata);
Run Code Online (Sandbox Code Playgroud)

输出将是:{}

其他 JSON 路径语法是 - https://support.smartbear.com/alertsite/docs/monitors/api/endpoint/jsonpath.html