如果你正在使用Beam Java,你可以使用FileIO.writeDynamic()
它(从目前正在发布的Beam 2.3开始 - 但你已经可以通过版本使用它2.3.0-SNAPSHOT
),或旧的DynamicDestinations
API(在Beam 2.2中可用).
FileIO.writeDynamic()
用于将PCollection
银行交易写入GCS上的不同路径的示例,具体取决于交易的类型:
PCollection<BankTransaction> transactions = ...;
transactions.apply(
FileIO.<BankTransaction, TransactionType>writeDynamic()
.by(Transaction::getType)
.via(BankTransaction::toString, TextIO.sink())
.to("gs://bucket/myfolder/")
.withNaming(type -> defaultNaming("transactions_", ".txt"));
Run Code Online (Sandbox Code Playgroud)
有关使用示例DynamicDestinations
,请参阅TextIO单元测试中的示例代码.
或者,如果要将每条记录写入其自己的文件,只需使用FileSystems
API中的API(特别是FileSystems.create()
)DoFn
.
归档时间: |
|
查看次数: |
1621 次 |
最近记录: |