Chr*_*row 16 java publish-subscribe bigdata google-cloud-platform apache-beam
我有一个用例,我初始化一个包含一组查找数据的HashMap(有关物联网设备的物理位置等的信息).该查找数据用作第二数据集的参考数据,该第二数据集是PCollection.此PCollection是一个数据流,提供IoT设备记录的数据.来自物联网设备的数据流使用Apache Beam管道,该管道作为Google Dataflow使用Google Cloud pub/sub运行.
当我处理PCollection(设备数据)时,我将Google Cloud发布/订阅数据链接到HashMap中的相关查找条目.
我需要更新HashMap,基于第二个将更改推送到其数据的pub/sub.这是我到目前为止获得PCollection并使用HashMap进行查找的方法:
HashMap - >包含预加载的查找数据(有关IoT设备的信息)
PCollection - >包含来自管道数据流的数据(物联网设备记录的数据)
我正在为IoT设备查找数据生成一个HashMap作为单例:
public class MyData {
private static final MyData instance = new MyData ();
private MyData () {
HashMap myDataMap = new HashMap<String, String>();
... logic to populate the map
this.referenceData = myDataMap;
}
public HashMap<Integer, DeviceReference> referenceData;
public static DeviceData getInstance(){
return instance;
}
}
Run Code Online (Sandbox Code Playgroud)
然后我在不同的类中使用HashMap,我订阅了数据的更新(这些是例如给我新的数据的消息,这些数据与已经存储在HashMap中的实体有关).我正在使用带有Apache beam的Google pub/sub订阅更改:
HashMap<String, String> referenceData = MyData.getInstance().referenceData;
Pipeline pipeLine = Pipeline.create(options);
// subscribe to changes in data
org.apache.beam.sdk.values.PCollection myDataUpdates;
myDataUpdates = pipeLine.begin()
.apply(String.format("Subscribe to data updates"),
PubsubIO.readStrings().fromTopic(
String.format("myPubSubPath")));
Run Code Online (Sandbox Code Playgroud)
我想要做的是有效地将数据更新应用于单例HashMap(即根据我的数据订阅操作HashMap).我怎样才能做到这一点?
我对Apache Beam的理解有限,我只知道如何对管道数据进行转换以创建另一个独立的管道数据PCollection.我认为这是Beam的重点,它是用于将大型数据集转换为不同的形式.有没有办法使用Apache Beam 实现我需要的东西(基于pub/sub订阅更新数据集),还是有另一种方法可以使用pub/sub更新HashMap?(我无法轮询数据,因为它会产生太多的延迟和成本,我需要使用订阅更新HashMap).
Google云文档显示了一种直接订阅未链接到Apache Beam管道的Google Cloud pub/sub的方法.这很有希望作为一种潜在的解决方案,并依赖于以下Maven依赖:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.53.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
我遇到了冲突,这与Apache Beam的以下Maven依赖项冲突:
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>2.5.0</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
这个问题记录在一个单独的问题中 - Java应用程序中的Maven冲突与google-cloud-core-grpc依赖关系.从我所看到的情况来看,似乎google-cloud-pubsub我使用的Maven工件的哪个版本并不重要,因为从我发现的看起来它看起来像v.2.5.0光束依赖性以及下面将始终与任何电流冲突谷歌依赖的版本.
(我在梁吉拉提出这个问题 - https://issues.apache.org/jira/browse/BEAM-6118)
我正在调查侧输入,并combine作为实现HashMap更新的一种方法:
https://www.programcreek.com/java-api-examples/?api=org.apache.beam.sdk.transforms.Combine
例10显示了一种.getSideInputsMap()可以应用于a的方法payload.我想知道我是否可以以某种方式应用于我对订阅数据更改的订阅.如果我得到PCollection这样的话,我就不能直接链接.getSideInputsMap()到了PCollection
deviceReferenceDataUpdates = pipeLine.begin()
.apply("Get changes to the IoT device lookup data"),
PubsubIO.readMessages().fromTopic("IoT device lookup data")).
Run Code Online (Sandbox Code Playgroud)
我问具体如何我也许能使用一个单独的问题.getSideInputsMap()- 阿帕奇梁-我该如何申请.getSideInputsMap到订阅到谷歌的pub/sub?
我在 Apache Beam 框架中找到了一种执行此操作的方法,如下所示(未完全测试)。
注意- 考虑@Serg M Ten 对 OP 的评论,即更好的方法可能是稍后合并数据,而不是尝试将查找数据加入作为转换处理的一部分。
在这里查看我的答案 - Accessing a HashMap from a different class
main)// initialise singleton HashMap containing lookup data on bootstrap:
LookupData lookupData = LookupData.getInstance();
org.apache.beam.sdk.values.PCollection lookupDataUpdateMessage;
lookupDataUpdateMessage = pipeLine.begin()
.apply("Extract lookup update data", PubsubIO.readStrings().fromTopic("myLookupUpdatePubSubTopic"))
.apply("Transform lookup update data",
ParDo.of(new TransformLookupData.TransformFn()));
org.apache.beam.sdk.values.PCollection lookupDataMessage;
Run Code Online (Sandbox Code Playgroud)
import java.io.Serializable;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.json.JSONObject;
import myLookupSingletonClass;
import myLookupUpObjectClass;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Strings;
public class TransformDeviceMeta
public static class TransformFn extends DoFn<String, MyLookupData> {
@ProcessElement
public void processElement(ProcessContext c)
{
LookupData lookupData = LookupData.getInstance();
MyLookupData myLookupDataUpdate = new MyLookupData();
try
{
byte[] payload = c.element().getBytes();
String myLookUpDataJson = new JSONObject(new String(payload)).toString();
ObjectMapper mapper = new ObjectMapper();
myLookUpDataUpdate = mapper.readValue(myLookUpDataJson , MyLookupData.class);
String updatedLookupDataId = updatedLookupDataId.id;
// logic for HashMap updating e.g:
lookupData.myHashMap.remove(updatedDeviceId);
}
else {
lookupData.myHashMap.put(updatedDeviceId, deviceMetaUpdate);
}
}
catch (Exception ex) {
Log.error(ex.getMessage());
System.out.println("Error " + ex.getMessage());
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
MyLookupData= 形成查找数据模型的类
| 归档时间: |
|
| 查看次数: |
581 次 |
| 最近记录: |