Kinesis Firehose data transformation using Java

Jav*_*mer 3 java amazon-web-services amazon-kinesis amazon-kinesis-firehose

I'm getting the following error when using a Java Lambda function for Kinesis Data Firehose data transformation. This is what my transformed JSON looks like:

{
    "records": [
        {
            "recordId": "49586022990098427206724983301551059982279766660054253570000000",
            "result": "Ok",
            "data": "ZXlKMGFXTnJaWEpmYzNsdFltOXNJam9pVkVWVFZEY2lMQ0FpYzJWamRHOXlJam9pU0VWQlRGUklRMEZTUlNJc0lDSmphR0Z1WjJVaQ0KT2kwd0xqQTFMQ0FpY0hKcFkyVWlPamcwTGpVeGZRbz0="
        }
    ]
}

The error shown in the Kinesis console is:

Invalid output structure: Please check your function and make sure the processed records contain valid result status of Dropped, Ok, or ProcessingFailed

Does anyone have an idea what is wrong here? I couldn't find any sample code for Kinesis data transformation in Java.

https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

This document describes the output structure.
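For reference, the output structure that page describes can be sketched in plain Java with only the JDK. This is a minimal illustration, not AWS code: the class `FirehoseOutputSketch` and its methods are hypothetical names, the JSON is concatenated by hand, and it assumes the record ID contains no characters that need JSON escaping. Each output record carries the same `recordId` as its input record, a `result` spelled exactly `Ok`, `Dropped`, or `ProcessingFailed`, and the payload Base64-encoded once in `data`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of any AWS library: builds the response shape
// that the Firehose data-transformation documentation describes.
final class FirehoseOutputSketch {

    // The three accepted result values, spelled as in the console error message.
    enum Result { Ok, Dropped, ProcessingFailed }

    // Builds one record: {"recordId":...,"result":...,"data":<Base64 of payload>}.
    // Assumes recordId needs no JSON escaping.
    static String toRecordJson(String recordId, Result result, String payload) {
        // Encode the transformed payload exactly once.
        String data = Base64.getEncoder()
                .encodeToString(payload.getBytes(StandardCharsets.UTF_8));
        return "{\"recordId\":\"" + recordId + "\","
                + "\"result\":\"" + result.name() + "\","
                + "\"data\":\"" + data + "\"}";
    }

    // Wraps already-built record objects in the top-level "records" array.
    static String toResponseJson(List<String> recordJsons) {
        return "{\"records\":[" + String.join(",", recordJsons) + "]}";
    }

    public static void main(String[] args) {
        String record = toRecordJson("example-record-id", Result.Ok, "{\"price\":84.51}\n");
        System.out.println(toResponseJson(Collections.singletonList(record)));
    }
}
```

In a real function a JSON library or the Lambda runtime's own serialization would normally produce this output; the hand-built strings here only make the required field names and values visible.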

小智 6

I just went through this in Scala (which is Java-compatible). The key is to use the return type com.amazonaws.services.lambda.runtime.events.KinesisAnalyticsInputPreprocessingResponse:

import java.nio.ByteBuffer

import com.amazonaws.services.lambda.runtime.events.KinesisAnalyticsInputPreprocessingResponse._
import com.amazonaws.services.lambda.runtime.events.{KinesisAnalyticsInputPreprocessingResponse, KinesisFirehoseEvent}
import com.amazonaws.services.lambda.runtime.{Context, LambdaLogger, RequestHandler}

import scala.collection.JavaConverters._

class Handler extends RequestHandler[KinesisFirehoseEvent, KinesisAnalyticsInputPreprocessingResponse] {

  override def handleRequest(in: KinesisFirehoseEvent, context: Context): KinesisAnalyticsInputPreprocessingResponse = {

    val logger: LambdaLogger = context.getLogger

    // Emit exactly one output record per input record, reusing its recordId.
    val transformed = in.getRecords.asScala.map { record =>
      try {
        val changed = record.getData.array()
        // do some sort of transform on `changed` here
        new Record(record.getRecordId, Result.Ok, ByteBuffer.wrap(changed))
      } catch {
        case e: Exception =>
          logger.log(e.toString)
          // On failure, mark the record as Dropped and pass the original data back.
          new Record(record.getRecordId, Result.Dropped, record.getData)
      }
    }

    val response = new KinesisAnalyticsInputPreprocessingResponse()
    response.setRecords(transformed.asJava)
    response
  }
}
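
A separate thing that may be worth checking is the `data` value in the question's JSON. Decoding it once with Base64 gives another Base64 string (it starts with `eyJ`, the usual prefix of encoded JSON), and only a second decode gives the readable record. That suggests the payload was encoded twice, possibly once in the function and once again during serialization; whether this is connected to the console error is not established here. The following is a small JDK-only sketch for inspecting the value; the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical diagnostic helper: decodes the "data" value from the question
// layer by layer to see what Firehose would actually receive.
final class DataFieldCheck {

    // The "data" value copied from the question's JSON.
    static final String DATA =
            "ZXlKMGFXTnJaWEpmYzNsdFltOXNJam9pVkVWVFZEY2lMQ0FpYzJWamRHOXlJam9pU0VWQlRGUklRMEZTUlNJ"
          + "c0lDSmphR0Z1WjJVaQ0KT2kwd0xqQTFMQ0FpY0hKcFkyVWlPamcwTGpVeGZRbz0=";

    // Strict decoder: rejects line breaks and other non-alphabet characters.
    static String decodeBasic(String s) {
        return new String(Base64.getDecoder().decode(s), StandardCharsets.UTF_8);
    }

    // MIME decoder: tolerates the CR/LF line break found inside the first layer.
    static String decodeMime(String s) {
        return new String(Base64.getMimeDecoder().decode(s), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String once = decodeBasic(DATA);
        System.out.println("after one decode:  " + once);
        System.out.println("after two decodes: " + decodeMime(once));
    }
}
```

If the second decode is what yields the original JSON, the transformation is probably Base64-encoding the payload itself before handing it to a response type that encodes it again. With a `ByteBuffer` field, as in the Scala handler above, the raw transformed bytes can be passed directly.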