Flink - 为什么我应该创建自己的 RichSinkFunction 而不是仅仅打开和关闭 PostgreSql 连接?

Mic*_*eru 1 apache-flink

我想知道为什么我确实需要创建自己的 RichSinkFunction 或使用 JDBCOutputFormat 连接数据库,而不是仅仅使用 SinkFunction 中的传统 PostgreSQL 驱动程序创建连接、执行查询并关闭连接?

我发现很多文章都说这样做,但没有解释为什么?有什么不同?

使用 JDBCOutputFormat 的代码示例,

JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
     .setDrivername("org.postgresql.Driver")
     .setDBUrl("jdbc:postgresql://localhost:1234/test?user=xxx&password=xxx")
     .setQuery(query)
     .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }) //set the types
     .finish();
Run Code Online (Sandbox Code Playgroud)

实现自己的 RichSinkFunction 的代码示例,

public class RichCaseSink extends RichSinkFunction<Case> {

  private static final String UPSERT_CASE = "INSERT INTO public.cases (caseid, tracehash) "
      + "VALUES (?, ?) "
      + "ON CONFLICT (caseid) DO UPDATE SET "
      + "  tracehash=?";

  private PreparedStatement statement;


  @Override
  public void invoke(Case aCase) throws Exception {

    statement.setString(1, aCase.getId());
    statement.setString(2, aCase.getTraceHash());
    statement.setString(3, aCase.getTraceHash());
    statement.addBatch();
    statement.executeBatch();
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    Class.forName("org.postgresql.Driver");
    Connection connection =
        DriverManager.getConnection("jdbc:postgresql://localhost:5432/casedb?user=signavio&password=signavio");

    statement = connection.prepareStatement(UPSERT_CASE);
  }

}
Run Code Online (Sandbox Code Playgroud)

为什么我不能只使用 PostgreSQL 驱动程序?

public class Storable implements SinkFunction<Activity>{

    @Override
    public void invoke(Activity activity) throws Exception {
        Class.forName("org.postgresql.Driver");
        try(Connection connection =
            DriverManager.getConnection("jdbc:postgresql://localhost:5432/casedb?user=signavio&password=signavio")){

        statement = connection.prepareStatement(UPSERT_CASE);

        //Perform the query

        //close connection...
        }
    }

}
Run Code Online (Sandbox Code Playgroud)

有人知道 Flink 最佳实践的技术答案吗?RichSinkFunction 的实现或 JDBCOutputFormat 的使用是否有特殊作用?

先感谢您。

Dom*_*ski 8

好吧,您可以使用自己的SinkFunction方法,只需使用invoke()方法来打开连接并写入数据,并且它通常应该可以工作。但在大多数情况下它的性能会非常非常差。

第一个示例和第二个示例之间的实际区别在于,RichSinkFunction您正在使用open()方法来打开连接并准备语句。该open()方法仅在函数初始化时调用一次。在第二个示例中,您将打开与数据库的连接并在invoke()方法内准备语句,该方法会为 input 的每个元素调用DataStream您实际上将为流中的每个元素打开一个新连接

创建数据库连接的成本很高,而且肯定会带来严重的性能缺陷。

  • 好吧,我不会说它真的会由 Flink 管理,它只是允许 Flink 在函数初始化时打开连接。无论您必须编写多少个元素,您通常都应该使用这种方法(也许如果您只有一两个元素,这并不重要),因为打开和数据库连接会引入延迟。还值得注意的是,它将打开一个连接,但针对“RichSinkFunction”的每个实例。 (2认同)