是否有包含多个文件的 Python 数据流 Flex 模板示例,其中脚本正在导入同一文件夹中包含的其他文件?
我的项目结构是这样的:
??? pipeline
? ??? __init__.py
? ??? main.py
? ??? setup.py
? ??? custom.py
Run Code Online (Sandbox Code Playgroud)
我正在尝试在 main.py 中导入 custom.py 以获取数据流 flex 模板。
我在管道执行中收到以下错误:
"ModuleNotFoundError: No module named 'custom'"
Run Code Online (Sandbox Code Playgroud)
如果我将所有代码包含在一个文件中并且不进行任何导入,则管道工作正常。
示例 Dockerfile:
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
ARG WORKDIR=/dataflow/template/pipeline
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}
COPY pipeline /dataflow/template/pipeline
COPY spec/python_command_spec.json /dataflow/template/
ENV DATAFLOW_PYTHON_COMMAND_SPEC /dataflow/template/python_command_spec.json
RUN pip install avro-python3 pyarrow==0.11.1 apache-beam[gcp]==2.24.0
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py"
Run Code Online (Sandbox Code Playgroud)
Python规范文件:
{
"pyFile":"/dataflow/template/pipeline/main.py"
}
Run Code Online (Sandbox Code Playgroud)
我正在使用以下命令部署模板:
gcloud builds submit --project=${PROJECT} --tag ${TARGET_GCR_IMAGE} .
任何帮助表示赞赏。
python google-cloud-platform google-cloud-dataflow apache-beam
我在纽约黄色TaxiCab公共数据集的 Google BigQuery中有一个约100万行的表格.从该链接可以看出,架构没有主键.每行代表一次旅行/交易,但没有customer_id字段.
我想添加一个列customer_id并向其分发随机数,以便:
For rows 1-20, `customer_id` should be assigned `1`
For rows 21-40, `customer_id` should be assigned `2`
and so on..
Run Code Online (Sandbox Code Playgroud)
换句话说,我希望表中的确切(和任何)20行具有特定值customer_id.
Row EventType CloudId ts
1 stop 5201156607311872 2018-07-07 12:25:21 UTC
2 start 5201156607311872 2018-07-07 12:27:39 UTC
3 start 5201156607311872 2018-07-07 12:28:15 UTC
4 stop 5738776789778432 2018-07-07 12:28:54 UTC
5 stop 5201156607311872 2018-07-07 12:30:30 UTC
6 stop 5738776789778432 2018-07-07 12:37:45 UTC
7 stop 5738776789778432 2018-07-07 12:40:52 UTC
Run Code Online (Sandbox Code Playgroud)
我有一个如上所述的表结构。我只想EventType在行更改之前过滤第一个事件。即row 2和row 3有相同的EventType,我需要row 3从表中删除。row 4,5,6,7有同样的EventType,我想保留row 4和删除row 5,6,7。
我有两个表A和B.它们都有字段session_id和cookie_id.如何创建一个连接表的输出上的B加入一个session_id,cookie_id用数据流的管道的帮助?CoGroupByKey方法允许您加入单个键.在文档中找不到任何有用的东西.
java google-bigquery google-cloud-platform google-cloud-dataflow
假设我有一个字符串str = "a b c d e".str.split(' ')给我一系列元素[a,b,c,d,e].我如何使用正则表达式来获得这个匹配?
例如:str.match(/ some regex /)给出['a','b','c','d','e']