Mic*_*ael 5 mysql google-cloud-dataflow apache-beam apache-beam-io
我正在使用 Dataflow SDK 2.X Java API(Apache Beam SDK)将数据写入 mysql。我创建了基于Apache Beam SDK 文档的管道,以使用数据流将数据写入 mysql。它一次插入单行,因为我需要实现批量插入。我在官方文档中找不到任何启用批量插入模式的选项。
想知道是否可以在数据流管道中设置批量插入模式?如果是,请让我知道我需要在下面的代码中更改什么。
.apply(JdbcIO.<KV<Integer, String>>write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
.withUsername("username")
.withPassword("password"))
.withStatement("insert into Person values(?, ?)")
.withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
public void setParameters(KV<Integer, String> element, PreparedStatement query) {
query.setInt(1, kv.getKey());
query.setString(2, kv.getValue());
}
})
Run Code Online (Sandbox Code Playgroud)
编辑 2018-01-27:
事实证明,这个问题与 DirectRunner 有关。如果您使用 DataflowRunner 运行相同的管道,您应该获得实际上多达 1,000 条记录的批次。DirectRunner 总是在分组操作后创建大小为 1 的包。
原答案:
我在使用 Apache Beam 的 JdbcIO 写入云数据库时遇到了同样的问题。问题是,虽然 JdbcIO 确实支持批量写入 1,000 条记录,但我从未真正见过它一次写入超过 1 行(我不得不承认:这一直是在开发环境中使用 DirectRunner)。
因此,我在 JdbcIO 中添加了一个功能,您可以通过将数据分组在一起并将每个组编写为一个批次来自己控制批次的大小。以下是基于 Apache Beam 的原始 WordCount 示例如何使用此功能的示例。
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Count words in input file(s)
.apply(new CountWords())
// Format as text
.apply(MapElements.via(new FormatAsTextFn()))
// Make key-value pairs with the first letter as the key
.apply(ParDo.of(new FirstLetterAsKey()))
// Group the words by first letter
.apply(GroupByKey.<String, String> create())
// Get a PCollection of only the values, discarding the keys
.apply(ParDo.of(new GetValues()))
// Write the words to the database
.apply(JdbcIO.<String> writeIterable()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
.withStatement(INSERT_OR_UPDATE_SQL)
.withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
Run Code Online (Sandbox Code Playgroud)
与 JdbcIO 的普通写入方法的不同之处在于新方法writeIterable()将 aPCollection<Iterable<RowT>>作为输入而不是PCollection<RowT>. 每个 Iterable 都作为一批写入数据库。
可以在此处找到具有此添加的 JdbcIO 版本:https : //github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/ io/jdbc/jdbcIO.java
可以在此处找到包含上述示例的整个示例项目:https : //github.com/olavloite/spanner-beam-example
(在 Apache Beam 上还有一个拉取请求未决,以将其包含在项目中)
| 归档时间: |
|
| 查看次数: |
5433 次 |
| 最近记录: |