从kafka转换数据的最简单方法

Sco*_*ott 1 bigdata apache-kafka apache-kafka-streams apache-kafka-connect

我正在开发一个项目,使用kafka connect从多个数据库源中提取数据.我希望能够将数据转换为指定的json格式,然后最终将最终的json推送到S3存储桶,最好使用kafka connect来保持我的开销.

以下是数据目前进入kafka(以avro格式)的示例:

{"tableName":"TABLE1","SchemaName{"string":"dbo"},"tableID":1639117030,"columnName":{"string":"DATASET"},"ordinalPosition":{"int":1},"isNullable":{"int":1},"dataType":{"string":"varchar"},"maxLength":{"int":510},"precision":{"int":0},"scale":{"int":0},"isPrimaryKey":{"int":0},"tableSizeKB":{"long":72}}
{"tableName":"dtproperties","SchemaName":{"string":"dbo"},"tableID":1745441292,"columnName":{"string":"id"},"ordinalPosition":{"int":1},"isNullable":{"int":0},"dataType":{"string":"int"},"maxLength":{"int":4},"precision":{"int":10},"scale":{"int":0},"isPrimaryKey":{"int":1},"tableSizeKB":{"long":24}}
Run Code Online (Sandbox Code Playgroud)

转换为JSON时看起来如此:

{
      "tablename" : "AS_LOOKUPS",
      "tableID": 5835333,
      "columnName": "SVALUE",
      "ordinalPosition": 6,
      "isNullable": 1,
      "dataType": "varchar",
      "maxLength": 4000,
      "precision": 0,
      "scale": 0,
      "isPrimaryKey": 0,
      "tableSize": 0,
      "sizeUnit": "GB"
},
{
      "tablename" : "AS_LOOKUPS",
      "tableID": 5835333,
      "columnName": "SORT_ORDER",
      "ordinalPosition": 7,
      "isNullable": 1,
      "dataType": "int",
      "maxLength": 4,
      "precision": 10,
      "scale": 0,
      "isPrimaryKey": 0,
      "tableSize": 0,
      "sizeUnit": "GB"
}
Run Code Online (Sandbox Code Playgroud)

我的目标是让数据看起来像这样:

{
  "header": "Database Inventory",
  "DBName": "DB",
  "ServerName": "server@server.com",
  "SchemaName": "DBE",
  "DB Owner": "Name",
  "DB Guardian" : "Name/Group",
  "ASV" : "ASVC1AUTODWH",
  "ENVCI": "ENVC1AUTODWHORE",
  "Service Owner" : "Name/Group",
  "Business Owner" : "Name/Group",
  "Support Owner" : "Name/Group",
  "Date of Data" : "2017-06-28 12:12:55.000",
  "TABLE_METADATA": {
  "TABLE_SIZE" : "500",
  "UNIT_SIZE" : "GB",
  "TABLE_ID": 117575457,
  "TABLE_NAME": "spt_fallback_db",
  "COLUMN_METADATA": [
  {
    "COLUMN_NM": "xserver_name",
    "DATE_TYPE": "varchar",
    "MAX_LENGTH": 30,
    "PRECISION": 0,
    "SCALE": 0,
    "IS_NULLABLE": 0,
    "PRIMARY_KEY": 0,
    "ORDINAL_POSITION": 1
  },
  {
    "COLUMN_NM": "xdttm_ins",
    "DATE_TYPE": "datetime",
    "MAX_LENGTH": 8,
    "PRECISION": 23,
    "SCALE": 3,
    "IS_NULLABLE": 0,
    "PRIMARY_KEY": 0,
    "ORDINAL_POSITION": 2
  }, ........
Run Code Online (Sandbox Code Playgroud)

标题数据通常是通用的,但是需要填充一些诸如日期等内容.

最初我的想法是我可以利用kafka connect做所有事情,并且我可以为我想要格式化数据的方式创建一个模式.我有一个问题,虽然利用连接器使用不同的架构,我不确定它是否可能.

我想到的另一个解决方案是使用Kafka Streams,并编写代码将数据转换为所需的数据.我不确定用Kafka Streaming做起来有多容易.

最后,我看到的第三个解决方案是利用Apache Spark,并使用数据帧操纵数据.但这会增加更多开销.

老实说,我不确定要走哪条路,或者这些解决方案是否符合我的要求.所以我对如何解决这个问题的所有建议持开放态度.

Ran*_*uch 8

Kafka Connect确实有简单消息转换(SMT),这是一个框架,用于在源连接器写入Kafka之前对源连接器生成的记录进行微调,或者在将它们发送到接收器连接器之前从Kafka读取的记录.大多数SMT都是非常简单的功能,但您可以将它们链接在一起,进行稍微复杂的操作.您始终可以使用自定义逻辑实现自己的转换,但无论每次转换在一条记录上运行的是什么,都不应该调用其他服务.SMT仅用于个人记录的基本操作.

但是,您想要进行的更改可能比通过SMT更合适.Kafka Streams似乎是这个问题的最佳解决方案,因为它允许您创建一个简单的流处理器,消耗源连接器生成的主题,相应地改变(并可能组合)消息,并将它们写出来到其他主题.由于您已经在使用Avro,因此您可以编写Streams应用程序以使用Avro通用记录(请参阅此示例)或使用Avro架构自动生成的类(请参阅此示例).

您还提到您拥有来自多个来源的数据,而这些数据可能会分开主题.如果您想要将这些主题集成,加入,组合或简单地合并到其他主题中,那么Kafka Streams是一种很好的方法.

Kafka Streams应用程序也只是普通的Java应用程序,因此您可以使用您选择的平台部署它们,无论是Docker,Kubernetes,Mesos,AWS还是其他.而且他们不需要像Apache Spark那样运行的分布式平台.