标签: data-pipeline

是否可以编写一个容忍失败的子任务的 luigi 包装器任务?

我有一个执行一些不稳定计算的 luigi 任务。想想有时不收敛的优化过程。

import luigi

MyOptimizer(luigi.Task):
    input_param: luigi.Parameter()
    output_filename = luigi.Parameter(default='result.json')

    def run(self):
        optimize_something(self.input_param, self.output().path)

    def output(self):
        return luigi.LocalTarget(self.output_filename)
Run Code Online (Sandbox Code Playgroud)

现在我想构建一个包装器任务,该任务将使用不同的输入参数多次运行此优化器,并获取收敛的第一次运行的输出。

我现在实现它的方式是使用,MyOptimizer因为如果它失败,luigi 会认为包装器任务也失败了,但我可以接受一些MyOptimizer失败的实例。

MyWrapper(luigi.Task):
    input_params_list = luigi.ListParameter()
    output_filename = luigi.Parameter(default='result.json')

    def run(self):
        for input_param in self.input_params_list:
            try:
                optimize_something(self.input_param, self.output().path)
                print(f"Optimizer succeeded with input {input_param}")
                break
            except Exception as e:
                print(f"Optimizer failed with input {input_param}. Trying again...")

    def output(self):
        return luigi.LocalTarget(self.output_filename)
Run Code Online (Sandbox Code Playgroud)

问题在于,通过这种方式,任务不会并行化。此外,您可以想象MyOptimizeroptimize_something参与由 luigi 处理的数据管道的复杂任务,这在我的代码中造成了相当多的混乱。

我将不胜感激有关如何以类似路易吉的方式进行这项工作的任何见解和想法:)

python error-handling dataflow luigi data-pipeline

13
推荐指数
1
解决办法
265
查看次数

将.npy(numpy文件)送入tensorflow数据管道

Tensorflow似乎缺少".npy"文件的读者.如何将我的数据文件读入新的tensorflow.data.Dataset pipline?我的数据不适合内存.

每个对象都保存在单独的".npy"文件中.每个文件包含2个不同的ndarray作为特征,标量作为标签.

numpy dataset tensorflow data-pipeline

10
推荐指数
3
解决办法
9775
查看次数

实现luigi动态图配置

我是luigi的新手,在为我们的ML工作设计管道时遇到过它.虽然它不适合我的特定用例,但它有很多额外的功能,我决定让它适合.

基本上我正在寻找的是一种能够持久保存自定义构建管道并因此使其结果可重复且易于部署的方法,在阅读了大多数在线教程之后,我尝试使用现有luigi.cfg配置和命令行机制实现我的序列化并且它可能已经足够用于任务的参数但它没有提供序列化我的管道的DAG连接的方法,所以我决定有一个WrapperTask接收到一个json config file然后创建所有任务实例并连接所有输入输出通道luigi任务(做所有的管道).

我特此附上一个小测试程序供您审查:

import random
import luigi
import time
import os


class TaskNode(luigi.Task):
    i = luigi.IntParameter()  # node ID

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.required = []

    def set_required(self, required=None):
        self.required = required  # set the dependencies
        return self

    def requires(self):
        return self.required

    def output(self):
        return luigi.LocalTarget('{0}{1}.txt'.format(self.__class__.__name__, self.i))

    def run(self):
        with self.output().open('w') as outfile:
            outfile.write('inside {0}{1}\n'.format(self.__class__.__name__, self.i))
        self.process()

    def process(self):
        raise NotImplementedError(self.__class__.__name__ + " must implement this method")


class FastNode(TaskNode):

    def process(self):
        time.sleep(1) …
Run Code Online (Sandbox Code Playgroud)

python python-3.x luigi data-pipeline

10
推荐指数
1
解决办法
458
查看次数

通过数据管道截断 DynamoDb 或重写数据

可以通过数据管道转储 DynamoDb,也可以在 DynamoDb 中导入数据。导入进展顺利,但数据一直附加到 DynamoDb 中已经存在的数据。

现在我找到了扫描 DynamoDb 并一个一个或通过 Batch 删除项目的工作示例。但无论如何,对于大量数据来说,这不是一个好的变体。

也可以完全删除表并创建它。但是随着该变体索引将丢失。

因此,最好的方法是通过数据管道导入或以某种方式截断来覆盖 DynamoDb 数据。有可能吗?如果是的话,怎么可能?

truncate amazon-dynamodb amazon-data-pipeline data-pipeline

7
推荐指数
1
解决办法
8782
查看次数

如何从Airflow SimpleHttpOperator GET请求访问响应

我正在学习Airflow并且有一个简单的问题.下面是我的DAG叫dog_retriever

import airflow
from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
from datetime import datetime, timedelta
import json



default_args = {
    'owner': 'Loftium',
    'depends_on_past': False,
    'start_date': datetime(2017, 10, 9),
    'email': 'rachel@loftium.com',
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=3),
}

dag = DAG('dog_retriever',
    schedule_interval='@once',
    default_args=default_args)

t1 = SimpleHttpOperator(
    task_id='get_labrador',
    method='GET',
    http_conn_id='http_default',
    endpoint='api/breed/labrador/images',
    headers={"Content-Type": "application/json"},
    dag=dag)

t2 = SimpleHttpOperator(
    task_id='get_breeds',
    method='GET',
    http_conn_id='http_default',
    endpoint='api/breeds/list',
    headers={"Content-Type": "application/json"},
    dag=dag)

t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)

作为测试Airflow的一种方法,我只是在这个非常简单的http://dog.ceo API中向一些端点发出两个GET请求.目标是学习如何处理通过Airflow检索的一些数据

执行正在运行 - 我的代码成功调用了任务t1和t2中的enpoints,我可以看到它们按照set_upstream我写的规则以正确的顺序记录在Airflow UI中. …

airflow data-pipeline apache-airflow

6
推荐指数
2
解决办法
6815
查看次数

Firehose 数据管道限制

我的用例如下:我有 JSON 数据需要以 parquet 格式存储在 S3 中。到目前为止一切顺利,我可以在 Glue 中创建一个架构并将“DataFormatConversionConfiguration”附加到我的 Firehose 流。但数据来自不同的“主题”。每个主题都有一个特定的“模式”。根据我的理解,我将必须创建多个消防流,因为一个流只能有一个模式。但我有数千个这样的主题,其中有大量高吞吐量的数据传入。创建如此多的 Firehose 资源看起来并不可行(https://docs.aws.amazon.com/firehose/latest/dev/limits.html

我应该如何构建我的管道。

bigdata amazon-web-services data-pipeline amazon-kinesis-firehose

5
推荐指数
1
解决办法
1096
查看次数

无法找到相关张量remote_handle:Op ID:14738,输出num:0

我正在使用 colab pro TPU 实例来进行补丁图像分类。我正在使用张量流版本2.3.0。

调用 model.fit 时出现以下错误: InvalidArgumentError: Unable to find the relevant tensor remote_handle: Op ID: 14738, Output num: 0带有以下跟踪:

--------
InvalidArgumentError                      Traceback (most recent call last)
<ipython-input-20-5fd2ec1ce2f9> in <module>()
     15         steps_per_epoch=STEPS_PER_EPOCH,
     16         validation_data=dev_ds,
---> 17         validation_steps=VALIDATION_STEPS
     18     )

6 frames
/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/training.py in _method_wrapper(self, *args, **kwargs)
    106   def _method_wrapper(self, *args, **kwargs):
    107     if not self._in_multi_worker_mode():  # pylint: disable=protected-access
--> 108       return method(self, *args, **kwargs)
    109 
    110     # Running inside `run_distribute_coordinator` already.

/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/training.py in fit(self, x, y, batch_size, …
Run Code Online (Sandbox Code Playgroud)

keras data-pipeline tensorflow-datasets google-colaboratory tpu

5
推荐指数
1
解决办法
2288
查看次数

dvc.yaml 应该由 dvc run 命令编写或生成吗?

尝试理解dvc,大多数教程都提到通过运行命令生成 dvc.yaml dvc run

但同时,定义 DAG 的 dvc.yaml 也有很好的文档记录。此外,它是 yaml 格式且人类可读/可写的事实表明它是用于指定数据管道的 DSL。

有人可以澄清哪种做法更好吗?编写 dvc.yaml 还是让其通过dvc run命令生成?还是由用户选择而没有技术上的差异?

directed-acyclic-graphs data-pipeline dvc

5
推荐指数
1
解决办法
2540
查看次数

批量添加 ttl 列到 dynamodb 表

我有一个用例,需要将 ttl 列添加到现有表中。目前,该表拥有超过20亿条记录。

是否有任何现有的解决方案围绕相同的构建?或者应该是 emr 是前进的道路吗?

amazon-emr emr amazon-dynamodb amazon-data-pipeline data-pipeline

4
推荐指数
1
解决办法
4129
查看次数

使用Glue从AWS RDS到S3的管道

我尝试使用AWS Glue将当前数据管道从python脚本迁移到AWS Glue。我能够设置一个搜寻器来为不同的postgres数据库提取模式。但是,在将数据从Postgres RDS提取到Athena中的S3表时,我遇到了问题。

  • 有没有一种方法可以将数据从AWS RDS直接拉到Athena中的S3表?
  • 如果是,那么如何
  • 如果没有,那么总是欢迎任何更好的建议

提前致谢 !

amazon-s3 amazon-rds data-pipeline amazon-athena aws-glue

3
推荐指数
1
解决办法
1386
查看次数

带有python flex模板的数据流 - 启动器超时

我正在尝试使用 flex 模板运行我的 python 数据流作业。当我使用直接运行器(没有 flex 模板)运行时,作业在本地运行良好,但是当我尝试使用 flex 模板运行它时,作业卡在“排队”状态一段时间,然后因超时而失败。

这是我在 GCE 控制台中找到的一些日志:

INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/local/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', '/dataflow/template/requirements.txt', '--exists-action', 'i', '--no-binary', ':all:'

Shutting down the GCE instance, launcher-202011121540156428385273524285797, used for launching.

Timeout in polling result file: gs://my_bucket/staging/template_launches/2020-11-12_15_40_15-6428385273524285797/operation_result.
Possible causes are:
1. Your launch takes too long time to finish. Please check the logs on stackdriver.
2. Service my_service_account@developer.gserviceaccount.com may not have enough permissions to pull container image gcr.io/indigo-computer-272415/samples/dataflow/streaming-beam-py:latest or create new objects in gs://my_bucket/staging/template_launches/2020-11-12_15_40_15-6428385273524285797/operation_result.
3. Transient …
Run Code Online (Sandbox Code Playgroud)

google-cloud-platform google-cloud-dataflow apache-beam data-pipeline

3
推荐指数
1
解决办法
1130
查看次数

Google数据融合执行错误“INVALID_ARGUMENT:'DISKS_TOTAL_GB'配额不足。请求3000.0,可用2048.0。”

我正在尝试使用 Google Data Fusion 免费版本将简单的 CSV 文件从 GCS 加载到 BQ。管道因错误而失败。它读着

com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Insufficient 'DISKS_TOTAL_GB' quota. Requested 3000.0, available 2048.0.
    at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:49) ~[na:na]
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72) ~[na:na]
    at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60) ~[na:na]
    at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) ~[na:na]
    at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68) ~[na:na]
Run Code Online (Sandbox Code Playgroud)

Mapreduce 和 Spark 执行管道都会重复相同的错误。感谢您为解决此问题提供的任何帮助。谢谢

问候卡

data-processing google-cloud-platform data-ingestion data-pipeline google-cloud-data-fusion

3
推荐指数
1
解决办法
5230
查看次数

Google Cloud Composer 上的 Airflow 与 Docker

我找不到很多关于在 Google Cloud Composer 和 Docker 上运行 Airflow 的差异的信息。我正在尝试将我们目前在 Google Cloud Composer 上的数据管道切换到 Docker 以仅在本地运行,但我正在尝试概念化区别是什么。

local docker airflow data-pipeline google-cloud-composer

1
推荐指数
1
解决办法
1867
查看次数