小编And*_*dor的帖子

Apache Airflow 不强制执行 dagrun_timeout

我将 Apache Airflow 版本 1.10.3 与顺序执行器一起使用,如果 DAG 尚未完成,我希望 DAG 在一段时间后失败。我尝试dagrun_timeout在示例代码中设置

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 1),
    'retries': 0,
}

dag = DAG('min_timeout', default_args=default_args, schedule_interval=timedelta(minutes=5), dagrun_timeout = timedelta(seconds=30), max_active_runs=1)

t1 = BashOperator(
    task_id='fast_task',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='slow_task',
    bash_command='sleep 45',
    dag=dag)

t2.set_upstream(t1)
Run Code Online (Sandbox Code Playgroud)

slow_task单独花费的时间超过了设置的时间限制dagrun_timeout,所以我的理解是气流应该停止 DAG 执行。但是,这不会发生,并且允许 slow_task 在其整个持续时间内运行。发生这种情况后,运行将被标记为失败,但这不会根据需要终止任务或 DAG。使用execution_timeoutforslow_task确实会导致任务在指定的时间限制内被终止,但我更愿意为 DAG 使用总体时间限制,而不是execution_timeout为每个任务指定。

还有什么我应该尝试实现这种行为的,或者我可以修复的任何错误?

airflow

5
推荐指数
1
解决办法
566
查看次数

如何从python中的其他方法调用__private静态方法

我假设 Python 类中的私有静态方法是可以而且应该完成的。但也许实际上,我应该只是在类之外使用模块私有方法。

我想了解从不同位置调用不同类型的静态方法:

我有一个带有私有和公共静态方法的 Python 类。我想从其他地方和彼此打电话给他们。

在类外并调用公共静态方法时,我必须添加类名。IE

m = MyClass.the_staticmethod(100) # I must use the classname as a prefix
Run Code Online (Sandbox Code Playgroud)

查看代码中的问题:

    class Myclass():
        @staticmethod
        __my_privatestaticmethod(myparam):
             return myparam

        @staticmethod
        def the_staticmethod(myparam):
            # will the following work?
            result = __my_staticmethod(1) # will this work?

            # data-mingling set as private, so following line cannot work!
            result = Myclass.__my_staticmethod(2) # this cannot work. 

            result = the_staticmethod(3) # will this work without the prefix

            return result
 

        def __my_privatemethod(self, param1):
            # which of the following are …
Run Code Online (Sandbox Code Playgroud)

python methods static class

4
推荐指数
1
解决办法
1万
查看次数

Kubernetes Pod 反亲和力 - 基于标签均匀分布 Pod?

我们发现我们的 Kubernetes 集群往往存在热点,其中某些节点比其他节点获得更多的应用程序实例。

在本例中,我们部署了大量 Apache Airflow 实例,并且某些节点的 Web 或调度程序组件比其他节点多 3 倍。

是否可以使用反亲和性规则来强制 Pod 在集群中更均匀地分布?

例如“更喜欢标签最少的节点component=airflow-web?”

如果反亲和力不起作用,我们是否还应该研究其他机制?

scheduling affinity kubernetes

4
推荐指数
1
解决办法
2252
查看次数

Kubernetes livenessProbe http 结果?

我们有一个 HTTPlivenessProbe设置,如果服务不健康,它会返回 500,并打印出问题所在。
有什么办法可以查看返回的输出livenessProbe吗?

我可以将它记录在应用程序中,但也许可以从 Kubernetes 中查看?

目前,我看到的唯一一件事是pod describe

Killing container with id docker://12568746c312e6646fd6ecdb2123db448be0bc6808629b1a63ced8b7298be444:pod "test-3893895584-7f4cr_test(524091bd-49d8-11e7-bd00-42010a840224)" container "test" is unhealthy, it will be killed and re-created.
Run Code Online (Sandbox Code Playgroud)

在 GKE 上运行

kubernetes

3
推荐指数
1
解决办法
913
查看次数

将 OracleOperator 的输出发送到 Airflow 中的另一个任务

我需要在另一个任务中使用 oracleOperator 的输出以进一步执行。我遇到的麻烦是,当我将数据拉入另一个任务并打印它时,它给出的结果为 None。没有抛出错误,但没有传递数据。此外,任务 UI 中的xcom选项卡的键和值显示为空白。

我的代码如下:

from airflow import DAG
from airflow.operators.oracle_operator import OracleOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'xyz',
    'start_date': days_ago(2),
}

dag = DAG('example_xcom', schedule_interval="@once", default_args=args, tags=['example'])


def puller(**kwargs):
    ti = kwargs['ti']
    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    print("VALUE IN PULLER ")
    print(pulled_value_1)

pull = PythonOperator(
    task_id='pullee',
    dag=dag,
    python_callable=puller,
    provide_context=True,
)
push = OracleOperator(
    task_id='data',
    sql='SELECT * FROM CUSTOMERS', 
    oracle_conn_id='1',
    provide_context=True,
    dag=dag,
)


push>>pull
Run Code Online (Sandbox Code Playgroud)

airflow apache-airflow-xcom

3
推荐指数
1
解决办法
970
查看次数

将 yaml(作为数据)放入配置映射中

有没有办法将 yaml 数据存储在配置映射中?

在我的values.yaml我有类似下面的东西

config:
  filters:
    - kind: Pod
      apiVersion: v1
...
Run Code Online (Sandbox Code Playgroud)

在我的配置图中,我目前正在做

...
data:
  config.yaml: |-
    {{ .Values.config }}
Run Code Online (Sandbox Code Playgroud)

但在结果中,configmap数据被“内联”并格式化为这样

...
data:
  config.yaml: >-
    map[filters:[map[apiVersion:v1...
Run Code Online (Sandbox Code Playgroud)

这不是yaml,因此不能被读取它的应用程序解析。

kubernetes kubernetes-helm configmap

3
推荐指数
1
解决办法
2364
查看次数

Airflow 2 - 导入错误:无法从“airflow.operators”导入名称“BashOperator”

升级到 Airflow 2 后,我在某些 DAG 中遇到了该错误:

ImportError: cannot import name 'BashOperator' from 'airflow.operators'
Run Code Online (Sandbox Code Playgroud)

airflow

3
推荐指数
1
解决办法
1万
查看次数

JwtAccessTokenConverterConfigurer 替代?

我在我的安全实现中使用 JwtAccessTokenConverterConfigurer 接口来为我的 Spring 启动微服务提供 oAuth2.0。

我已经用这个实现了一个 JWTAccessTokenCustomizer。但我看到 JwtAccessTokenConverterConfigurer 已被弃用。我现在可以这样做的替代方法是什么?

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.security.oauth2.resource.JwtAccessTokenConverterConfigurer
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken
import org.springframework.security.core.Authentication
import org.springframework.security.core.GrantedAuthority
import org.springframework.security.core.authority.AuthorityUtils
import org.springframework.security.oauth2.provider.OAuth2Authentication
import org.springframework.security.oauth2.provider.OAuth2Request
import org.springframework.security.oauth2.provider.token.DefaultAccessTokenConverter
import org.springframework.security.oauth2.provider.token.store.JwtAccessTokenConverter
import java.util.*

//FIXME: JwtAccessTokenConverterConfigurer is deprecated; do something
class JwtAccessTokenCustomizer() : DefaultAccessTokenConverter(), JwtAccessTokenConverterConfigurer
Run Code Online (Sandbox Code Playgroud)

spring-security jwt spring-boot spring-security-oauth2

2
推荐指数
1
解决办法
2098
查看次数

如何从 Docker Hub 中删除一个 repo

如何从 Docker Hub 中完全删除存储库?

Docker 发展迅速,他们的网站也是如此。这是从 docker hub Web 界面删除你的 repo 的最新方法。

repository docker dockerhub

2
推荐指数
1
解决办法
1468
查看次数

容器与主机(本机)性能

  • 既然容器是轻量级的操作系统虚拟化,那么我们能否获得与原生(主机)相同的性能呢?

  • 性能上会有什么差异?

任何线索都将受到高度赞赏,或者如果您有任何分析报告或任何有关主机与容器性能比较的参考将会有所帮助。

IA

containers amazon-ecs docker kubernetes google-kubernetes-engine

2
推荐指数
1
解决办法
9032
查看次数

Grafana 模板:Prometheus label_values 变量的正则表达式

我正在尝试使用label_values函数在 Grafana 中设置模板。该文档指定了查询 label_values 的可能性,例如:

label_values(metric, label)
Run Code Online (Sandbox Code Playgroud)

在我的用例中,有两个主要的度量标准组,其名称类似于:

  • app1_current_sensor1
  • app1_current_sensor2
  • app2_current_sensor2
  • app2_current_sensor3

他们每个人都有一个名为'uid'的标签。我希望使用上述查询过滤一个仪表板上的“app1”和另一个仪表板上的“app2”的用户 ID

我试过了

label_values(app1_current_sensor1, uid)
Run Code Online (Sandbox Code Playgroud)

但是,如果由于某种原因 sensor1 有一段时间没有发送数据,即使 sensor2 正在发送数据,我也不会在仪表板上看到更多用户 ID。

是否可以使用正则表达式作为度量变量的输入?这样的事情对我有用:

label_values(metric=~(app1_[^\s]+), uid)
Run Code Online (Sandbox Code Playgroud)

但我不确定这在 Grafana 中是否可行。

grafana prometheus grafana-templating

1
推荐指数
1
解决办法
7665
查看次数

如何在 Antd Table 中将多个数据显示到单个列/单元格中?

例如:-我从 firebase 获得了四个 json 格式数据

  {
     "firstName":"FName1",
     "lastName":"LName1",
     "initial":"F.L1",
     "Age":"25"
  },
  {
     "firstName":"FName2",
     "lastName":"LName2",
     "initial":"F.L2",
     "Age":"23"
  }
``
Run Code Online (Sandbox Code Playgroud)

我想将所有三个数据显示到一列中:就像--

     **Full Name**             **Age**
     FName1 LName1 (F.L1)        25
     FName2 LName2 (F.L2)        23
Run Code Online (Sandbox Code Playgroud)

antd

0
推荐指数
1
解决办法
5074
查看次数