Spark Athena连接器

Nip*_*pun 0 pyspark amazon-athena

我需要在spark中使用Athena,但在使用JDBC驱动程序时spark使用prepareStatement,它给我一个异常“ com.amazonaws.athena.jdbc.NotImplementedException:方法Connection.prepareStatement尚未实现”

你能让我知道如何连接雅典娜吗

Kir*_*rst 6

我不知道如何从Spark连接到Athena,但是您不需要-您可以非常轻松地从Spark查询Athena包含的数据(或更准确地说,是“寄存器”)。

雅典娜分为两个部分

  1. Hive Metastore(现在称为Glue数据目录),其中包含数据库和表名以及所有基础文件之间的映射
  2. Presto查询引擎,可将您的SQL转换为针对这些文件的数据操作

启动EMR群集(v5.8.0和更高版本)时,可以指示其连接到Glue数据目录。这是“创建集群”对话框中的复选框。选中此选项时,Spark SqlContext将连接到Glue数据目录,您将能够看到Athena中的表格。

然后,您可以正常查询这些表。

有关更多信息,请参见https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-glue.html


Wil*_*Wil 6

您可以使用这个 JDBC 驱动程序:SimbaAthenaJDBC

<dependency>
    <groupId>com.syncron.amazonaws</groupId>
    <artifactId>simba-athena-jdbc-driver</artifactId>
    <version>2.0.2</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

使用:

SparkSession spark = SparkSession
    .builder()
    .appName("My Spark Example")
    .getOrCreate();

Class.forName("com.simba.athena.jdbc.Driver");

Properties connectionProperties = new Properties();
connectionProperties.put("User", "AWSAccessKey");
connectionProperties.put("Password", "AWSSecretAccessKey");
connectionProperties.put("S3OutputLocation", "s3://my-bucket/tmp/");
connectionProperties.put("AwsCredentialsProviderClass", 
    "com.simba.athena.amazonaws.auth.PropertiesFileCredentialsProvider");
connectionProperties.put("AwsCredentialsProviderArguments", "/my-folder/.athenaCredentials");
connectionProperties.put("driver", "com.simba.athena.jdbc.Driver");

List<String> predicateList =
    Stream
        .of("id = 'foo' and date >= DATE'2018-01-01' and date < DATE'2019-01-01'")
        .collect(Collectors.toList());
String[] predicates = new String[predicateList.size()];
predicates = predicateList.toArray(predicates);

Dataset<Row> data =
    spark.read()
        .jdbc("jdbc:awsathena://AwsRegion=us-east-1;",
            "my_env.my_table", predicates, connectionProperties);
Run Code Online (Sandbox Code Playgroud)

您还可以在 Flink 应用程序中使用此驱动程序:

TypeInformation[] fieldTypes = new TypeInformation[] {
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
};

RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername("com.simba.athena.jdbc.Driver")
    .setDBUrl("jdbc:awsathena://AwsRegion=us-east-1;UID=my_access_key;PWD=my_secret_key;S3OutputLocation=s3://my-bucket/tmp/;")
    .setQuery("select id, val_col from my_env.my_table WHERE id = 'foo' and date >= DATE'2018-01-01' and date < DATE'2019-01-01'")
    .setRowTypeInfo(rowTypeInfo)
    .finish();

DataSet<Row> dbData = env.createInput(jdbcInputFormat, rowTypeInfo);
Run Code Online (Sandbox Code Playgroud)