Max*_*Max 5 java dataflow google-cloud-dataflow apache-beam
我很抱歉提出关于这个一般问题的另一个问题,但我在 SO 上发现的问题似乎都与我的问题密切相关。
我有一个现有的、工作的数据流管道,它接受对象KV<Long, Iterable<TableRow>>并输出TableRow对象。这段代码在我们的生产环境中,运行没有问题。然而,我现在正在尝试使用直接运行器实现单元测试来测试此管道,但是单元测试在到达线路时失败
LinkedHashMap<String, Object> evt = (LinkedHashMap<String, Object>) row.get(Schema.EVT);
Run Code Online (Sandbox Code Playgroud)
在管道中,抛出错误消息:
java.lang.ClassCastException:com.google.gson.internal.LinkedTreeMap 无法转换为 java.util.LinkedHashMap
现有数据流代码的简化版本如下所示:
public static class Process extends DoFn<KV<Long, Iterable<TableRow>>, TableRow> {
/* private variables */
/* constructor */
/* private functions */
@ProcessElement
public void processElement(ProcessContext c) throws InterruptedException, ParseException {
EventProcessor eventProc = new EventProcessor();
Processor.WorkItem workItem = new Processor.WorkItem();
Iterator<TableRow> it = c.element().getValue().iterator();
// process all TableRows having the same id
while (it.hasNext()) {
TableRow item = it.next();
if (item.containsKey(Schema.EVT))
eventProc.process(item, workItem);
else
/* process by different Proc class */
}
/* do additional logic */
/* c.output() is somewhere far below */
}
}
public class EventProcessor extends Processor {
// Extract data from an event into the WorkItem
@SuppressWarnings("unchecked")
@Override
public void process(TableRow row, WorkItem item) {
try {
LinkedHashMap<String, Object> evt = (LinkedHashMap<String, Object>) row.get(Schema.EVT);
LinkedHashMap<String, Object> profile = (LinkedHashMap<String, Object>) row.get(Schema.PROFILE);
/* if no exception, process further business logic */
/* business logic */
} catch (ParseException e) {
System.err.println("Bad row");
}
}
}
Run Code Online (Sandbox Code Playgroud)
单元测试的相关部分(准备 的主要输入)Process() DoFn如下所示:
Map<Long, List<TableRow>> groups = new HashMap<Long, List<TableRow>>();
List<KV<Long, Iterable<TableRow>>> collections = new ArrayList<KV<Long,Iterable<TableRow>>>();
Gson gson = new Gson();
// populate the map with events grouped by id
for(int i = 0; i < EVENTS.length; i++) {
TableRow row = gson.fromJson(EVENTS[i], TableRow.class);
Long id = EVENT_IDS[i];
if(groups.containsKey(id))
groups.get(id).add(row);
else
groups.put(id, new ArrayList<TableRow>(Arrays.asList(row)));
}
// prepare main input for pipeline
for(Long key : groups.keySet())
collections.add(KV.of(key, groups.get(key)));
Run Code Online (Sandbox Code Playgroud)
导致问题的行是gson.fromJson(EVENTS[i], TableRow.class);,它似乎将 TableRow 的内部表示形式编码为 LinkedTreeMap 的错误类型。
TableRow 的编码类型似乎不是com.google.gson.internal.LinkedTreeMap预期的java.util.LinkedHashMap。有没有一种方法可以将单元测试中创建的 TableRow 转换为正确的类型java.util.LinkedHashMap,以便单元测试成功,而无需对已在生产中运行的现有数据流代码进行任何更改?
重新发布解决方案作为答案。
如果您不使用具体类的特定功能,则不建议转换为具体类。在这种情况下,最好转换为 toMap而不是LinkedHashMap。GsonLinkedTreeMap也是一个Map,所以应该不会出现问题。
| 归档时间: |
|
| 查看次数: |
8075 次 |
| 最近记录: |