IoT*_*ser 1 python google-cloud-platform google-cloud-dataflow google-cloud-firestore
我想在带有 python 的数据流模板中使用 FireStore。
我做了这样的事情:
with beam.Pipeline(options=options) as p:
(p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(sub).with_output_types(bytes)
| 'String to dictionary' >> beam.Map(firestore_update_multiple)
)
Run Code Online (Sandbox Code Playgroud)
这是使用它的适当方式吗?
额外的信息
def firestore_update_multiple(row):
from google.cloud import firestore
db = firestore.Client()
doc_ref = db.collection(u'data').document(u'one')
doc_ref.update({
u'arrayExample': u'DataflowRunner',
u'booleanExample': True
})
Run Code Online (Sandbox Code Playgroud)
总体思路是正确的,但您应该考虑减少分配 Firestore 连接的频率,并批量处理您的调用。这是应该执行此操作的 ParDo 示例:
class FirestoreUpdateDoFn(beam.DoFn):
def __init__(self, max_batch_size=500):
self.element_batch = []
self.max_batch_size = max_batch_size
def start_bundle(self):
self.db = firestore.Client()
self.batch = db.batch()
self.some_ref = db.collection(...)
def process(self, row):
self.element_batch.append(row)
if len(self.element_batch) >= self.max_batch_size:
self._flush_updates()
def finish_bundle(self):
self._flush_updates()
self.db.close()
def _flush_updates(self):
for elm in self.element_batch:
self.batch.update(...)
batch.commit()
Run Code Online (Sandbox Code Playgroud)
这应该允许您对 Firestore 进行更少的往返调用,并使您的管道更快。然后你会做这样的事情:
with beam.Pipeline(options=options) as p:
(p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(sub)
.with_output_types(bytes)
| 'String to dictionary' >> beam.ParDo(FirestoreUpdateDoFn())
)
Run Code Online (Sandbox Code Playgroud)
查看:
如果您愿意,可以查看的代码PubSubUnboundedSink,它与您尝试做的事情相同:在流上运行时高效地写入外部服务
| 归档时间: |
|
| 查看次数: |
2141 次 |
| 最近记录: |