如何将数据管道定义从EMR 3.x升级到4.x / 5.x?

use*_*092 5 amazon-web-services amazon-emr elastic-map-reduce amazon-data-pipeline

我想将AWS数据管道定义升级到EMR 4.x或5.x,这样我就可以利用Hive的最新功能(版本2.0+),例如CURRENT_DATEand CURRENT_TIMESTAMP等。

从EMR 3.x到4.x / 5.x的更改要求使用releaseLabel in EmrCluster,vs amiVersion

当我使用时"releaseLabel": "emr-4.1.0",出现以下错误:失败:执行错误,从org.apache.hadoop.hive.ql.exec.tez.TezTask返回代码1

以下是我针对EMR 3.x的数据管道定义。它运作良好,所以我希望其他人觉得这很有用(包括emr 4.x / 5.x的答案),因为从文件将数据导入DynamoDB的常见答案/建议是使用数据管道,但实际上没有人提出了一个坚实而简单的工作示例(例如自定义数据格式)。

{
  "objects": [
    {
      "type": "DynamoDBDataNode",
      "id": "DynamoDBDataNode1",
      "name": "OutputDynamoDBTable",
      "dataFormat": {
        "ref": "DynamoDBDataFormat1"
      },
      "region": "us-east-1",
      "tableName": "testImport"
    },
    {
      "type": "Custom",
      "id": "Custom1",
      "name": "InputCustomFormat",
      "column": [
        "firstName", "lastName"
      ],
      "columnSeparator" : "|",
      "recordSeparator" : "\n"
    },
    {
      "type": "S3DataNode",
      "id": "S3DataNode1",
      "name": "InputS3Data",
      "directoryPath": "s3://data.domain.com",
      "dataFormat": {
        "ref": "Custom1"
      }
    },
    {
      "id": "Default",
      "name": "Default",
      "scheduleType": "ondemand",
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "s3://logs.data.domain.com"
    },
    {
      "type": "HiveActivity",
      "id": "HiveActivity1",
      "name": "S3ToDynamoDBImportActivity",
      "output": {
        "ref": "DynamoDBDataNode1"
      },
      "input": {
        "ref": "S3DataNode1"
      },
      "hiveScript": "INSERT OVERWRITE TABLE ${output1} SELECT reflect('java.util.UUID', 'randomUUID') as uuid, TO_DATE(FROM_UNIXTIME(UNIX_TIMESTAMP())) as loadDate, firstName, lastName FROM ${input1};",
      "runsOn": {
        "ref": "EmrCluster1"
      }
    },
    {
      "type": "EmrCluster",
      "name": "EmrClusterForImport",
      "id": "EmrCluster1",
      "coreInstanceType": "m1.medium",
      "coreInstanceCount": "1",
      "masterInstanceType": "m1.medium",
      "amiVersion": "3.11.0",
      "region": "us-east-1",
      "terminateAfter": "1 Hours"
    },
    {
      "type": "DynamoDBDataFormat",
      "id": "DynamoDBDataFormat1",
      "name": "OutputDynamoDBDataFormat",
      "column": [
        "uuid", "loadDate", "firstName", "lastName"
      ]
    }
  ],
  "parameters": []
}
Run Code Online (Sandbox Code Playgroud)

示例文件可能看起来像

John|Doe
Jane|Doe
Carl|Doe
Run Code Online (Sandbox Code Playgroud)

奖励CURRENT_DATE我如何在hiveScript部分中设置为变量,而不是在列中设置?我SET loadDate = CURRENT_DATE;\n\n INSERT OVERWRITE..."无济于事。在示例中未显示的是我想在查询子句之前设置的其他动态字段。