如何使用kafka中的主题创建包含大量JSON字段的KSQL Stream?

Meh*_*pta 5 apache-kafka confluent ksql

我将一个长JSON字符串传递给kafka主题,例如:

{
    "glossary": {
        "title": "example glossary",
        "GlossDiv": {
            "title": "S",
            "GlossList": {
                "GlossEntry": {
                    "ID": "SGML",
                    "SortAs": "SGML",
                    "GlossTerm": "Standard Generalized Markup Language",
                    "Acronym": "SGML",
                    "Abbrev": "ISO 8879:1986",
                    "GlossDef": {
                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                        "GlossSeeAlso": ["GML", "XML"]
                    },
                    "GlossSee": "markup"
                }
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

并且想要从kafka主题创建流,其中所有字段都指定了KSQL中的每个字段,例如:

 CREATE STREAM pageviews_original (*) WITH \
(kafka_topic='pageviews', value_format='JSON');
Run Code Online (Sandbox Code Playgroud)

Rob*_*att 7

如果您希望KSQL自动选取字段名称,则需要使用Avro.如果您使用Avro,数据的架构将在Confluent架构注册表中注册,KSQL将在您使用该主题时自动检索它.

如果您使用的是JSON,必须告诉KSQL列是什么.您可以在CREATE STREAM语句中使用STRUCT嵌套元素的数据类型执行此操作.

您可以通过仅声明其中的高级字段CREATE STREAM,然后使用EXTRACTJSONFIELD您要使用的字段访问嵌套元素来列出所有字段.请注意,5.0.0中存在问题,将在5.0.1中修复.此外,您不能将此用于您显示的示例数据中的嵌套数组等.