Nip*_*pun 0 pyspark amazon-athena
我需要在spark中使用Athena,但在使用JDBC驱动程序时spark使用prepareStatement,它给我一个异常“ com.amazonaws.athena.jdbc.NotImplementedException:方法Connection.prepareStatement尚未实现”
你能让我知道如何连接雅典娜吗
我不知道如何从Spark连接到Athena,但是您不需要-您可以非常轻松地从Spark查询Athena包含的数据(或更准确地说,是“寄存器”)。
雅典娜分为两个部分
启动EMR群集(v5.8.0和更高版本)时,可以指示其连接到Glue数据目录。这是“创建集群”对话框中的复选框。选中此选项时,Spark SqlContext将连接到Glue数据目录,您将能够看到Athena中的表格。
然后,您可以正常查询这些表。
有关更多信息,请参见https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-glue.html。
您可以使用这个 JDBC 驱动程序:SimbaAthenaJDBC
<dependency>
<groupId>com.syncron.amazonaws</groupId>
<artifactId>simba-athena-jdbc-driver</artifactId>
<version>2.0.2</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
使用:
SparkSession spark = SparkSession
.builder()
.appName("My Spark Example")
.getOrCreate();
Class.forName("com.simba.athena.jdbc.Driver");
Properties connectionProperties = new Properties();
connectionProperties.put("User", "AWSAccessKey");
connectionProperties.put("Password", "AWSSecretAccessKey");
connectionProperties.put("S3OutputLocation", "s3://my-bucket/tmp/");
connectionProperties.put("AwsCredentialsProviderClass",
"com.simba.athena.amazonaws.auth.PropertiesFileCredentialsProvider");
connectionProperties.put("AwsCredentialsProviderArguments", "/my-folder/.athenaCredentials");
connectionProperties.put("driver", "com.simba.athena.jdbc.Driver");
List<String> predicateList =
Stream
.of("id = 'foo' and date >= DATE'2018-01-01' and date < DATE'2019-01-01'")
.collect(Collectors.toList());
String[] predicates = new String[predicateList.size()];
predicates = predicateList.toArray(predicates);
Dataset<Row> data =
spark.read()
.jdbc("jdbc:awsathena://AwsRegion=us-east-1;",
"my_env.my_table", predicates, connectionProperties);
Run Code Online (Sandbox Code Playgroud)
您还可以在 Flink 应用程序中使用此驱动程序:
TypeInformation[] fieldTypes = new TypeInformation[] {
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.simba.athena.jdbc.Driver")
.setDBUrl("jdbc:awsathena://AwsRegion=us-east-1;UID=my_access_key;PWD=my_secret_key;S3OutputLocation=s3://my-bucket/tmp/;")
.setQuery("select id, val_col from my_env.my_table WHERE id = 'foo' and date >= DATE'2018-01-01' and date < DATE'2019-01-01'")
.setRowTypeInfo(rowTypeInfo)
.finish();
DataSet<Row> dbData = env.createInput(jdbcInputFormat, rowTypeInfo);
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
4683 次 |
| 最近记录: |