小编Aks*_*pte的帖子

在 Dataflow Python flex 模板中包含另一个文件 ImportError

是否有包含多个文件的 Python 数据流 Flex 模板示例,其中脚本正在导入同一文件夹中包含的其他文件?

我的项目结构是这样的:

??? pipeline
?   ??? __init__.py
?   ??? main.py
?   ??? setup.py
?   ??? custom.py
Run Code Online (Sandbox Code Playgroud)

我正在尝试在 main.py 中导入 custom.py 以获取数据流 flex 模板。

我在管道执行中收到以下错误:

"ModuleNotFoundError: No module named 'custom'"
Run Code Online (Sandbox Code Playgroud)

如果我将所有代码包含在一个文件中并且不进行任何导入,则管道工作正常。

示例 Dockerfile:

FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ARG WORKDIR=/dataflow/template/pipeline
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}

COPY pipeline /dataflow/template/pipeline

COPY spec/python_command_spec.json /dataflow/template/

ENV DATAFLOW_PYTHON_COMMAND_SPEC /dataflow/template/python_command_spec.json

RUN pip install avro-python3 pyarrow==0.11.1 apache-beam[gcp]==2.24.0

ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py"
Run Code Online (Sandbox Code Playgroud)

Python规范文件:

{
    "pyFile":"/dataflow/template/pipeline/main.py"
}
  
Run Code Online (Sandbox Code Playgroud)

我正在使用以下命令部署模板: gcloud builds submit --project=${PROJECT} --tag ${TARGET_GCR_IMAGE} .

任何帮助表示赞赏。

python google-cloud-platform google-cloud-dataflow apache-beam

6
推荐指数
1
解决办法
678
查看次数

每20次迭代更新SQL中的行

在纽约黄色TaxiCab公共数据集的 Google BigQuery中有一个约100万行的表格.从该链接可以看出,架构没有主键.每行代表一次旅行/交易,但没有customer_id字段.

我想添加一个列customer_id并向其分发随机数,以便:

For rows 1-20, `customer_id` should be assigned `1`
For rows 21-40, `customer_id` should be assigned `2`
and so on..
Run Code Online (Sandbox Code Playgroud)

换句话说,我希望表中的确切(和任何)20行具有特定值customer_id.

mysql sql google-bigquery

5
推荐指数
1
解决办法
99
查看次数

在 Bigquery 中获取特定事件类型的第一行?

Row   EventType   CloudId           ts
1     stop        5201156607311872  2018-07-07 12:25:21 UTC  
2     start       5201156607311872  2018-07-07 12:27:39 UTC  
3     start       5201156607311872  2018-07-07 12:28:15 UTC  
4     stop        5738776789778432  2018-07-07 12:28:54 UTC  
5     stop        5201156607311872  2018-07-07 12:30:30 UTC  
6     stop        5738776789778432  2018-07-07 12:37:45 UTC  
7     stop        5738776789778432  2018-07-07 12:40:52 UTC
Run Code Online (Sandbox Code Playgroud)

我有一个如上所述的表结构。我只想EventType在行更改之前过滤第一个事件。即row 2row 3有相同的EventType,我需要row 3从表中删除。row 4,5,6,7有同样的EventType,我想保留row 4和删除row 5,6,7

sql google-bigquery

2
推荐指数
1
解决办法
776
查看次数

如何使用Cloud Dataflow将两个或多个密钥上的BQ表连接起来?

我有两个表A和B.它们都有字段session_idcookie_id.如何创建一个连接表的输出上的B加入一个session_id,cookie_id用数据流的管道的帮助?CoGroupByKey方法允许您加入单个键.在文档中找不到任何有用的东西.

java google-bigquery google-cloud-platform google-cloud-dataflow

1
推荐指数
1
解决办法
486
查看次数

使用正则表达式按空格拆分字符串

假设我有一个字符串str = "a b c d e".str.split(' ')给我一系列元素[a,b,c,d,e].我如何使用正则表达式来获得这个匹配?

例如:str.match(/ some regex /)给出['a','b','c','d','e']

regex node.js coffeescript

0
推荐指数
2
解决办法
652
查看次数