我创建了一个Pipeline,它从GCS中的文件读取,转换它,最后写入BQ表.该文件包含标题行(字段).
有没有办法以编程方式设置"跳过的标题行数",就像加载时在BQ中可以做的那样?

我有一个连接到pub/sub的流管道,它发布了GCS文件的文件名.从那里我想读取每个文件并解析每一行上的事件(事件是我最终想要处理的事件).
我可以使用TextIO吗?当在执行期间定义文件名时,您是否可以在流管道中使用它(而不是使用TextIO作为源,并且fileName(s)在构造时已知).如果不是,我正在考虑做以下事情:
从pub/sub ParDo获取主题以读取每个文件并获取行处理文件的行...
我可以使用FileBasedReader或类似的东西来读取文件吗?文件不是太大,所以我不需要并行读取单个文件,但我需要读取大量文件.
我正试图找到一种优雅地结束我的工作的方式,以免丢失任何数据,从PubSub流式传输并写入BigQuery.
我可以设想的一种可能的方法是让作业停止提取新数据,然后运行直到它处理完所有内容,但我不知道是否/如何实现这一点.
BigQuery可以从Google云端硬盘中读取联合来源.看到这里.我希望能够将BigQuery中的表读入指向Drive文档的Dataflow管道中.
将BigQuery挂接到Drive中的文件非常正常:
但是,当我尝试将该表读入我的Dataflow管道时,我(可以理解)得到以下错误:
找不到合适的凭据来访问Google云端硬盘.联系表所有者以获取帮助.
[..]
PCollection<TableRow> results = pipeline.apply("whatever",
BigQueryIO.Read.fromQuery("SELECT * from [CPT_7414_PLAYGROUND.google_drive_test]"))
.apply(ParDo.of(new DoFn<TableRow, TableRow>() {
[..]
Run Code Online (Sandbox Code Playgroud)
我如何使Dataflow能够从BigQuery中指向Drive的表中读取权限?
我使用Apache Beam SDK在Python中创建了Pipeline,并且Dataflow作业在命令行中运行良好.
现在,我想从UI运行这些工作.为此,我必须为我的工作创建模板文件.我找到了使用maven在Java中创建模板的步骤.
但是我如何使用Python SDK呢?
我正在努力将JdbcIO与Apache Beam 2.0(Java)结合使用,以从同一项目中的Dataflow连接到Cloud SQL实例。
我收到以下错误:
java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)
Run Code Online (Sandbox Code Playgroud)
根据文档,如果数据流服务帐户*@dataflow-service-producer-prod.iam.gserviceaccount.com拥有“编辑”权限,则应有权访问同一项目中的所有资源。
当我使用DirectRunner运行相同的Dataflow作业时,一切正常。
这是我正在使用的代码:
private static String JDBC_URL = "jdbc:mysql://myip:3306/mydb?verifyServerCertificate=false&useSSL=true";
PCollection < KV < String, Double >> exchangeRates = p.apply(JdbcIO. < KV < String, Double >> read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", JDBC_URL)
.withUsername(JDBC_USER).withPassword(JDBC_PW))
.withQuery(
"SELECT CurrencyCode, ExchangeRate FROM mydb.mytable")
.withCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of()))
.withRowMapper(new JdbcIO.RowMapper < KV < String, Double >> () …Run Code Online (Sandbox Code Playgroud) 我们有一个流管道,我们已启用自动缩放功能.通常,一个工作人员足以处理传入的数据,但是如果存在积压工作,我们希望自动增加工作人员的数量.
我们的管道从Pubsub读取,并且每3分钟使用加载作业将批次写入BigQuery.我们从一个工作者开始运行此管道,向pubsub发布两倍于一个工作者可以使用的数据.2小时后,自动缩放仍未启动,因此积压数据约为1小时.考虑到自动调节旨在将积压保持在10秒以下(根据此SO答案),这似乎相当差.
这里的文档说,流媒体作业的自动调节是测试版,并且如果接收器是高延迟的,那么已知它是粗粒度的.是的,我想每3分钟做一次BigQuery批次就算是高延迟!在改进此自动缩放算法方面是否有任何进展?
在此期间我们可以做任何解决方法,例如测量管道中不同点的吞吐量吗?我找不到有关如何将吞吐量报告给自动扩展系统的任何文档.
我对GCP和数据流非常陌生。但是,我想开始测试和部署一些利用GCP上的数据流的流。根据文档,有关数据流的所有内容都必须使用Apache项目BEAM。因此,请按照此处的官方文档进行操作的情况下,受支持的python版本是2.7
坦白地说,由于Python 2.x版本将由于没有官方支持而消失,并且每个人都在使用3.x版本,因此这确实令人失望。尽管如此,我想知道是否有人知道如何准备在python版本中运行的beam和GCP数据流。
我看了这部影片并了这个牧师如何完成这个美好的里程碑,并且显然可以在Python 3.5上运行。
更新资料:
自从我努力处理数据流以来,我想要的伙计们引起了我的思考。我对使用Java或Python版本的工具开始具有挑战性感到非常失望。从python开始,存在关于版本3的限制,该版本几乎是当前的标准。另一方面,java在版本11上运行时会遇到问题,我必须进行一些调整才能在代码的版本8上运行,然后我开始在代码上遇到许多不兼容问题。简而言之,如果GCP真正想前进并成为第一名,那么还有很多地方需要改进。:失望
解决方法:
我将Java版本降级为jdk 8,安装了maven,现在eclipse版本适用于Apache Beam。
我终于解决了,但是,GCP确实请考虑增强并扩展对Java / Python最新版本的支持。
非常感谢
python dataflow google-cloud-platform google-cloud-dataflow apache-beam
我想使用Dataflow将数据从发布/订阅移到GCS。因此,基本上我希望Dataflow在固定的时间量(例如15分钟)中累积一些消息,然后在经过该时间量后将这些数据作为文本文件写入GCS。
我的最终目标是创建一个自定义管道,因此“ Pub / Sub to Cloud Storage”模板对我来说还不够,而且我完全不了解Java,这使我开始使用Python进行调整。
这是到目前为止我所获得的(Apache Beam Python SDK 2.10.0):
import apache_beam as beam
TOPIC_PATH="projects/<my-project>/topics/<my-topic>"
def CombineFn(e):
return "\n".join(e)
o = beam.options.pipeline_options.PipelineOptions()
p = beam.Pipeline(options=o)
data = ( p | "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
| "Window" >> beam.WindowInto(beam.window.FixedWindows(30))
| "Combine" >> beam.transforms.core.CombineGlobally(CombineFn).without_defaults()
| "Output" >> beam.io.WriteToText("<GCS path or local path>"))
res = p.run()
res.wait_until_finish()
Run Code Online (Sandbox Code Playgroud)
我在本地环境中运行该程序没有问题。
python main.py
Run Code Online (Sandbox Code Playgroud)
它在本地运行,但可以从指定的Pub / Sub主题读取,并且每隔30秒就会按预期写入指定的GCS路径。
但是现在的问题是,当我在Google Cloud Platform(即Cloud Dataflow)上运行它时,它不断发出神秘的异常。
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1096: Traceback (most …Run Code Online (Sandbox Code Playgroud) python google-cloud-pubsub google-cloud-dataflow apache-beam
当我使用像BigQueryHook这样的运算符/挂钩时,我看到一条消息,表明这些运算符已被弃用,并使用airflow.gcp ...运算符版本。但是,当我尝试在dag中使用它时,它失败并说没有名为airflow.gcp的模块。我拥有带beta功能的最新气流作曲家版本python3。是否可以通过某种方式安装这些运算符?
我正在尝试使用梁2.15在python 3中运行数据流作业。我已经尝试过virtualenv运算符,但这不起作用,因为它只允许使用python2.7。我怎样才能做到这一点?
python-3.x google-cloud-dataflow airflow google-cloud-composer airflow-operator