使用来自外部源的 dbt 进行查询

Min*_*cho 2 snowflake-cloud-data-platform dbt

我有以下问题:

  • 我有一个 AWS S3 管道,每天都会输出一个 json.gz 文件。
  • 我希望用 dbt 将该文件放入雪花中(没有雪管使用 atm)

我已经通过创建存储集成设法做到了这一点,并且我已经使用我的角色(用于运行 dbt)手动创建了一个架构并在该架构上分配使用情况。到现在为止还挺好。

然后我读到了这个:

https://github.com/fishtown-analytics/dbt-external-tables

问题是,这是正确运行的唯一方法,我必须更改我的 dbt profiles.yml,将默认架构设置为 S3_MIXPANEL,使用默认数据库 RAW_DEV,使用 --target 'ingest_dev' 参数运行不同的目标和角色.

我一直认为应该有一个更复杂的解决方案,我可以在其中创建模式和查询元数据并使用 {{ source() }} 这样的东西,这样我就可以以某种方式指出我的文档这是一个外部源。我认为这个 dbt-external-tables 并没有很好地解释我的情况?

请任何人都可以帮助我并分享如何在不每次更改默认架构宏和 dbtprofiles.yml 的情况下正确地从外部阶段创建架构和查询?

我已成功运行以下代码:

{{
  config(
    materialized ='incremental',
    schema = generate_schema_name('S3_MIXPANEL')
  )
}}
 
  SELECT
    metadata$filename as file_name,
    to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd') as event_date,
    $1 as payload,
    CONVERT_TIMEZONE('Europe/London',TO_TIMESTAMP_tz($1:properties:mp_processing_time_ms::int / 1000)) as  event_timestamp_converted,
    CONVERT_TIMEZONE('Europe/London', current_timestamp) as ingested_at

 from

    @my_s3_stage

    
{% if is_incremental() %}
    -- this filter will only be applied on an incremental run
    WHERE event_date>(
    SELECT
        max(event_date)
    FROM
        {{ this }}
    )
{% endif %}

{{ row_limit() }} 
Run Code Online (Sandbox Code Playgroud)

编辑 22-06-20:

我已经在我的模型中添加了 src_mixpanel.yml 文件并运行了 dbt 命令,但是我还必须指定 data_types,所以我也添加了它们,然后我显然也必须在我的宏中添加“宏”(顺便说一句,也许是愚蠢的问题,但我真的不知道如何安装您的软件包,所以我手动将您的所有宏添加到我的中)。

现在,当我运行此代码时:

dbt run-operation stage_external_sources
Run Code Online (Sandbox Code Playgroud)

version: 2

sources:

  - name: s3_mixpanel
    database: RAW_DEV
    tables:
      - name: events
        external:
          location: '@my_s3_stage'
          auto_refresh: false # depends on your S3 setup
          partitions:
            - name: event_date
              expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
              data_type: date
            - name: file_name
              expression: metadata$filename
              data_type: string
          columns:
            - name: properties
              data_type: variant
Run Code Online (Sandbox Code Playgroud)

我收到一个错误:

运行操作时遇到错误:宏 stage_external_sources (macros/stage_external_sources.sql) 中的编译错误
'dict object' 中的没有属性 'sources'

Jer*_*hen 6

作为该dbt-external-tables包的维护者,我将分享其固执己见的观点。该包装认为你应该阶段所有外部源(S3文件)作为外部表或与snowpipes第一,在包括尽可能少的混淆逻辑尽可能的处理。然后,您可以在 dbt 模型中选择它们作为源,以及所有必需的业务逻辑。

如果我的理解是正确的,您将按如下方式在一个名为(例如)models/staging/mixpanel/src_mixpanel.yml的文件中暂存您的 mixpanel 数据:

version: 2

sources:

  - name: s3_mixpanel
    database: raw_dev
    tables:
      - name: events
        external:
          location: '@my_s3_stage'
          file_format: "( type = json )"  # or a named file format
          auto_refresh: false # depends on your S3 setup
          partitions:
            - name: event_date
              expression: to_date(SUBSTR(metadata$filename,16,10),'yyyy/mm/dd')
          columns:
            - name: properties
              data_type: variant
Run Code Online (Sandbox Code Playgroud)

您可以从包中运行此宏来创建外部表,并在创建后更新其分区元数据(如果您尚未auto_refresh启用)(请参阅 Snowflake文档):

dbt run-operation stage_external_sources
Run Code Online (Sandbox Code Playgroud)

然后,您可以在增量模型中从此源中进行选择,就像上面的模型一样。现在,event_date是这个外部表上的一个分区列,因此对其进行过滤应该使 Snowflake 能够修剪文件(尽管对于动态的、子查询派生的过滤器,这在历史上是不一致的)。

{{
  config(
    materialized ='incremental'
  )
}}
 
  SELECT
    metadata$filename as file_name,
    event_date,
    value as payload,
    properties:mp_processing_time_ms::int / 1000 as event_timestamp_converted,
    CONVERT_TIMEZONE('Europe/London', current_timestamp) as modeled_at

 from {{ source('s3_mixpanel', 'events' }} 

    
{% if is_incremental() %}
    -- this filter will only be applied on an incremental run
    WHERE event_date >(
    SELECT
        max(event_date)
    FROM
        {{ this }}
    )
{% endif %}

{{ row_limit() }}
Run Code Online (Sandbox Code Playgroud)