How to move or alter a pipe without losing or duplicating any records

Dav*_*son 5 snowflake-cloud-data-platform snowflake-pipe

This page on managing pipes suggests a process for changing the COPY INTO statement in a pipe.

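  1. Pause the pipe (using ALTER PIPE ... SET PIPE_EXECUTION_PAUSED=true).
  2. Query the SYSTEM$PIPE_STATUS function and verify that the pipe execution state is PAUSED and the pending file count is 0.
  3. Recreate the pipe to change the COPY statement in the definition. Choose either of the following options: drop the pipe (using DROP PIPE) and create it (using CREATE PIPE), or recreate the pipe (using the CREATE OR REPLACE PIPE syntax). Internally, the pipe is dropped and created.
  4. Query the SYSTEM$PIPE_STATUS function again and verify that the pipe execution state is RUNNING.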
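
As a rough sketch, the four documented steps could look like this in SQL. The names `mydb.myschema.mypipe`, `mytable` and `mystage` are hypothetical placeholders, and the COPY statement and file format are only examples of a changed definition.

```sql
-- Hypothetical object names; substitute your own pipe, table and stage.

-- 1. Pause the pipe.
ALTER PIPE mydb.myschema.mypipe SET PIPE_EXECUTION_PAUSED = TRUE;

-- 2. Check the status; expect execution_state = PAUSED and pending_file_count = 0.
SELECT pipe_status:executionState::string AS execution_state,
       pipe_status:pendingFileCount::int  AS pending_file_count
FROM (SELECT PARSE_JSON(SYSTEM$PIPE_STATUS('mydb.myschema.mypipe')) AS pipe_status);

-- 3. Recreate the pipe with the changed COPY statement.
--    (Auto-ingest from Azure or GCS also needs INTEGRATION = '<notification integration>'.)
CREATE OR REPLACE PIPE mydb.myschema.mypipe
  AUTO_INGEST = TRUE
  AS
  COPY INTO mydb.myschema.mytable
  FROM @mydb.myschema.mystage
  FILE_FORMAT = (TYPE = 'JSON');

-- 4. Check the status again; expect executionState = RUNNING.
SELECT SYSTEM$PIPE_STATUS('mydb.myschema.mypipe');
```

Nothing in this sequence covers files that arrive between steps 1 and 3.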

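However, if files are due to be loaded in the window between pausing and recreating the pipe, there is no step here to refresh that gap. Even when these steps happen quickly, we have seen examples of files being missed.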

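Running ALTER PIPE REFRESH causes duplicates, though, because the copy history is tied to the pipe. The recreated pipe does not have this history, so it goes back and reloads everything.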
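
One way to see whether a refresh on a recreated pipe has reloaded files is to look for file names that appear more than once in the table's load history. This is only a sketch: `MYTABLE` is a hypothetical table name, the 24-hour window is an arbitrary choice, and it assumes each load attempt shows up as its own row in COPY_HISTORY.

```sql
-- Files that appear more than once in the recent load history of the table.
-- Failed attempts are counted too, so inspect the rows before concluding
-- that data was actually duplicated.
SELECT file_name,
       COUNT(*)            AS load_count,
       MIN(last_load_time) AS first_load,
       MAX(last_load_time) AS latest_load
FROM TABLE(information_schema.copy_history(
       TABLE_NAME => 'MYTABLE',
       START_TIME => DATEADD(hours, -24, CURRENT_TIMESTAMP())))
GROUP BY file_name
HAVING COUNT(*) > 1
ORDER BY latest_load DESC;
```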

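Is there a good way to script a change like this so that there are no gaps or overlaps? Something like capturing the timestamp when the original pipe was paused and then using that timestamp in the refresh query?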

UPDATE: We built a full process and script combination to handle our scenario. The full script is included in an answer below.

Gjo*_*gji 5

Unfortunately, this is a current limitation of auto-ingest Snowpipe, and there is no good workaround.


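You are absolutely right about the history being tied to a specific pipe. When you recreate a pipe, you technically have a new pipe with a new history. ALTER PIPE REFRESH will therefore reload everything under the specified prefix again, resulting in duplicate data.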


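You could use the "MODIFIED_AFTER" option of ALTER PIPE REFRESH and specify a time equal to the last pipe_received_time, but you may still miss some notifications, because the order of incoming notifications is not guaranteed.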
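
A minimal sketch of that idea, assuming the last received time is read from the PIPE_RECEIVED_TIME column of COPY_HISTORY. `MYTABLE` and `mydb.myschema.mypipe` are hypothetical names, and the timestamp literal is an example value that you would replace with the result of the first query.

```sql
-- Find the most recent time the pipe received a file notification for this table.
SELECT MAX(pipe_received_time) AS last_received
FROM TABLE(information_schema.copy_history(
       TABLE_NAME => 'MYTABLE',
       START_TIME => DATEADD(hours, -24, CURRENT_TIMESTAMP())));

-- Refresh the recreated pipe from that point on (ISO 8601 timestamp, example value).
ALTER PIPE mydb.myschema.mypipe REFRESH MODIFIED_AFTER = '2020-04-23T17:20:00-07:00';
```

Because notifications can arrive out of order, a file modified before this cutoff may still never have been loaded, so this narrows the gap rather than closing it.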


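To minimize the amount of duplicate data, you could organize your data files in a more granular date-time structure (e.g. year/month/day/hour/minute/...) and refresh only the folders that correspond to the time the pipe was "down". Even so, some of those files may already have been loaded by the previous pipe, resulting in some duplicate data.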
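
For illustration, a prefix-scoped refresh might look like the following. The pipe name and the `2020/04/23/17/` path are hypothetical; the prefix is taken relative to the stage location in the pipe definition.

```sql
-- Reload only the hour in which the pipe was down (example path).
ALTER PIPE mydb.myschema.mypipe REFRESH PREFIX = '2020/04/23/17/';

-- PREFIX and MODIFIED_AFTER can be combined to narrow the window further.
ALTER PIPE mydb.myschema.mypipe REFRESH
  PREFIX = '2020/04/23/17/'
  MODIFIED_AFTER = '2020-04-23T17:20:00-07:00';
```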



Dav*_*son 3

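We wrote this process/script for managing Snowpipes. It includes validation and a refresh of files that were dropped while a pipe was being altered or moved.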


It is somewhat specific to our process and standards, but it should be easy to modify/generalize for your use case.

/* =====================================================================
This script provides tools for managing, altering, migrating snowpipes
    and their underlying tables. These instructions may need to be tweaked
    depending on your exact use case.

Assuming the pipes and stages follow our standard naming conventions,
    you can find and replace <Database_Name>, <Schema_Name>, <Table_Name>
    with their respective values

======================================================================== */

------------------------------------------
-- Set up Context and Variables
------------------------------------------
--Set your context so you don't accidentally run scripts in the wrong place
use <Database_Name>.<Schema_Name>;

--Pause the pipe, also giving us the pipe altered time
--Prefer to avoid running this at the top of the hour to make it easier to verify file loads below.
alter pipe <Table_Name>_PIPE set pipe_execution_paused = true;

--Get timestamp for when pipe was paused and set as variable $pipe_altered_time
set pipe_altered_time = (select last_altered::timestamp_tz from information_schema.pipes where pipe_name = '<Table_Name>_PIPE');

--View variable from above to verify that it was altered just now
select $pipe_altered_time;

------------------------------------------
-- Alter table as per request
------------------------------------------
--This step will change depending on the request. E.g. it can be skipped if you are just moving a pipe

    --Example for adding columns to a pipe
    alter table <Table_Name> add column <CREATE_DATETIME_UTC> <timestamp_NTZ>;
    alter table <Table_Name> add column <INCOMING_RAW_REQUEST_ID> <STRING>;
    alter table <Table_Name> add column <INITIAL_PING_ID> <INT>;
    --Viewing if new column was added
    --Note: Make sure the order of all of the columns matches the pipe column order below
    select top 10 * from <Table_Name>;

    --Example for migrating a pipe
    create table <New_Table_Name> clone <Old_Database_Name>.<Old_Schema_Name>.<Old_Table_Name>;

------------------------------------------
-- Alter Pipe
------------------------------------------
--Altered pipe statement and turns the pipe on
--This DDL should be retrieved using `SELECT GET_DDL('pipe', '<Table_Name>_PIPE')` (in the same schema context) and then modified to add/adjust columns if needed.
--Make sure that extracted column names here come from the JSON attribute names, which should be provided in the request.
--Most Pipes should follow a very similar pattern to the example below
create or replace pipe <Table_Name>_PIPE
    auto_ingest=true
    integration='EVENT_HUB_QUEUE'
    as
<
   COPY INTO <Table_Name>
    FROM (SELECT PARSE_JSON(HEX_DECODE_STRING($1:Body)) as JSON, 1 as IS_ACTIVE, metadata$filename, JSON:CreatedDateUtc, JSON:Id, JSON:InitialPingId FROM @<Table_Name>_STAGE)
>;

--"Refresh" pipe. Reprocess files that were loaded during the paused time frame.
-- This can take a few minutes depending on the table and how long it was paused.
alter pipe <Table_Name>_PIPE REFRESH MODIFIED_AFTER = $pipe_altered_time;

-------------------------------------------------------------------------------------------------------
--VALIDATION
-------------------------------------------------------------------------------------------------------

------------------------------------------
-- Verify that new files are being processed as expected
------------------------------------------

--Data validation - see what files have been loaded
--It can take a while for new files to show up depending on the event_hub.
--The real thing you want to check is that the pipe is working, and you see files from after the time when the pipe was paused
--If there are new files coming in from Event_hub, but not making it to the table, the pipe may have errors that will help for troubleshooting
select LAST_LOAD_TIME, ROW_COUNT, FILE_NAME
  from table(information_schema.copy_history(Table_Name=>'<Table_Name>', start_time=> dateadd(hours, -2, current_timestamp())))
  ORDER BY LAST_LOAD_TIME DESC;

--Validate data made it to the new columns
--Make sure that there are new records since the $pipe_altered_time in the table (it might take a while for them to show up)
--  Validate that the new records look as expected
--  If there is confusion here, it would be appropriate to contact the requestor on if the data looks correct
select * from <Table_Name>
where <any new column name> is not null;

------------------------------------------
--compare files in the storage account vs files loaded into the table
------------------------------------------

--Choose an hour of the day for comparing (Generally the hour during which the pipe was paused ($pipe_altered_time))
--see what files have been loaded recently if needed
select distinct top 1000 file_name from <Table_Name>
order by file_name desc;

--run these to create variables
--Remove the 'hh' from the directory format below to query an entire day.
set hour_to_check = DATE_TRUNC('Hour', $pipe_altered_time)::timestamp_ntz;
set date_directory = to_varchar($hour_to_check, '/YYYY/MM/DD/hh');
set file_pattern = '''.*' || $date_directory || '.*''';
select $file_pattern;

-- Get the list of files in the storage account.
-- This query can take 10+ minutes for storage accounts with many files
-- Note: This is currently not dynamic, and doesn't like variables. You need to manually enter the value from $file_pattern variable above
LIST @<Table_Name>_STAGE pattern= <'.*/2020/04/23/17.*'>;

-- Comparison stuff
-- Uses the output of the previous query and compares it to what's been loaded in the table
SELECT s.$1 as azurefilename, t.file_name as tablefilename
FROM
(
    table(RESULT_SCAN(LAST_QUERY_ID()))
    --If you need to specify the query id for the LIST results manually, use this instead
    --table(RESULT_SCAN('<abcabcab-00bd-74d1-0000-1d990c3c72aa>'))
) s
FULL OUTER JOIN
(
    select distinct file_name from <Table_Name>
    where file_name like '%' || $date_directory || '%'
) t
ON charindex(t.file_name, s.$1) > 0;

-- Check for mismatched files
-- Ideally, this list should be empty
select *
    from
    (
        table(RESULT_SCAN(LAST_QUERY_ID()))
        --If you need to specify the query id for the comparison results manually, use this instead
        --table(RESULT_SCAN('<abcabcab-00bd-74d1-0000-1d990c3c72aa>'))
    )
    where $1 is null or $2 is null;

-- If the above list is empty, you're done.
-- If the above list returns files that were not loaded, then copy the missing files into table.
-- Get the copy into statement, either from the pipe creation statement above or using `select get_ddl('pipe', '<pipe_name>')`
--      You'll need to cut out the pipe syntax and just use the `COPY INTO` statement
-- the file name format should generally look like `/2020/04/23/17/20/53/28.avro` and you may need to trim the results from above to match.
<COPY INTO OUTGOING_RAW_REQUEST FROM (SELECT PARSE_JSON(HEX_DECODE_STRING($1:Body)), '1'::numeric, metadata$filename FROM @OUTGOING_RAW_REQUEST_STAGE)>
files = (
'</2020/04/23/17/20/53/28.avro>',
'</2020/04/23/17/20/52/20.avro>');