标签: etl

Apache Nifi 与 Gobblin

我正在评估一个大数据项目,我们需要从各种互联网来源(ftp、api 等)提取大量大数据集,进行轻量转换和轻量数据质量/健全性检查(例如:行和列检查),并将其推向下游。目前的焦点是批量的,但预计会支持流式传输。易于大规模支持是一项重要要求。

我们正在研究 Apache Nifi 和 Gobblin,它们的意图似乎重叠。哪种用例最适合哪个平台?它们如何符合上述用例?

谢谢!

etl bigdata apache-nifi gobblin

3
推荐指数
1
解决办法
2245
查看次数

如何使用气流来编排简单的 pandas etl python 脚本?

我喜欢气流的想法,但我停留在基础知识上。从昨天开始,我在 vm ubuntu-postgres 解决方案上运行了气流。我可以看到仪表板和示例数据:))我现在想要的是将用于处理原始数据的示例脚本迁移到准备好的数据。

假设你有一个 csv 文件的文件夹。今天我的脚本迭代它,将每个文件传递到一个将被转换为 df.txt 的列表。之后,我准备它们的列名称并进行一些数据清理并将其写入不同的格式。

1:目录中文件的pd.read_csv

2:创建一个df

3:干净的列名

4:干净的值(与 stp 3 并行)

5:将结果写入数据库

我该如何根据气流组织我的文件?脚本应该是什么样子?我是否传递单个方法、单个文件,还是必须为每个部分创建多个文件?我现在缺乏基本概念:(我读到的关于气流的所有内容都比我的简单情况复杂得多。我正在考虑放弃气流以及 Bonobo、Mara、Luigi,但我认为气流是值得的?!

python etl pandas airflow airflow-scheduler

3
推荐指数
2
解决办法
1万
查看次数

SSIS - 任务分组和序列任务之间有什么区别?

任务分组序列任务都允许将任务组合在一起作为一个单元。

它们之间有什么区别?

任务分组- 请参阅4- Sql Server 2012 实施数据仓库 - 考试 70-463 - 在 4:28 查看视频中的最后一行

sql-server ssis etl ssis-2012

3
推荐指数
1
解决办法
1679
查看次数

SSIS 执行 SQL 任务错误“指定了单行结果集,但未返回任何行。”

我正在尝试我认为相对容易的事情。我使用 SQL 任务在表中查找文件名。如果存在,则执行某些操作,如果不存在,则不执行任何操作。

这是我在 SSIS 中的设置:

SSIS设置

我的“表中存在文件”中的 SQL 语句如下,结果集为“单行”:

SELECT ISNULL(id,0) as id FROM PORG_Files WHERE filename = ?
Run Code Online (Sandbox Code Playgroud)

在此输入图像描述

在此输入图像描述

我的约束是:

在此输入图像描述

当我运行它时,表中还没有文件,因此它应该不返回任何内容。我已经尝试ISNULLCOALESCE设置一个值。我收到以下错误:

错误:表中存在文件,执行 SQL 任务时出现错误:0xC002F309:为变量“id”赋值时发生错误:“指定了单行结果集,但未返回任何行。”。

不知道如何解决这个问题。和ISNULLCOALESCE在 SO 和 MSDN 上找到的建议

sql-server ssis etl

3
推荐指数
1
解决办法
9171
查看次数

收到“外部表的格式不符合预期”。尝试在 SSIS 中导入 Excel 文件时出错

我正在尝试.xls通过 SSIS 将 Excel 文件 ( ) 导入到 SQL Server 中的表中。但 SSIS 似乎无法将该文件识别为有效的 Excel 文件。我收到以下错误:

错误1:

[Excel 源 [86]] 错误:SSIS
错误代码 DTS_E_CANNOTACQUIRECONNECTIONFROMCONNECTIONMANAGER。
对连接管理器“Carga Base Original”的 AcquireConnection 方法调用失败,错误代码为 0xC0202009。在此之前可能会发布错误消息,其中包含有关 AcquireConnection 方法调用失败原因的更多信息。

错误2:

[SSIS.Pipeline]
错误:Excel 源验证失败并返回错误代码 0xC020801C。

错误3:

[连接管理器“Carga Base Original”] 错误:SSIS 错误代码 DTS_E_OLEDBERROR。发生 OLE DB 错误。错误代码:0x80004005。OLE DB 记录可用。
来源:“Microsoft Access 数据库引擎”
Hresult:0x80004005
描述:“外部表不是预期的格式”。

我的连接管理器属性是 Excel 源,其属性如下所示:

连接管理器属性

我通过变量传递 Excel 文件路径。Excel 文件看起来正常并且没有损坏。我尝试放置修复 Excel 路径,尝试将其放入连接字符串属性(修复路径和变量),但这没有任何帮助。

谁能帮帮我吗?

sql-server oledb excel ssis etl

3
推荐指数
1
解决办法
1万
查看次数

导入/导出 DataFusion 管道

有谁知道是否可以以编程方式导入/导出 DataFlow 管道(已部署或处于草稿状态)?

我们的想法是编写一个脚本来删除并创建一个 DataFusion 实例,以避免在不使用时计费。通过 gloud 命令行,可以配置 DataFusion 集群并销毁它,但自动导出和导入我的所有管道也会很有趣。

不幸的是,官方文档并没有帮助我......

谢谢!

integration etl google-cloud-platform google-cloud-data-fusion

3
推荐指数
1
解决办法
2774
查看次数

SQLAlchemy 中 mssql+pyodbc 出现“数据源名称太长”错误

我正在尝试使用 SQLAlchemy 和 pyobdc 将数据帧上传到 Azure SQL Server 数据库上的数据库。我已建立连接,但上传时出现错误:

(pyodbc.Error) ('IM010', '[IM010] [Microsoft][ODBC 驱动程序管理器] 数据源名称太长 (0) (SQLDriverConnect)')

我不确定这个错误是从哪里来的,因为我之前使用过 sqlalchemy 没有问题。我在下面附上了我的代码,有人可以帮我诊断问题吗?

username = 'bcadmin'
password = 'N@ncyR2D2'
endpoint = 'bio-powerbi-bigdata.database.windows.net'

engine = sqlalchemy.create_engine(f'mssql+pyodbc://{username}:{password}@{endpoint}')

df.to_sql("result_management_report",engine,if_exists='append',index=False)
Run Code Online (Sandbox Code Playgroud)

我知道其他 ETL 方法,如数据工厂和 SSMS,但我更喜欢使用 pandas 作为 ETL 过程。

请帮我解决这个错误。

etl sqlalchemy pyodbc pandas azure-sql-database

3
推荐指数
1
解决办法
7257
查看次数

Pentaho ETL 工具即将消亡吗?

我在寻找有用的 pentaho ETL 工具信息时遇到问题,这个工具快要死了吗?有哪些替代工具/平台>

etl pentaho

3
推荐指数
1
解决办法
1526
查看次数

在 BranchPython Operator 之后,Airflow 2.0 任务被跳过

我正在新版本中的 Airflow 中摆弄分支,无论我尝试什么,BranchOperator 之后的所有任务都会被跳过。

这是我一直在努力完成的一个最小的例子

from airflow.decorators import dag, task
from datetime import timedelta, datetime

from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

import logging
logger = logging.getLogger("airflow.task")

@dag(
    schedule_interval="0 0 * * *",
    start_date=datetime.today() - timedelta(days=2),
    dagrun_timeout=timedelta(minutes=60),
)
def StackOverflowExample():

    @task
    def task_A():

        logging.info("TASK A")
        

    @task
    def task_B():

        logging.info("TASK B")

    @task
    def task_C():

        logging.info("TASK C")

    @task
    def task_D():
        
        logging.info("TASK D")

        return {"parameter":0.5}

    
    def _choose_task(task_parameters,**kwargs):

        logging.info(task_parameters["parameter"])
        if task_parameters["parameter"]<0.5:
            logging.info("SUCCESSS ")
            return ['branch_1', 'task_final']
        else:
            logging.info("RIP")
            return ['branch_2', 'task_final']

    @task(task_id="branch_1")
    def …
Run Code Online (Sandbox Code Playgroud)

python branch etl directed-acyclic-graphs airflow

3
推荐指数
1
解决办法
2826
查看次数

SQS 到 ECS (Fargate) 或 SQS 到 Lambda 到 ECS

这个问题更多的是一个关于如何最好地构建 ETL 管道的架构问题。目前,我有一个通过 SQS ping 的 AWS Lambda。但处理数据可能需要 15 分钟多一点(AWS 的运行时间限制),并且使用 forsam build部署会导致 .zip 大于 250MB,因此会引发错误。因此需要 AWS Lambda 的替代方案。到目前为止我看到的替代方案是:

SQS -> ECS (Fargate) SQS -> Lambda -> ECS (Fargate)

我没有找到任何关于这两个选项的优缺点的提示,以及通常首选的选项。关于如何解决这个问题有什么建议吗?

etl amazon-sqs amazon-ecs aws-lambda

3
推荐指数
1
解决办法
4477
查看次数