在 Kedro 管道中,节点(类似于 Python 函数)是按顺序声明的。在某些情况下,一个节点的输入是前一节点的输出。然而,有时,当在命令行中调用 kedro run API 时,节点不会按顺序运行。
在 kedro 文档中,它说默认情况下节点按顺序运行。
我的 run.py 代码:
def main(
tags: Iterable[str] = None,
env: str = None,
runner: Type[AbstractRunner] = None,
node_names: Iterable[str] = None,
from_nodes: Iterable[str] = None,
to_nodes: Iterable[str] = None,
from_inputs: Iterable[str] = None,
):
project_context = ProjectContext(Path.cwd(), env=env)
project_context.run(
tags=tags,
runner=runner,
node_names=node_names,
from_nodes=from_nodes,
to_nodes=to_nodes,
from_inputs=from_inputs,
)
Run Code Online (Sandbox Code Playgroud)
目前,我的最后一个节点有时在前几个节点之前运行。
我有一个很大的管道,需要几个小时才能运行。其中一小部分需要经常运行,如何在不触发整个管道的情况下运行它?
也许我误解了打包的目的,但它似乎对创建用于生产部署的工件没有帮助,因为它只打包代码。它省略了 conf、data 和其他使 kedro 项目可重现的目录。
我知道我可以使用 docker 或气流插件进行部署,但是部署到 databricks 又如何呢?您有什么建议吗?
我正在考虑制作一个可以安装在集群上的轮子,但我需要先打包conf。另一种选择是将 git 工作区同步到集群并通过笔记本运行 kedro。
关于最佳实践有什么想法吗?
是否可以覆盖从 Kedro 笔记本中的parameters.yaml 文件中获取的属性?
我正在尝试动态更改笔记本中的参数值。我希望能够让用户能够运行标准管道,但具有可定制的参数。我不想更改 YAML 文件,我只想更改笔记本生命周期的参数。
我尝试在上下文中编辑参数,但这没有影响。
context.params.update({"test_param": 2})
Run Code Online (Sandbox Code Playgroud)
我是否遗漏了什么或者这不是预期的用例?
我有相当大的(~200Gb,~20M 行)原始 jsonl 数据集。我需要从那里提取重要的属性并将中间数据集存储在 csv 中以进一步转换为 HDF5、parquet 等。显然,我不能JSONDataSet
用于加载原始数据集,因为它pandas.read_json
在幕后使用,并使用 pandas如此规模的数据集听起来是个坏主意。所以我正在考虑逐行读取原始数据集,逐行处理并将处理后的数据附加到中间数据集。
我无法理解的是如何使其AbstractDataSet
与它的_load
和_save
方法兼容。
PS 我知道我可以将其移出 kedro 的上下文,并将预处理数据集作为原始数据集引入,但这有点破坏了完整管道的整个想法。
我们正在 10 多家公司之间部署一个数据联盟。Wi 将为所有公司部署多个机器学习模型(一般为高级分析模型),我们将管理所有模型。我们正在寻找一种管理多个服务器、集群和数据科学管道的解决方案。我喜欢 kedro,但不确定在使用 kedro 时管理所有内容的最佳选择是什么。
总之,我们正在寻找最佳解决方案来管理不同服务器和可能的 Spark 集群中的多个模型、任务和管道。我们目前的选择是:
AWS 作为我们的数据仓库和 Databricks 用于管理服务器、集群和任务。我不认为 databricks 的 notebooks 是构建管道和协作工作的好解决方案,所以我想将 kedro 连接到 databricks(它好吗?使用 databricks 安排 kedro 管道的运行容易吗? )
将 GCP 用于数据仓库并使用 kubeflow (iin GCP) 来部署模型以及管道和所需资源的管理和调度
从 ASW 或 GCP 设置服务器,安装 kedro 并使用气流安排管道(我发现管理 20 个服务器和 40 个管道是一个大问题)
我想知道是否有人知道这些替代方案之间的最佳选择、它们的缺点和优点,或者是否有更多的可能性。
我们使用的是 kedro 版本 0.15.8,并且我们以这种方式从目录中加载一个特定项目:
from kedro.context import load_context
get_context().catalog.datasets.__dict__[key]
Run Code Online (Sandbox Code Playgroud)
现在,我们正在更改为 kedro 0.17.0 并尝试以相同的方式加载目录数据集(使用框架上下文):
from kedro.framework.context import load_context
get_context().catalog.datasets.__dict__[key]
Run Code Online (Sandbox Code Playgroud)
现在我们得到了错误:
kedro.framework.context.context.KedroContextError:需要一个实例
ConfigLoader
,NoneType
却得到了。
这是因为项目中的 hook register_config_loader 没有被调用该函数的 hook_manager 使用。
项目挂钩定义如下:
class ProjectHooks:
@hook_impl
def register_pipelines(self) -> Dict[str, Pipeline]:
"""Register the project's pipeline.
Returns:
A mapping from a pipeline name to a ``Pipeline`` object.
"""
pm = pre_master.create_pipeline()
return {
"pre_master": pm,
"__default__": pm
}
@hook_impl
def register_config_loader(self, conf_paths: Iterable[str]) -> ConfigLoader:
return ConfigLoader(conf_paths)
@hook_impl
def register_catalog(
self,
catalog: Optional[Dict[str, …
Run Code Online (Sandbox Code Playgroud) 我构建示例 kedro 项目参考此页面,并在 mlflow.yml 中指定主机作为我的全局 IP 地址。但是当我点击“kedro mlflow ui”命令时,它仍然监听本地。即使我只在 mlflow.yml 中指定端口为 5001 (不是默认),它也不起作用。谁能帮我。
python版本:3.6.8(anaconda)kedro版本:0.17.0 kedro mlflow版本:0.6.0
我正在关注管道教程,创建所有需要的文件,使用 kedro 启动kedro run --node=preprocessing_data
但卡住了这样的错误消息:
ValueError: Pipeline does not contain nodes named ['preprocessing_data'].
Run Code Online (Sandbox Code Playgroud)
如果我不带node
参数运行kedro ,我会收到
kedro.context.context.KedroContextError: Pipeline contains no nodes
Run Code Online (Sandbox Code Playgroud)
文件内容:
src/project/pipelines/data_engineering/nodes.py
def preprocess_data(data: SparkDataSet) -> None:
print(data)
return
Run Code Online (Sandbox Code Playgroud)
src/project/pipelines/data_engineering/pipeline.py
def create_pipeline(**kwargs):
return Pipeline(
[
node(
func=preprocess_data,
inputs="data",
outputs="preprocessed_data",
name="preprocessing_data",
),
]
)
Run Code Online (Sandbox Code Playgroud)
src/project/pipeline.py
def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
de_pipeline = de.create_pipeline()
return {
"de": de_pipeline,
"__default__": Pipeline([])
}
Run Code Online (Sandbox Code Playgroud) 我有数百个 CSV 文件要类似地处理。为简单起见,我们可以假设它们都在./data/01_raw/
(如./data/01_raw/1.csv
、./data/02_raw/2.csv
)等中。我宁愿不给每个文件一个不同的名称,并在构建我的管道时单独跟踪它们。我想知道是否有任何方法可以通过在catalog.yml
文件中指定某些内容来批量读取所有文件?
我启动了 ipython 会话并尝试加载数据集。
我正在运行
df = Catalog.load("test_dataset")
面临以下错误
NameError: name 'catalog' is not defined
我也尝试过 %reload_kedro 但出现以下错误
UsageError: Line magic function `%reload_kedro` not found.
甚至也无法加载上下文。我正在从 Docker 容器运行 kedro 环境。我不确定我哪里出错了。
运行时kedro install
出现以下错误:
Attempting uninstall: terminado
Found existing installation: terminado 0.8.3
ERROR: Cannot uninstall 'terminado'. It is a distutils installed project and thus we cannot accurately determine which files belong to it which would lead to only a partial uninstall.
Run Code Online (Sandbox Code Playgroud)
此 github问题建议进行以下修复:
pip install terminado --user --ignore-installed
Run Code Online (Sandbox Code Playgroud)
但它对我不起作用,因为我一直有同样的错误。
注意: 这个问题与this相似,但足够不同,我认为值得单独提出。
我在看iris
kedro 提供的项目示例。除了记录准确性之外,我还想将predictions
和保存test_y
为 csv。
这是 kedro 提供的示例节点。
def report_accuracy(predictions: np.ndarray, test_y: pd.DataFrame) -> None:
"""Node for reporting the accuracy of the predictions performed by the
previous node. Notice that this function has no outputs, except logging.
"""
# Get true class index
target = np.argmax(test_y.to_numpy(), axis=1)
# Calculate accuracy of predictions
accuracy = np.sum(predictions == target) / target.shape[0]
# Log the accuracy of the model
log = logging.getLogger(__name__)
log.info("Model accuracy on test set: %0.2f%%", accuracy …
Run Code Online (Sandbox Code Playgroud)