Ped*_*ato 1 stream python-3.x google-bigquery google-cloud-pubsub google-cloud-functions
我想将数据流式传输到 BigQuery 中,并且正在考虑使用 PubSub + Cloud Functions,因为不需要转换(至少目前如此),并且使用 Cloud Data Flow 感觉只是将行插入到表中有点过头了。我说得对吗?
数据使用 Python 脚本从 GCP VM 流式传输到 PubSub,其格式如下:
{'SEGMENT':'datetime':'2020-12-05 11:25:05.64684','values':(2568.025,2567.03)}
Run Code Online (Sandbox Code Playgroud)
BigQuery 架构是datetime:timestamp, value_A: float, value_B: float.
我对这一切的疑问是:
a) 我是否需要将其作为 json/字典推送到 BigQuery 中,所有值都作为字符串,还是必须使用表的数据类型?
BQ.insert_rows_jsonb) 使用和之间有什么区别,BQ.load_table_from_json我应该使用哪一个来完成这项任务?
编辑:
我想要获取的实际上是一些资产的市场数据。假设大约 28 种金融工具并捕捉它们的所有价格变动。平均每天,每个工具有约 60.k 个报价,因此我们谈论的是每月约 3360 万次调用。(目前)需要的是将它们插入到表中以供进一步分析。我目前不确定是否应该执行真正的流处理或每批加载。由于该项目尚未进行分析,我认为不需要数据流,但应该使用 PubSub,因为它可以在时机成熟时更轻松地扩展到数据流。这是我第一次实现流管道,我正在使用我通过课程和阅读学到的所有知识。如果我的方法错误,请纠正我:)。
例如,我绝对想做的是,当一个价格变动与第 n 个价格变动之间的价格差异为 10 时,对另一个表执行另一次插入。为此,我应该使用数据流还是云函数方法仍然有效吗?因为这就像一个触发条件。基本上,触发器类似于:
if price difference >= 10:
process all these ticks
insert the results in this table
Run Code Online (Sandbox Code Playgroud)
但我不确定如何实现这个触发器。
除了 Marton (Pentium10) 的精彩回答
a) 您可以在 BigQuery 中流式传输 JSON,这是一个有效的 json。你的例子不是。关于类型,有一个根据您的架构的自动强制/转换。你可以在这里看到这个
b) 加载作业加载 GCS 中的文件或您在请求中放入的内容。该批处理是异步的,可能需要几秒钟或几分钟。此外,每天每张表的负载量限制为1500 次-> 每分钟 1 次(每天 1440 分钟)。加载作业有几个有趣的方面。
相反,流式作业将数据实时插入到 BigQuery 中。当您有实时限制时(特别是对于可视化、异常检测等),这很有趣。但也有一些不好的一面
UNPARTITIONED一段时间或直到该缓冲区已满。。因此,在构建和测试实时应用程序时,您必须考虑到这种特殊性。现在您已经意识到了这一点,请问问自己您的用例。
编辑1:
成本不应驱动您的用例。
如果目前仅用于分析,您只需想象每天触发一次您的工作以获取完整订阅。根据您的指标:60k 指标 * 28 个仪器 * 100 字节(24 + 内存丢失),您只有 168Mb。您可以将其存储在 Cloud Functions 或 Cloud Run 内存中并执行加载作业。
流媒体对于实时性非常重要!
流模式下的数据流将花费您每月至少 20 美元(1 个 n1-standard1 类型的小型工作线程。使用 Cloud Functions 在 BigQuery 中进行超过 1.5Gb 的流插入。
最终,关于流式传输或批量插入的智能触发器,这实际上是不可能的,如果更改逻辑,则必须重新设计数据摄取。但首先,只有当您的用例需要这个时!
| 归档时间: |
|
| 查看次数: |
8997 次 |
| 最近记录: |