Flo*_*ian 11 google-cloud-dataflow apache-flink apache-beam
我正在研究通过Google Dataflow/Apache Beam处理来自Web用户会话的日志,并且需要将用户的日志(流式传输)与上个月用户会话的历史记录相结合.
我看过以下方法:
element的processElement(ProcessContext processContext)我的理解是,通过加载的数据.withSideInputs(pCollectionView)需要适合内存.我知道我可以将所有单个用户的会话历史记录放入内存,但不是所有会话历史记录.
我的问题是,是否有一种方法可以从仅与当前用户会话相关的侧输入加载/流式传输数据?
我想象一个parDo函数,它将通过指定用户的ID从侧面输入加载用户的历史会话.但只有当前用户的历史会话才适合内存; 通过侧输入加载所有历史会话将太大.
一些伪代码来说明:
public static class MetricFn extends DoFn<LogLine, String> {
final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;
public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
this.pHistoryView = historyView;
}
@Override
public void processElement(ProcessContext processContext) throws Exception {
Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);
final LogLine currentLogLine = processContext.element();
final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
processContext.output(outputMetric);
}
}
Run Code Online (Sandbox Code Playgroud)
目前还没有一种方法可以访问流中的每个键侧输入,但正如您所描述的那样,它肯定会很有用,并且我们正在考虑实现它。
一种可能的解决方法是使用侧面输入将指针分发到实际会话历史记录。生成 24 小时会话历史记录的代码可以将它们上传到 GCS/BigQuery/等,然后将位置作为侧面输入发送到加入代码。
| 归档时间: |
|
| 查看次数: |
2137 次 |
| 最近记录: |