Java中的AWS Lambda字节流客户端

Bar*_*rsk 5 java client amazon-web-services aws-lambda

我正在尝试用Java编写一个用于AWS Lambda函数的字节流客户端.我已经创建了Lambda函数作为RequestStreamHandler的实现.该项目的基础是在文档中概述这里.

public class LambdaFunctionHandler implements RequestStreamHandler {
    static final String bucket = "anS3bucket";
    static final String key = "anS3KeyToAJpegFile";

    @Override
    public void  handleRequest(InputStream input, OutputStream output, 
            Context context) throws IOException {

        AmazonS3 s3Client = new AmazonS3Client(
                new EnvironmentVariableCredentialsProvider());
        try {
            context.getLogger().log("Downloading an object\n");
            S3Object s3object = s3Client.getObject(new GetObjectRequest(
                    bucket, key));
            context.getLogger().log("Content-Type: "  + 
                    s3object.getObjectMetadata().getContentType() 
                       + "\n");
            InputStream in = s3object.getObjectContent();
            int b = 0;
            context.getLogger().log("Writing jpeg on output\n");
            while ((b = in.read()) > -1) {
                output.write(b);
            }

        } catch (AmazonServiceException e) {
            System.out.println("Error Message: " + e.getMessage());
        }   
    }
}
Run Code Online (Sandbox Code Playgroud)

这个硬编码的示例在Lambda测试控制台上运行良好.我可以上传JAR并运行lambda函数(通过单击"Test").该函数的作用是它检索jpeg文件内容并将字节流写入OutputStream.我可以在测试控制台中看到二进制输出作为函数结果.到目前为止一切都很好.最终我将在jpeg上运行ImageMagick并调整它 - 这是该项目的目标.

我的客户端代码如下:

public interface ImageService {
    @LambdaFunction(functionName="ImageProcessing")
    OutputStream getImageStream(InputStream data);
}

public class LambdaImageTest {

public static void main(String[] args) throws IOException {
    AWSLambdaClient lambda = new AWSLambdaClient(new  ProfileCredentialsProvider());
    lambda.configureRegion(Regions.EU_WEST_1);

    ImageService service = LambdaInvokerFactory.build(ImageService.class, lambda);

    // Call lambda function, receive byte stream
    OutputStream  out = service.getImageStream(null); 
    System.out.println(out); // This code is not complete
}
Run Code Online (Sandbox Code Playgroud)

当我尝试在Java客户端中接收字节流时,我失败了.似乎没有办法接收字节流.客户端似乎试图将reault读作json数据,这不是我想要的.我想直接读取字节流(jpeg二进制内容).我得到的错误是:

Exception in thread "main" com.amazonaws.services.lambda.invoke.LambdaSerializationException: Failed to parse Lambda function result
    at com.amazonaws.services.lambda.invoke.LambdaInvokerFactory$LambdaInvocationHandler.getObjectFromPayload(LambdaInvokerFactory.java:210)
    at com.amazonaws.services.lambda.invoke.LambdaInvokerFactory$LambdaInvocationHandler.processInvokeResult(LambdaInvokerFactory.java:189)
    at com.amazonaws.services.lambda.invoke.LambdaInvokerFactory$LambdaInvocationHandler.invoke(LambdaInvokerFactory.java:106)
    at com.sun.proxy.$Proxy3.getImageStream(Unknown Source)
    at se.devo.lambda.image.LambdaImageTest.main(LambdaImageTest.java:33)
Caused by: com.fasterxml.jackson.core.JsonParseException: Invalid UTF-8 middle byte 0xff
 at [Source: [B@42257bdd; line: 1, column: 4]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)
Run Code Online (Sandbox Code Playgroud)

如何在AWS Lambda Java客户端中正确接收字节流数据?

Bar*_*rsk 7

我找到了解决办法。LambdaInvokerFactory 类将始终将请求和响应数据作为 JSON 处理,因此序列化和反序列化这就是问题所在。不过,源代码包含了答案的线索,我已经删除了调用 lambda 函数的部分,但我绕过了 JSON 反序列化并直接访问了有效负载。简单,但它真的应该已经在 LambdaInvokerFactory 类中了......

这是我完全有效的解决方案。Lambda 函数代码:

public class LambdaFunctionHandler implements RequestStreamHandler  {
    public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        AmazonS3 s3Client = new AmazonS3Client(
                new EnvironmentVariableCredentialsProvider());
        try {

            // Need to deserialize JSON data ourselves in Lambda streaming mode
            String data = getJSONInputStream(input);
            context.getLogger().log("JSON data:\n'" + data + "'\n");            
            context.getLogger().log("Deserialize JSON data to object\n");
            ImageRequest request  = mapper.readValue(data, ImageRequest.class);

            context.getLogger().log(String.format("Downloading S3 object: %s %s\n", 
                    request.getBucket(), request.getKey()));
            S3Object s3object = s3Client.getObject(new GetObjectRequest(
                    request.getBucket(), request.getKey()));
            context.getLogger().log("Content-Type: "  + 
                    s3object.getObjectMetadata().getContentType() + "\n");
            InputStream in = s3object.getObjectContent();
            int b = 0;
            byte[] buf = new byte[2048];
            context.getLogger().log("Writing image on output\n");
            while ((b = in.read(buf)) > -1) {
                output.write(buf, 0, b);
            }

        } catch (AmazonServiceException e) {
            System.out.println("Error Message: " + e.getMessage());
        }   
    }

  private String getJSONInputStream(InputStream input) throws IOException {
      BufferedReader reader = new BufferedReader(new InputStreamReader(input));
      String data = "";
      String line;
      while ((line = reader.readLine()) != null) {
                data += line;
      }
      return data;
  }
}
Run Code Online (Sandbox Code Playgroud)

客户端代码:

public class LambdaImageTest {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) throws IOException {
        String bucketName = args[0]; 
        String key        = args[1]; 

        // Lambda client proxy
        AWSLambdaClient lambda = new AWSLambdaClient(new ProfileCredentialsProvider());
        lambda.configureRegion(Regions.EU_WEST_1);

        // Build InvokeRequest
        InvokeRequest invokeRequest = buildInvokeRequest("ImageProcessing",  
                new ImageRequest(bucketName, key));

        // Invoke and get result payload as ByteBuffer. Note error handling should be done here
        InvokeResult invokeResult = lambda.invoke(invokeRequest);
        ByteBuffer byteBuffer = invokeResult.getPayload();

        // Write payload to file. Output hardcoded...
        FileChannel out = new FileOutputStream("D:/test.jpg").getChannel(); 
        out.write(byteBuffer);
        out.close();
    }

    private static InvokeRequest buildInvokeRequest(String functionName, Object input) {

        InvokeRequest invokeRequest = new InvokeRequest();
        invokeRequest.setFunctionName(functionName); // Lambda function name identifier
        invokeRequest.setInvocationType(InvocationType.RequestResponse);
        invokeRequest.setLogType(LogType.None);

        if (input != null) {
            try {

                String payload = MAPPER.writer().writeValueAsString(input);
                invokeRequest.setPayload(payload);

            } catch (JsonProcessingException ex) {
                throw new LambdaSerializationException("Failed to serialize request object to JSON", ex);
            }
        }

        return invokeRequest;
    }
}
Run Code Online (Sandbox Code Playgroud)

应该注意的是,这里需要改进错误处理。LambdaInvokerFactory 中的源包含丢失的部分。