在pentaho数据集成中新插入或更新的行数

Sre*_*ith 5 etl pentaho pdi

我是Pentaho数据集成的新手; 我需要将一个数据库集成到另一个位置作为ETL Job.我想在ETL作业期间计算插入/更新的数量,并将该计数插入另一个表.谁可以帮我这个事?

Yuv*_*ger 5

迄今为止,我认为没有内置功能可以返回PDI 中插入/更新步骤的受影响行数。

尽管如此,大多数数据库供应商都能够为您提供从给定操作中获取受影响行数的功能。

例如,在 PostgreSQL 中,它看起来像这样:

/* Count affected rows from INSERT */
WITH inserted_rows AS (
    INSERT INTO ...
    VALUES
        ...
    RETURNING 1
)
SELECT count(*) FROM inserted_rows;

/* Count affected rows from UPDATE */
WITH updated_rows AS (
    UPDATE ...
    SET ...
    WHERE ...
    RETURNING 1
)
SELECT count(*) FROM updated_rows;
Run Code Online (Sandbox Code Playgroud)

但是,您的目标是在 PDI 作业中执行此操作,因此我建议您尝试达到控制 SQL 脚本的程度。

建议:将源数据保存在目标数据库服务器上的文件中,然后使用它(可能具有批量加载功能)来插入/更新,然后将受影响的行数保存到 PDI 变量中。请注意,您可能需要在Job范围内使用 SQL 脚本步骤。

编辑:实现是选择设计的问题,因此建议的解决方案是众多解决方案之一。在非常高的层面上,您可以执行如下操作。

  • 转换 I - 从源中提取数据
    • 从源获取数据,无论是数据库还是其他任何东西
    • 以适合目标数据库结构的方式准备输出
    • 使用文件系统上的文本文件输出步骤保存 CSV 文件
  • 家长工作
    • 如果 PDI 服务器与目标 DB 服务器相同:
      • 使用执行 SQL 脚本步骤来:
        • 从文件中读取数据并执行 INSERT/UPDATE
        • 将受影响的行数写入表中(理想情况下,该表还可以包含操作的时间戳,以便您可以跟踪事情)
    • 如果 PDI 服务器与目标 DB 服务器不同:
      • 将源数据文件上传到服务器,例如使用 FTP/SFTP 文件上传步骤
      • 使用执行 SQL 脚本步骤来:
        • 从文件中读取数据并执行 INSERT/UPDATE
        • 将受影响的行数写入表中

编辑2:另一个建议的解决方案

正如 @user3123116 所建议的,您可以使用“比较字段”步骤(如果不是您环境的一部分,请检查市场)。

我看到的唯一缺点是您必须在插入/更新之前查询目标数据库,这当然性能较差。

最终它可能看起来像这样(请注意,这只是比较和计数部分): 现场比较

另请注意,您可以拆分源数据流的输入(COPY,而不是DISTRIBUTE),并进行插入/更新,但是此流必须等待字段比较的流结束对目标数据库的查询,否则您可能最终得到错误的统计数据。