标签: kedro

如何按照 kedro 管道中声明的顺序运行节点?

在 Kedro 管道中,节点(类似于 Python 函数)是按顺序声明的。在某些情况下,一个节点的输入是前一节点的输出。然而,有时,当在命令行中调用 kedro run API 时,节点不会按顺序运行。

在 kedro 文档中,它说默认情况下节点按顺序运行。

我的 run.py 代码:

def main(
tags: Iterable[str] = None,
env: str = None,
runner: Type[AbstractRunner] = None,
node_names: Iterable[str] = None,
from_nodes: Iterable[str] = None,
to_nodes: Iterable[str] = None,
from_inputs: Iterable[str] = None,
):

project_context = ProjectContext(Path.cwd(), env=env)
project_context.run(
    tags=tags,
    runner=runner,
    node_names=node_names,
    from_nodes=from_nodes,
    to_nodes=to_nodes,
    from_inputs=from_inputs,
)
Run Code Online (Sandbox Code Playgroud)

目前,我的最后一个节点有时在前几个节点之前运行。

python machine-learning kedro

8
推荐指数
1
解决办法
2761
查看次数

如何有条件地运行 Kedro 管道的一部分?

我有一个很大的管道,需要几个小时才能运行。其中一小部分需要经常运行,如何在不触发整个管道的情况下运行它?

python pipeline kedro

7
推荐指数
1
解决办法
8836
查看次数

Kedro 部署到 databricks

也许我误解了打包的目的,但它似乎对创建用于生产部署的工件没有帮助,因为它只打包代码。它省略了 conf、data 和其他使 kedro 项目可重现的目录。

我知道我可以使用 docker 或气流插件进行部署,但是部署到 databricks 又如何呢?您有什么建议吗?

我正在考虑制作一个可以安装在集群上的轮子,但我需要先打包conf。另一种选择是将 git 工作区同步到集群并通过笔记本运行 kedro。

关于最佳实践有什么想法吗?

kedro

5
推荐指数
1
解决办法
2765
查看次数

在 Kedro Notebook 中设置参数

是否可以覆盖从 Kedro 笔记本中的parameters.yaml 文件中获取的属性?

我正在尝试动态更改笔记本中的参数值。我希望能够让用户能够运行标准管道,但具有可定制的参数。我不想更改 YAML 文件,我只想更改笔记本生命周期的参数。

我尝试在上下文中编辑参数,但这没有影响。

context.params.update({"test_param": 2})
Run Code Online (Sandbox Code Playgroud)

我是否遗漏了什么或者这不是预期的用例?

python kedro

5
推荐指数
1
解决办法
1020
查看次数

如何在kedro中处理庞大的数据集

我有相当大的(~200Gb,~20M 行)原始 jsonl 数据集。我需要从那里提取重要的属性并将中间数据集存储在 csv 中以进一步转换为 HDF5、parquet 等。显然,我不能JSONDataSet用于加载原始数据集,因为它pandas.read_json在幕后使用,并使用 pandas如此规模的数据集听起来是个坏主意。所以我正在考虑逐行读取原始数据集,逐行处理并将处理后的数据附加到中间数据集。

我无法理解的是如何使其AbstractDataSet与它的_load_save方法兼容。

PS 我知道我可以将其移出 kedro 的上下文,并将预处理数据集作为原始数据集引入,但这有点破坏了完整管道的整个想法。

python kedro

5
推荐指数
1
解决办法
443
查看次数

DataBricks + Kedro Vs GCP + Kubeflow Vs 服务器 + Kedro + Airflow

我们正在 10 多家公司之间部署一个数据联盟。Wi 将为所有公司部署多个机器学习模型(一般为高级分析模型),我们将管理所有模型。我们正在寻找一种管理多个服务器、集群和数据科学管道的解决方案。我喜欢 kedro,但不确定在使用 kedro 时管理所有内容的最佳选择是什么。

总之,我们正在寻找最佳解决方案来管理不同服务器和可能的 Spark 集群中的多个模型、任务和管道。我们目前的选择是:

  • AWS 作为我们的数据仓库和 Databricks 用于管理服务器、集群和任务。我不认为 databricks 的 notebooks 是构建管道和协作工作的好解决方案,所以我想将 kedro 连接到 databricks(它好吗?使用 databricks 安排 kedro 管道的运行容易吗? )

  • 将 GCP 用于数据仓库并使用 kubeflow (iin GCP) 来部署模型以及管道和所需资源的管理和调度

  • 从 ASW 或 GCP 设置服务器,安装 kedro 并使用气流安排管道(我发现管理 20 个服务器和 40 个管道是一个大问题)

我想知道是否有人知道这些替代方案之间的最佳选择、它们的缺点和优点,或者是否有更多的可能性。

google-cloud-platform databricks kedro

5
推荐指数
1
解决办法
510
查看次数

如何在 kedro 0.17.0 中加载特定的目录数据集实例?

我们使用的是 kedro 版本 0.15.8,并且我们以这种方式从目录中加载一个特定项目:

from kedro.context import load_context
get_context().catalog.datasets.__dict__[key]
Run Code Online (Sandbox Code Playgroud)

现在,我们正在更改为 kedro 0.17.0 并尝试以相同的方式加载目录数据集(使用框架上下文):

from kedro.framework.context import load_context
get_context().catalog.datasets.__dict__[key]
Run Code Online (Sandbox Code Playgroud)

现在我们得到了错误:

kedro.framework.context.context.KedroContextError:需要一个实例ConfigLoaderNoneType却得到了。

这是因为项目中的 hook register_config_loader 没有被调用该函数的 hook_manager 使用。

项目挂钩定义如下:

class ProjectHooks:

    @hook_impl

    def register_pipelines(self) -> Dict[str, Pipeline]:

        """Register the project's pipeline.

        Returns:

            A mapping from a pipeline name to a ``Pipeline`` object.

        """

        pm = pre_master.create_pipeline()

        return {

            "pre_master": pm,

            "__default__": pm

        }

    @hook_impl

    def register_config_loader(self, conf_paths: Iterable[str]) -> ConfigLoader:

        return ConfigLoader(conf_paths)

    @hook_impl

    def register_catalog(

        self,

        catalog: Optional[Dict[str, …
Run Code Online (Sandbox Code Playgroud)

hook kedro

5
推荐指数
1
解决办法
3481
查看次数

在mlflow.yml中指定主机和端口并运行“kedro mlflow ui”,但主机和端口仍然默认(localhost:5000)不改变

我构建示例 kedro 项目参考此页面,并在 mlflow.yml 中指定主机作为我的全局 IP 地址。但是当我点击“kedro mlflow ui”命令时,它仍然监听本地。即使我只在 mlflow.yml 中指定端口为 5001 (不是默认),它也不起作用。谁能帮我。

python版本:3.6.8(anaconda)kedro版本:0.17.0 kedro mlflow版本:0.6.0

mlflow kedro

4
推荐指数
1
解决办法
5506
查看次数

管道在kedro中找不到节点

我正在关注管道教程,创建所有需要的文件,使用 kedro 启动kedro run --node=preprocessing_data但卡住了这样的错误消息:

ValueError: Pipeline does not contain nodes named ['preprocessing_data'].
Run Code Online (Sandbox Code Playgroud)

如果我不带node参数运行kedro ,我会收到

kedro.context.context.KedroContextError: Pipeline contains no nodes
Run Code Online (Sandbox Code Playgroud)

文件内容:

src/project/pipelines/data_engineering/nodes.py
def preprocess_data(data: SparkDataSet) -> None:
    print(data)
    return
Run Code Online (Sandbox Code Playgroud)
src/project/pipelines/data_engineering/pipeline.py
def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                func=preprocess_data,
                inputs="data",
                outputs="preprocessed_data",
                name="preprocessing_data",
            ),
        ]
    )
Run Code Online (Sandbox Code Playgroud)
src/project/pipeline.py
def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
    de_pipeline = de.create_pipeline()
    return {
        "de": de_pipeline,
        "__default__": Pipeline([])
    }
Run Code Online (Sandbox Code Playgroud)

python kedro

3
推荐指数
2
解决办法
1152
查看次数

如何将多个 CSV 文件添加到 Kedro 的目录中?

我有数百个 CSV 文件要类似地处理。为简单起见,我们可以假设它们都在./data/01_raw/(如./data/01_raw/1.csv./data/02_raw/2.csv)等中。我宁愿不给每个文件一个不同的名称,并在构建我的管道时单独跟踪它们。我想知道是否有任何方法可以通过在catalog.yml文件中指定某些内容来批量读取所有文件?

python kedro

2
推荐指数
1
解决办法
344
查看次数

ipython 会话中缺少 kedro 上下文和目录

我启动了 ipython 会话并尝试加载数据集。
我正在运行
df = Catalog.load("test_dataset")
面临以下错误
NameError: name 'catalog' is not defined

我也尝试过 %reload_kedro 但出现以下错误

UsageError: Line magic function `%reload_kedro` not found.

甚至也无法加载上下文。我正在从 Docker 容器运行 kedro 环境。我不确定我哪里出错了。

ipython kedro

2
推荐指数
1
解决办法
1421
查看次数

Kedro 安装 - 无法卸载`terminado`

运行时kedro install出现以下错误:

Attempting uninstall: terminado
    Found existing installation: terminado 0.8.3
ERROR: Cannot uninstall 'terminado'. It is a distutils installed project and thus we cannot accurately determine which files belong to it which would lead to only a partial uninstall.
Run Code Online (Sandbox Code Playgroud)

此 github问题建议进行以下修复:

pip install terminado --user --ignore-installed
Run Code Online (Sandbox Code Playgroud)

但它对我不起作用,因为我一直有同样的错误。

注意: 这个问题与this相似,但足够不同,我认为值得单独提出。

python pip kedro

1
推荐指数
1
解决办法
1114
查看次数

使用 DataCatalog 保存数据

我在看iriskedro 提供的项目示例。除了记录准确性之外,我还想将predictions和保存test_y为 csv。

这是 kedro 提供的示例节点。

def report_accuracy(predictions: np.ndarray, test_y: pd.DataFrame) -> None:
    """Node for reporting the accuracy of the predictions performed by the
    previous node. Notice that this function has no outputs, except logging.
    """
    # Get true class index
    target = np.argmax(test_y.to_numpy(), axis=1)
    # Calculate accuracy of predictions
    accuracy = np.sum(predictions == target) / target.shape[0]
    # Log the accuracy of the model
    log = logging.getLogger(__name__)
    log.info("Model accuracy on test set: %0.2f%%", accuracy …
Run Code Online (Sandbox Code Playgroud)

python kedro

0
推荐指数
1
解决办法
35
查看次数