从数据流作业连接到Cloud SQL

Jim*_*mmy 7 google-cloud-sql google-cloud-dataflow apache-beam

我正在努力将JdbcIO与Apache Beam 2.0(Java)结合使用,以从同一项目中的Dataflow连接到Cloud SQL实例。

我收到以下错误:

java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)
Run Code Online (Sandbox Code Playgroud)
  • 根据文档,如果数据流服务帐户*@dataflow-service-producer-prod.iam.gserviceaccount.com拥有“编辑”权限,则应有权访问同一项目中的所有资源。

  • 当我使用DirectRunner运行相同的Dataflow作业时,一切正常。

这是我正在使用的代码:

private static String JDBC_URL = "jdbc:mysql://myip:3306/mydb?verifyServerCertificate=false&useSSL=true";

PCollection < KV < String, Double >> exchangeRates = p.apply(JdbcIO. < KV < String, Double >> read()
 .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", JDBC_URL)
  .withUsername(JDBC_USER).withPassword(JDBC_PW))
 .withQuery(
  "SELECT CurrencyCode, ExchangeRate FROM mydb.mytable")
 .withCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of()))
 .withRowMapper(new JdbcIO.RowMapper < KV < String, Double >> () {
  public KV < String, Double > mapRow(ResultSet resultSet) throws Exception {
   return KV.of(resultSet.getString(1), resultSet.getDouble(2));
  }
 }));
Run Code Online (Sandbox Code Playgroud)

编辑:

在另一个数据流作业中的波束外使用以下方法似乎可以与DataflowRunner一起正常工作,这告诉我数据库可能不是问题。

java.sql.Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PW);
Run Code Online (Sandbox Code Playgroud)

小智 5

按照以下有关如何从Java连接到Cloud SQL的说明进行操作:

https://cloud.google.com/sql/docs/mysql/connect-external-app#java

我设法使它起作用。

这就是代码的样子(您必须用值替换MYDBNAME,MYSQLINSTANCE,USER和PASSWORD。

注意:MYSQLINSTANCE格式为project:zone:instancename。

我正在使用自定义类(Customer)存储每一行​​的值,而不是键值对。

p.apply(JdbcIO. <Customer> read()
    .withDataSourceConfiguration(
        JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", 
            "jdbc:mysql://google/MYDBNAME?cloudSqlInstance=MYSQLINSTANCE&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user=USER&password=PASSWORD&useUnicode=true&characterEncoding=UTF-8"
        )
    )
    .withQuery( "SELECT CustomerId, Name, Location, Email FROM Customers" )
    .withCoder( AvroCoder.of(Customer.class) )
    .withRowMapper(
        new JdbcIO.RowMapper < Customer > ()
        {
            @Override
            public Customer mapRow(java.sql.ResultSet resultSet) throws Exception
            {
                final Logger LOG = LoggerFactory.getLogger(CloudSqlToBq.class);
                LOG.info(resultSet.getString(2));
                Customer customer = new Customer(resultSet.getInt(1), resultSet.getString(2), resultSet.getString(3), resultSet.getString(3));
                return customer;
            }
        }
    )
);
Run Code Online (Sandbox Code Playgroud)

我希望这有帮助。


Ale*_*ato 2

我认为这种方法可能效果更好,请尝试 com.mysql.jdbc.GoogleDriver,并使用此处列出的 Maven 依赖项。

https://cloud.google.com/appengine/docs/standard/java/cloud-sql/#Java_Connect_to_your_database

相关问题: 我在哪里找到并下载这个jar文件com.mysql.jdbc.GoogleDriver?