Kinesis Firehose Lambda transformation

tho*_*tam 4 python python-3.x aws-lambda

As part of a Kinesis Firehose record transformation, I have the following Lambda function, which converts msgpack records from a Kinesis input stream to JSON.

Lambda runtime: Python 3.6

from __future__ import print_function

import base64
import msgpack
import json
print('Loading function')


def lambda_handler(event, context):
  output = []

  for record in event['records']:
    payload = msgpack.unpackb(base64.b64decode(record['data']), raw=False)

    # Do custom processing on the payload here
    output_record = {
        'recordId': record['recordId'],
        'result': 'Ok',
        'data': json.dumps(payload, ensure_ascii=False).encode('utf8')
    }
    output.append(output_record)

  print('Successfully processed {} records.'.format(len(event['records'])))
  return {'records': output}

But the Lambda throws the following error:

An error occurred during JSON serialization of response: b'
{
   "id": "d23fd47f-3a62-4383-bcb3-abdb913ea572",
   "timestamp": 1526358140730,
   "message": "Hello World"
}
' is not JSON serializable
Traceback (most recent call last):
File "/var/lang/lib/python3.6/json/__init__.py", line 238, in dumps
**kw).encode(obj)
File "/var/lang/lib/python3.6/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/var/lang/lib/python3.6/json/encoder.py", line 257, in iterencode
 return _iterencode(o, 0)
File "/var/runtime/awslambda/bootstrap.py", line 110, in decimal_serializer
raise TypeError(repr(o) + " is not JSON serializable")

Am I doing something wrong?

tho*_*tam 5

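I was able to resolve this issue.

Here is the code that worked for me. The difference is that 'data' is now base64-encoded and returned as a str instead of raw bytes, which the Lambda runtime cannot serialize into the JSON response. A trailing newline is also appended so that each transformed record ends with a line break.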

from __future__ import print_function

import base64
import msgpack
import json

print('Loading function')


def lambda_handler(event, context):
  output = []

  for record in event['records']:
    payload = msgpack.unpackb(base64.b64decode(record['data']), raw=False)

    # Do custom processing on the payload here
    output_record = {
       'recordId': record['recordId'],
       'result': 'Ok',
       'data': base64.b64encode(json.dumps(payload).encode('utf-8') + b'\n').decode('utf-8')
    }
    output.append(output_record)

  # Log and return only after every record in the batch has been processed
  print('Successfully processed {} records.'.format(len(event['records'])))
  return {'records': output}
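
For anyone who wants to see the difference without deploying, here is a minimal sketch that uses only the standard library (no msgpack, no AWS). It assumes a sample payload shaped like the one in the error message, and the recordId value 'example-1' is a made-up placeholder. It shows that raw bytes cannot be JSON-serialized, while the base64 text can, and that decoding it gives back the original JSON line.

```python
import base64
import json

# Sample payload modeled on the one in the error message (illustrative only)
payload = {
    "id": "d23fd47f-3a62-4383-bcb3-abdb913ea572",
    "timestamp": 1526358140730,
    "message": "Hello World",
}

# What the question's handler put in 'data': raw bytes.
raw_bytes = json.dumps(payload).encode("utf-8")
try:
    json.dumps({"data": raw_bytes})
    bytes_serializable = True
except TypeError:
    # json cannot serialize bytes objects, which is the failure in the question
    bytes_serializable = False

# What the working handler puts in 'data': base64 text (a str),
# built from the JSON bytes plus a trailing newline.
encoded = base64.b64encode(raw_bytes + b"\n").decode("utf-8")

# 'example-1' is a hypothetical record ID used only for this sketch.
response_json = json.dumps(
    {"records": [{"recordId": "example-1", "result": "Ok", "data": encoded}]}
)

# Round trip: decoding the base64 text yields the original JSON line.
decoded_line = base64.b64decode(encoded)
round_trip = json.loads(decoded_line.decode("utf-8"))

print(bytes_serializable)
print(round_trip == payload)
```

The same check can be run against the real handler by building an event whose 'data' fields are base64-encoded msgpack bytes, provided msgpack is installed locally.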