Jim*_*mmy 7 google-cloud-sql google-cloud-dataflow apache-beam
我正在努力将JdbcIO与Apache Beam 2.0(Java)结合使用,以从同一项目中的Dataflow连接到Cloud SQL实例。
我收到以下错误:
java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)
Run Code Online (Sandbox Code Playgroud)
根据文档,如果数据流服务帐户*@dataflow-service-producer-prod.iam.gserviceaccount.com拥有“编辑”权限,则应有权访问同一项目中的所有资源。
当我使用DirectRunner运行相同的Dataflow作业时,一切正常。
这是我正在使用的代码:
private static String JDBC_URL = "jdbc:mysql://myip:3306/mydb?verifyServerCertificate=false&useSSL=true";
PCollection < KV < String, Double >> exchangeRates = p.apply(JdbcIO. < KV < String, Double >> read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", JDBC_URL)
.withUsername(JDBC_USER).withPassword(JDBC_PW))
.withQuery(
"SELECT CurrencyCode, ExchangeRate FROM mydb.mytable")
.withCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of()))
.withRowMapper(new JdbcIO.RowMapper < KV < String, Double >> () {
public KV < String, Double > mapRow(ResultSet resultSet) throws Exception {
return KV.of(resultSet.getString(1), resultSet.getDouble(2));
}
}));
Run Code Online (Sandbox Code Playgroud)
编辑:
在另一个数据流作业中的波束外使用以下方法似乎可以与DataflowRunner一起正常工作,这告诉我数据库可能不是问题。
java.sql.Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PW);
Run Code Online (Sandbox Code Playgroud)
小智 5
按照以下有关如何从Java连接到Cloud SQL的说明进行操作:
https://cloud.google.com/sql/docs/mysql/connect-external-app#java
我设法使它起作用。
这就是代码的样子(您必须用值替换MYDBNAME,MYSQLINSTANCE,USER和PASSWORD。
注意:MYSQLINSTANCE格式为project:zone:instancename。
我正在使用自定义类(Customer)存储每一行的值,而不是键值对。
p.apply(JdbcIO. <Customer> read()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver",
"jdbc:mysql://google/MYDBNAME?cloudSqlInstance=MYSQLINSTANCE&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user=USER&password=PASSWORD&useUnicode=true&characterEncoding=UTF-8"
)
)
.withQuery( "SELECT CustomerId, Name, Location, Email FROM Customers" )
.withCoder( AvroCoder.of(Customer.class) )
.withRowMapper(
new JdbcIO.RowMapper < Customer > ()
{
@Override
public Customer mapRow(java.sql.ResultSet resultSet) throws Exception
{
final Logger LOG = LoggerFactory.getLogger(CloudSqlToBq.class);
LOG.info(resultSet.getString(2));
Customer customer = new Customer(resultSet.getInt(1), resultSet.getString(2), resultSet.getString(3), resultSet.getString(3));
return customer;
}
}
)
);
Run Code Online (Sandbox Code Playgroud)
我希望这有帮助。
我认为这种方法可能效果更好,请尝试 com.mysql.jdbc.GoogleDriver,并使用此处列出的 Maven 依赖项。
https://cloud.google.com/appengine/docs/standard/java/cloud-sql/#Java_Connect_to_your_database
相关问题: 我在哪里找到并下载这个jar文件com.mysql.jdbc.GoogleDriver?
| 归档时间: |
|
| 查看次数: |
5334 次 |
| 最近记录: |