Tob*_*obi 0 apache-beam apache-beam-io
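I'm using Apache Beam 2.6 to read from a single Kafka topic and write the output to Google Cloud Storage (GCS). Now I'd like to change the pipeline so that it reads from multiple topics and writes them out as gs://bucket/topic/...
When reading from only a single topic, I used TextIO in the last step of my pipeline: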
    .apply(
        TextIO.write()
            .to(
                new DateNamedFiles(
                    String.format("gs://bucket/data%s/", suffix), currentMillisString))
            .withWindowedWrites()
            .withTempDirectory(
                FileBasedSink.convertToFileResourceIfPossible(
                    String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
            .withNumShards(1));
Here is a similar question whose code I tried to adapt:
    .apply(
        FileIO.<EventType, Event>writeDynamic()
            .by(
                new SerializableFunction<Event, EventType>() {
                  @Override
                  public EventType apply(Event input) {
                    return EventType.TRANSFER; // should return real type here, just a dummy
                  }
                })
            .via(
                Contextful.fn(
                    new SerializableFunction<Event, String>() {
                      @Override
                      public String apply(Event input) {
                        return "Dummy"; // should return the Event converted to a String
                      }
                    }),
                TextIO.sink())
            .to(DynamicFileDestinations.constant(new DateNamedFiles("gs://bucket/tmp%s/%s/",
                    currentMillisString),
                new SerializableFunction<String, String>() {
                  @Override
                  public String apply(String input) {
                    return null; // Not sure what this should return exactly, but it needs to
                    // include the EventType in the path
                  }
                }))
            .withTempDirectory(
                FileBasedSink.convertToFileResourceIfPossible(
                    String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)))
            .withNumShards(1))
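The official JavaDoc contains example code that seems to have outdated method signatures. (The .via method appears to have changed the order of its arguments.) I also stumbled across an example in FileIO that confused me: shouldn't TransactionType and Transaction swap places in this line?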
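After a night's sleep and a fresh start, I figured out the solution. I used the functional Java 8 style because it makes the code shorter (and more readable):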
    .apply(
        FileIO.<String, Event>writeDynamic()
            // Destination key: the Kafka topic the event came from.
            .by((SerializableFunction<Event, String>) input -> input.getTopic())
            // Convert each Event to the line of text that is written out.
            .via(
                Contextful.fn(
                    (SerializableFunction<Event, String>) input -> input.getPayload()),
                TextIO.sink())
            .to(String.format("gs://bucket/data%s/", suffix))
            // One FileNaming per destination (topic).
            .withNaming(type -> FileNaming.getNaming(type, "", currentMillisString))
            .withDestinationCoder(StringUtf8Coder.of())
            .withTempDirectory(
                String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString))
            .withNumShards(1));
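To make the role of each builder call more concrete, here is a minimal plain-Java sketch (no Beam dependency) of what the dynamic write does conceptually: the `by` function derives a destination key from each element, and the `via` function turns the element into the line that gets written. The `Event` class, the `groupByDestination` helper, and the topic names `orders` and `payments` are hypothetical stand-ins for illustration, not Beam APIs. Beam itself additionally takes care of windowing, sharding, and temporary files.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class DynamicWriteSketch {
    /** Hypothetical stand-in for the Event POJO: a Kafka payload plus its topic. */
    static final class Event {
        private final String topic;
        private final String payload;

        Event(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }

        String getTopic() { return topic; }
        String getPayload() { return payload; }
    }

    /**
     * Groups the formatted lines by destination key. "by" plays the role of
     * .by(...) and "via" the role of the function passed to .via(...).
     */
    static Map<String, List<String>> groupByDestination(
            List<Event> events,
            Function<Event, String> by,
            Function<Event, String> via) {
        Map<String, List<String>> linesPerDestination = new LinkedHashMap<>();
        for (Event event : events) {
            linesPerDestination
                .computeIfAbsent(by.apply(event), key -> new ArrayList<>())
                .add(via.apply(event));
        }
        return linesPerDestination;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("orders", "o-1"),
            new Event("payments", "p-1"),
            new Event("orders", "o-2"));

        Map<String, List<String>> grouped =
            groupByDestination(events, Event::getTopic, Event::getPayload);

        // Each destination key ends up under its own "sub-folder" of the base path.
        String base = "gs://bucket/data/";
        grouped.forEach((topic, lines) ->
            System.out.println(base + topic + "/ <- " + lines));
    }
}
```

In the real pipeline the destination key also needs a coder, which is why the solution calls `.withDestinationCoder(StringUtf8Coder.of())` for the `String` topic.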
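Explanation:
- Event is a Java POJO containing the payload of the Kafka message and the topic it belongs to; it is parsed in a ParDo after the KafkaIO step.
- suffix is either dev or empty and is set by an environment variable.
- currentMillisString contains the timestamp at which the whole pipeline was launched, so that new files don't overwrite older files on GCS when the pipeline is restarted.
- FileNaming implements custom naming and receives the event type (the topic) in its constructor; it uses a custom formatter to write to daily partitioned "sub-folders" on GCS: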
    import java.util.TimeZone;
    import org.apache.beam.sdk.io.Compression;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.joda.time.DateTimeZone;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;

    class FileNaming implements FileIO.Write.FileNaming {

      static FileNaming getNaming(String topic, String suffix, String currentMillisString) {
        return new FileNaming(topic, suffix, currentMillisString);
      }

      // Formats the window start as a day in the Europe/Zurich time zone.
      private static final DateTimeFormatter FORMATTER = DateTimeFormat
          .forPattern("yyyy-MM-dd").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("Europe/Zurich")));

      private final String topic;
      private final String suffix;
      private final String currentMillisString;

      // Builds "<topic>/<yyyy-MM-dd>/<currentMillisString>_" for the given window.
      private String filenamePrefixForWindow(IntervalWindow window) {
        return String.format(
            "%s/%s/%s_", topic, FORMATTER.print(window.start()), currentMillisString);
      }

      private FileNaming(String topic, String suffix, String currentMillisString) {
        this.topic = topic;
        this.suffix = suffix;
        this.currentMillisString = currentMillisString;
      }

      @Override
      public String getFilename(
          BoundedWindow window,
          PaneInfo pane,
          int numShards,
          int shardIndex,
          Compression compression) {
        // Requires a windowed PCollection; the cast fails for non-interval windows.
        IntervalWindow intervalWindow = (IntervalWindow) window;
        String filenamePrefix = filenamePrefixForWindow(intervalWindow);
        String filename =
            String.format(
                "pane-%d-%s-%05d-of-%05d%s",
                pane.getIndex(),
                pane.getTiming().toString().toLowerCase(),
                shardIndex,
                numShards,
                suffix);
        return filenamePrefix + filename;
      }
    }
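To see what the resulting object names look like, here is a small stand-alone sketch that reproduces the same format strings with `java.time` instead of Joda-Time and Beam types. The `FileNameSketch` class, its `fileName` helper, the topic `orders`, and the sample timestamps are assumptions made up for illustration. The pane timing is passed in as a plain string such as `ON_TIME`, mirroring `pane.getTiming().toString()`.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class FileNameSketch {
    // Same pattern and time zone as the FORMATTER in the class above.
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("Europe/Zurich"));

    /** Hypothetical helper that mirrors the two String.format calls of getFilename. */
    static String fileName(
            String topic,
            Instant windowStart,
            String currentMillisString,
            long paneIndex,
            String timing,
            int shardIndex,
            int numShards,
            String suffix) {
        String prefix =
            String.format("%s/%s/%s_", topic, DAY.format(windowStart), currentMillisString);
        String name =
            String.format(
                "pane-%d-%s-%05d-of-%05d%s",
                paneIndex, timing.toLowerCase(Locale.ROOT), shardIndex, numShards, suffix);
        return prefix + name;
    }

    public static void main(String[] args) {
        // Prints orders/2018-09-01/1535800000000_pane-0-on_time-00000-of-00001
        System.out.println(
            fileName("orders", Instant.parse("2018-09-01T10:00:00Z"),
                "1535800000000", 0, "ON_TIME", 0, 1, ""));

        // 22:30 UTC on 31 August is already 1 September in Zurich, so the
        // window lands in the 2018-09-01 "sub-folder".
        System.out.println(
            fileName("orders", Instant.parse("2018-08-31T22:30:00Z"),
                "1535800000000", 0, "ON_TIME", 0, 1, ""));
    }
}
```

Because the name returned by the naming function is resolved relative to the directory given to `.to(...)`, the files should end up under `gs://bucket/data<suffix>/<topic>/<day>/`, which gives one daily partition per topic.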