我有一个执行一些不稳定计算的 luigi 任务。想想有时不收敛的优化过程。
import luigi
MyOptimizer(luigi.Task):
input_param: luigi.Parameter()
output_filename = luigi.Parameter(default='result.json')
def run(self):
optimize_something(self.input_param, self.output().path)
def output(self):
return luigi.LocalTarget(self.output_filename)
Run Code Online (Sandbox Code Playgroud)
现在我想构建一个包装器任务,该任务将使用不同的输入参数多次运行此优化器,并获取收敛的第一次运行的输出。
我现在实现它的方式是不使用,MyOptimizer因为如果它失败,luigi 会认为包装器任务也失败了,但我可以接受一些MyOptimizer失败的实例。
MyWrapper(luigi.Task):
input_params_list = luigi.ListParameter()
output_filename = luigi.Parameter(default='result.json')
def run(self):
for input_param in self.input_params_list:
try:
optimize_something(self.input_param, self.output().path)
print(f"Optimizer succeeded with input {input_param}")
break
except Exception as e:
print(f"Optimizer failed with input {input_param}. Trying again...")
def output(self):
return luigi.LocalTarget(self.output_filename)
Run Code Online (Sandbox Code Playgroud)
问题在于,通过这种方式,任务不会并行化。此外,您可以想象MyOptimizer和optimize_something参与由 luigi 处理的数据管道的复杂任务,这在我的代码中造成了相当多的混乱。
我将不胜感激有关如何以类似路易吉的方式进行这项工作的任何见解和想法:)
Tensorflow似乎缺少".npy"文件的读者.如何将我的数据文件读入新的tensorflow.data.Dataset pipline?我的数据不适合内存.
每个对象都保存在单独的".npy"文件中.每个文件包含2个不同的ndarray作为特征,标量作为标签.
我是luigi的新手,在为我们的ML工作设计管道时遇到过它.虽然它不适合我的特定用例,但它有很多额外的功能,我决定让它适合.
基本上我正在寻找的是一种能够持久保存自定义构建管道并因此使其结果可重复且易于部署的方法,在阅读了大多数在线教程之后,我尝试使用现有luigi.cfg配置和命令行机制实现我的序列化并且它可能已经足够用于任务的参数但它没有提供序列化我的管道的DAG连接的方法,所以我决定有一个WrapperTask接收到一个json config file然后创建所有任务实例并连接所有输入输出通道luigi任务(做所有的管道).
我特此附上一个小测试程序供您审查:
import random
import luigi
import time
import os
class TaskNode(luigi.Task):
i = luigi.IntParameter() # node ID
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.required = []
def set_required(self, required=None):
self.required = required # set the dependencies
return self
def requires(self):
return self.required
def output(self):
return luigi.LocalTarget('{0}{1}.txt'.format(self.__class__.__name__, self.i))
def run(self):
with self.output().open('w') as outfile:
outfile.write('inside {0}{1}\n'.format(self.__class__.__name__, self.i))
self.process()
def process(self):
raise NotImplementedError(self.__class__.__name__ + " must implement this method")
class FastNode(TaskNode):
def process(self):
time.sleep(1) …Run Code Online (Sandbox Code Playgroud) 可以通过数据管道转储 DynamoDb,也可以在 DynamoDb 中导入数据。导入进展顺利,但数据一直附加到 DynamoDb 中已经存在的数据。
现在我找到了扫描 DynamoDb 并一个一个或通过 Batch 删除项目的工作示例。但无论如何,对于大量数据来说,这不是一个好的变体。
也可以完全删除表并创建它。但是随着该变体索引将丢失。
因此,最好的方法是通过数据管道导入或以某种方式截断来覆盖 DynamoDb 数据。有可能吗?如果是的话,怎么可能?
我正在学习Airflow并且有一个简单的问题.下面是我的DAG叫dog_retriever
import airflow
from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
from datetime import datetime, timedelta
import json
default_args = {
'owner': 'Loftium',
'depends_on_past': False,
'start_date': datetime(2017, 10, 9),
'email': 'rachel@loftium.com',
'email_on_failure': False,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=3),
}
dag = DAG('dog_retriever',
schedule_interval='@once',
default_args=default_args)
t1 = SimpleHttpOperator(
task_id='get_labrador',
method='GET',
http_conn_id='http_default',
endpoint='api/breed/labrador/images',
headers={"Content-Type": "application/json"},
dag=dag)
t2 = SimpleHttpOperator(
task_id='get_breeds',
method='GET',
http_conn_id='http_default',
endpoint='api/breeds/list',
headers={"Content-Type": "application/json"},
dag=dag)
t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)
作为测试Airflow的一种方法,我只是在这个非常简单的http://dog.ceo API中向一些端点发出两个GET请求.目标是学习如何处理通过Airflow检索的一些数据
执行正在运行 - 我的代码成功调用了任务t1和t2中的enpoints,我可以看到它们按照set_upstream我写的规则以正确的顺序记录在Airflow UI中. …
我的用例如下:我有 JSON 数据需要以 parquet 格式存储在 S3 中。到目前为止一切顺利,我可以在 Glue 中创建一个架构并将“DataFormatConversionConfiguration”附加到我的 Firehose 流。但数据来自不同的“主题”。每个主题都有一个特定的“模式”。根据我的理解,我将必须创建多个消防流,因为一个流只能有一个模式。但我有数千个这样的主题,其中有大量高吞吐量的数据传入。创建如此多的 Firehose 资源看起来并不可行(https://docs.aws.amazon.com/firehose/latest/dev/limits.html)
我应该如何构建我的管道。
bigdata amazon-web-services data-pipeline amazon-kinesis-firehose
我正在使用 colab pro TPU 实例来进行补丁图像分类。我正在使用张量流版本2.3.0。
调用 model.fit 时出现以下错误: InvalidArgumentError: Unable to find the relevant tensor remote_handle: Op ID: 14738, Output num: 0带有以下跟踪:
--------
InvalidArgumentError Traceback (most recent call last)
<ipython-input-20-5fd2ec1ce2f9> in <module>()
15 steps_per_epoch=STEPS_PER_EPOCH,
16 validation_data=dev_ds,
---> 17 validation_steps=VALIDATION_STEPS
18 )
6 frames
/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/training.py in _method_wrapper(self, *args, **kwargs)
106 def _method_wrapper(self, *args, **kwargs):
107 if not self._in_multi_worker_mode(): # pylint: disable=protected-access
--> 108 return method(self, *args, **kwargs)
109
110 # Running inside `run_distribute_coordinator` already.
/usr/local/lib/python3.6/dist-packages/tensorflow/python/keras/engine/training.py in fit(self, x, y, batch_size, …Run Code Online (Sandbox Code Playgroud) keras data-pipeline tensorflow-datasets google-colaboratory tpu
我有一个用例,需要将 ttl 列添加到现有表中。目前,该表拥有超过20亿条记录。
是否有任何现有的解决方案围绕相同的构建?或者应该是 emr 是前进的道路吗?
amazon-emr emr amazon-dynamodb amazon-data-pipeline data-pipeline
我尝试使用AWS Glue将当前数据管道从python脚本迁移到AWS Glue。我能够设置一个搜寻器来为不同的postgres数据库提取模式。但是,在将数据从Postgres RDS提取到Athena中的S3表时,我遇到了问题。
提前致谢 !
我正在尝试使用 flex 模板运行我的 python 数据流作业。当我使用直接运行器(没有 flex 模板)运行时,作业在本地运行良好,但是当我尝试使用 flex 模板运行它时,作业卡在“排队”状态一段时间,然后因超时而失败。
这是我在 GCE 控制台中找到的一些日志:
INFO:apache_beam.runners.portability.stager:Executing command: ['/usr/local/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/dataflow-requirements-cache', '-r', '/dataflow/template/requirements.txt', '--exists-action', 'i', '--no-binary', ':all:'
Shutting down the GCE instance, launcher-202011121540156428385273524285797, used for launching.
Timeout in polling result file: gs://my_bucket/staging/template_launches/2020-11-12_15_40_15-6428385273524285797/operation_result.
Possible causes are:
1. Your launch takes too long time to finish. Please check the logs on stackdriver.
2. Service my_service_account@developer.gserviceaccount.com may not have enough permissions to pull container image gcr.io/indigo-computer-272415/samples/dataflow/streaming-beam-py:latest or create new objects in gs://my_bucket/staging/template_launches/2020-11-12_15_40_15-6428385273524285797/operation_result.
3. Transient …Run Code Online (Sandbox Code Playgroud) google-cloud-platform google-cloud-dataflow apache-beam data-pipeline
我正在尝试使用 Google Data Fusion 免费版本将简单的 CSV 文件从 GCS 加载到 BQ。管道因错误而失败。它读着
com.google.api.gax.rpc.InvalidArgumentException: io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Insufficient 'DISKS_TOTAL_GB' quota. Requested 3000.0, available 2048.0.
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:49) ~[na:na]
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72) ~[na:na]
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60) ~[na:na]
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97) ~[na:na]
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68) ~[na:na]
Run Code Online (Sandbox Code Playgroud)
Mapreduce 和 Spark 执行管道都会重复相同的错误。感谢您为解决此问题提供的任何帮助。谢谢
问候卡
data-processing google-cloud-platform data-ingestion data-pipeline google-cloud-data-fusion
我找不到很多关于在 Google Cloud Composer 和 Docker 上运行 Airflow 的差异的信息。我正在尝试将我们目前在 Google Cloud Composer 上的数据管道切换到 Docker 以仅在本地运行,但我正在尝试概念化区别是什么。
data-pipeline ×13
airflow ×2
luigi ×2
python ×2
amazon-emr ×1
amazon-rds ×1
amazon-s3 ×1
apache-beam ×1
aws-glue ×1
bigdata ×1
dataflow ×1
dataset ×1
docker ×1
dvc ×1
emr ×1
keras ×1
local ×1
numpy ×1
python-3.x ×1
tensorflow ×1
tpu ×1
truncate ×1