Jav*_*mer 3 java amazon-web-services amazon-kinesis amazon-kinesis-firehose
I am getting the following error when using a Java Lambda function for Kinesis Data Firehose data transformation. Here is what my transformed JSON looks like:
{
  "records": [
    {
      "recordId": "49586022990098427206724983301551059982279766660054253570000000",
      "result": "Ok",
      "data": "ZXlKMGFXTnJaWEpmYzNsdFltOXNJam9pVkVWVFZEY2lMQ0FpYzJWamRHOXlJam9pU0VWQlRGUklRMEZTUlNJc0lDSmphR0Z1WjJVaQ0KT2kwd0xqQTFMQ0FpY0hKcFkyVWlPamcwTGpVeGZRbz0="
    }
  ]
}
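One thing that may be worth checking in the output above: the data value looks as if it was base64-encoded twice. The sketch below uses only java.util.Base64, and the class name DataFieldCheck is made up for illustration. Decoding once yields another base64 string (it contains a line break, hence the MIME decoder for the second pass), and decoding that again yields a JSON payload starting with {"ticker_symbol":"TEST7". Whether this is related to the console error is not established here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of any AWS library.
public class DataFieldCheck {
    // The "data" value copied verbatim from the JSON above.
    static final String DATA =
        "ZXlKMGFXTnJaWEpmYzNsdFltOXNJam9pVkVWVFZEY2lMQ0FpYzJWamRHOXlJam9pU0VWQlRGUklRMEZTUlNJc0lDSmphR0Z1WjJVaQ0KT2kwd0xqQTFMQ0FpY0hKcFkyVWlPamcwTGpVeGZRbz0=";

    // First pass: a plain base64 decode of the field.
    static String decodeOnce(String data) {
        return new String(Base64.getDecoder().decode(data), StandardCharsets.UTF_8);
    }

    // Second pass: the first-pass result contains a CR/LF line break,
    // which the basic decoder rejects, so the MIME decoder is used here.
    static String decodeTwice(String data) {
        byte[] inner = Base64.getMimeDecoder().decode(decodeOnce(data));
        return new String(inner, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println("after one decode:  " + decodeOnce(DATA));
        System.out.println("after two decodes: " + decodeTwice(DATA));
    }
}
```

If the data field is a byte[] or ByteBuffer that the serializer already base64-encodes, encoding it by hand as well would produce this kind of double encoding. That is a guess about the cause, not something the post confirms.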
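The error in the Kinesis console looks like this:
Invalid output structure: Please check your function and make sure the processed records contain valid result status of Dropped, Ok, or ProcessingFailed
Does anyone have an idea what is wrong? I could not find any sample code that uses Java for Kinesis data transformation.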
https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
This document describes the output structure.
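As a rough illustration of the shape that page describes: each returned record carries the recordId of the incoming record, a result (Ok, Dropped or ProcessingFailed) and base64-encoded data. The sketch below builds that JSON by hand with the standard library only. OutputShapeSketch, recordJson and responseJson are hypothetical names, and in a real Lambda the runtime would serialize the returned object rather than a hand-built string.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical illustration of the expected output shape; not an AWS class.
public class OutputShapeSketch {
    // The three result statuses named in the console error message.
    enum Result { Ok, Dropped, ProcessingFailed }

    // Builds one entry of the "records" array.
    // The payload is base64-encoded exactly once.
    // Assumes recordId needs no JSON escaping (true for Firehose-style numeric ids).
    static String recordJson(String recordId, Result result, byte[] payload) {
        String data = Base64.getEncoder().encodeToString(payload);
        return "{\"recordId\":\"" + recordId + "\","
             + "\"result\":\"" + result + "\","
             + "\"data\":\"" + data + "\"}";
    }

    // Wraps the individual record entries in the top-level object.
    static String responseJson(String... records) {
        return "{\"records\":[" + String.join(",", records) + "]}";
    }

    public static void main(String[] args) {
        byte[] payload = "{\"price\":84.51}".getBytes(StandardCharsets.UTF_8);
        String rec = recordJson("49586022990098427206724983301551059982279766660054253570000000",
                                Result.Ok, payload);
        System.out.println(responseJson(rec));
    }
}
```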
小智 6
I just went through this in Scala (which is Java-compatible). The key is to use the return type com.amazonaws.services.lambda.runtime.events.KinesisAnalyticsInputPreprocessingResponse:
import java.nio.ByteBuffer
import com.amazonaws.services.lambda.runtime.events.KinesisAnalyticsInputPreprocessingResponse._
import com.amazonaws.services.lambda.runtime.events.{KinesisAnalyticsInputPreprocessingResponse, KinesisFirehoseEvent}
import com.amazonaws.services.lambda.runtime.{Context, LambdaLogger, RequestHandler}
// Explicit converters (.asScala / .asJava) instead of the deprecated implicit JavaConversions
import scala.collection.JavaConverters._

class Handler extends RequestHandler[KinesisFirehoseEvent, KinesisAnalyticsInputPreprocessingResponse] {
  override def handleRequest(in: KinesisFirehoseEvent, context: Context): KinesisAnalyticsInputPreprocessingResponse = {
    val logger: LambdaLogger = context.getLogger
    // Every incoming record yields exactly one output record with the same recordId
    val transformed = in.getRecords.asScala.map { record =>
      try {
        val changed = record.getData.array()
        // do some sort of transform on `changed` here
        new Record(record.getRecordId, Result.Ok, ByteBuffer.wrap(changed))
      } catch {
        case e: Exception =>
          logger.log(e.toString)
          // On failure, mark the record as Dropped and hand back the original data
          new Record(record.getRecordId, Result.Dropped, record.getData)
      }
    }
    val response = new KinesisAnalyticsInputPreprocessingResponse()
    response.setRecords(transformed.asJava)
    response
  }
}