Min*_*cho 2 snowflake-cloud-data-platform dbt
我有以下问题:
我已经通过创建存储集成设法做到了这一点,并且我已经使用我的角色(用于运行 dbt)手动创建了一个架构并在该架构上分配使用情况。到现在为止还挺好。
然后我读到了这个:
https://github.com/fishtown-analytics/dbt-external-tables
问题是,这是正确运行的唯一方法,我必须更改我的 dbt profiles.yml,将默认架构设置为 S3_MIXPANEL,使用默认数据库 RAW_DEV,使用 --target 'ingest_dev' 参数运行不同的目标和角色.
我一直认为应该有一个更复杂的解决方案,我可以在其中创建模式和查询元数据并使用 {{ source() }} 这样的东西,这样我就可以以某种方式指出我的文档这是一个外部源。我认为这个 dbt-external-tables 并没有很好地解释我的情况?
请任何人都可以帮助我并分享如何在不每次更改默认架构宏和 dbtprofiles.yml 的情况下正确地从外部阶段创建架构和查询?
我已成功运行以下代码:
{{
config(
materialized ='incremental',
schema = generate_schema_name('S3_MIXPANEL')
)
}}
SELECT
metadata$filename as file_name,
to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd') as event_date,
$1 as payload,
CONVERT_TIMEZONE('Europe/London',TO_TIMESTAMP_tz($1:properties:mp_processing_time_ms::int / 1000)) as event_timestamp_converted,
CONVERT_TIMEZONE('Europe/London', current_timestamp) as ingested_at
from
@my_s3_stage
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
WHERE event_date>(
SELECT
max(event_date)
FROM
{{ this }}
)
{% endif %}
{{ row_limit() }}
Run Code Online (Sandbox Code Playgroud)
编辑 22-06-20:
我已经在我的模型中添加了 src_mixpanel.yml 文件并运行了 dbt 命令,但是我还必须指定 data_types,所以我也添加了它们,然后我显然也必须在我的宏中添加“宏”(顺便说一句,也许是愚蠢的问题,但我真的不知道如何安装您的软件包,所以我手动将您的所有宏添加到我的中)。
现在,当我运行此代码时:
dbt run-operation stage_external_sources
Run Code Online (Sandbox Code Playgroud)
和
version: 2
sources:
- name: s3_mixpanel
database: RAW_DEV
tables:
- name: events
external:
location: '@my_s3_stage'
auto_refresh: false # depends on your S3 setup
partitions:
- name: event_date
expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
data_type: date
- name: file_name
expression: metadata$filename
data_type: string
columns:
- name: properties
data_type: variant
Run Code Online (Sandbox Code Playgroud)
我收到一个错误:
运行操作时遇到错误:宏 stage_external_sources (macros/stage_external_sources.sql) 中的编译错误
'dict object' 中的没有属性 'sources'
作为该dbt-external-tables包的维护者,我将分享其固执己见的观点。该包装认为你应该阶段所有外部源(S3文件)作为外部表或与snowpipes第一,在包括尽可能少的混淆逻辑尽可能的处理。然后,您可以在 dbt 模型中选择它们作为源,以及所有必需的业务逻辑。
如果我的理解是正确的,您将按如下方式在一个名为(例如)models/staging/mixpanel/src_mixpanel.yml的文件中暂存您的 mixpanel 数据:
version: 2
sources:
- name: s3_mixpanel
database: raw_dev
tables:
- name: events
external:
location: '@my_s3_stage'
file_format: "( type = json )" # or a named file format
auto_refresh: false # depends on your S3 setup
partitions:
- name: event_date
expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
columns:
- name: properties
data_type: variant
Run Code Online (Sandbox Code Playgroud)
您可以从包中运行此宏来创建外部表,并在创建后更新其分区元数据(如果您尚未auto_refresh启用)(请参阅 Snowflake文档):
dbt run-operation stage_external_sources
Run Code Online (Sandbox Code Playgroud)
然后,您可以在增量模型中从此源中进行选择,就像上面的模型一样。现在,event_date是这个外部表上的一个分区列,因此对其进行过滤应该使 Snowflake 能够修剪文件(尽管对于动态的、子查询派生的过滤器,这在历史上是不一致的)。
{{
config(
materialized ='incremental'
)
}}
SELECT
metadata$filename as file_name,
event_date,
value as payload,
properties:mp_processing_time_ms::int / 1000 as event_timestamp_converted,
CONVERT_TIMEZONE('Europe/London', current_timestamp) as modeled_at
from {{ source('s3_mixpanel', 'events' }}
{% if is_incremental() %}
-- this filter will only be applied on an incremental run
WHERE event_date >(
SELECT
max(event_date)
FROM
{{ this }}
)
{% endif %}
{{ row_limit() }}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
2061 次 |
| 最近记录: |