小编Kyl*_*ine的帖子

模拟/测试 HTTP Get 请求

我正在尝试为我的程序编写单元测试并使用模拟数据。我对如何拦截对 URL 的 HTTP Get 请求有点困惑。

我的程序调用 API 的 URL,并返回一个简单的 XML 文件。我希望测试不是从在线 API 获取 XML 文件,而是从我这里接收预定的 XML 文件,以便我可以将输出与预期输出进行比较,并确定一切是否正常工作。

有人向我指出了 Mockito,并且看到了许多不同的示例,例如这篇 SO 帖子,如何使用 Mockito 来测试 REST 服务?但我并不清楚如何设置这一切以及如何模拟数据(即,每当调用 URL 时都返回我自己的 xml 文件)。

我唯一能想到的是制作另一个在 Tomcat 上本地运行的程序,并在我的测试中传递一个特殊的 URL,该 URL 调用 Tomcat 上本地运行的程序,然后返回我想要测试的 xml 文件。但这似乎有点矫枉过正,我认为这是不可接受的。有人可以指出我正确的方向吗?

private static InputStream getContent(String uri) {

    HttpURLConnection connection = null;

    try {
        URL url = new URL(uri);
        connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("GET");
        connection.setRequestProperty("Accept", "application/xml");

        return connection.getInputStream();
    } catch (MalformedURLException e) {
        LOGGER.error("internal error", e);
    } catch (IOException e) …
Run Code Online (Sandbox Code Playgroud)

java junit unit-testing http mockito

6
推荐指数
1
解决办法
3万
查看次数

气流审计日志

我想知道 Airflow 在审计日志的意义上提供了什么。我的 Airflow 环境正在运行 Airflow 1.10 版,并使用文件的[ldap]部分airflow.cfg来使用我公司的 Active Dicrectory (AD) 进行身份验证。我看到当有人通过 Web UI 登录 Airflow 时,它会将用户名写入网络服务器的日志(如下所示)。我想知道是否可以修改 Airflow 以在用户打开/关闭 DAG、创建新的 Airflow 变量或池、清除任务、将任务标记为成功以及用户可以执行的任何其他操作时进行记录.

我需要能够对用户的活动进行某种处理,因为为了在我的工作中使用 Airflow,我必须让它通过架构师的安全审查,而他需要能够跟踪用户的活动。

这种能力是否由 Airflow 提供开箱即用?我明白,如果我要使用名为Cloud Composer 的Google Cloud 的 Airflow 服务,那么我将通过他们的服务获取审计日志,但不幸的是,我与 Amazon Web Services (AWS) 生态系统相关联,并且我自己维护着 Airflow(不是通过服务)。

我在airflow webserver日志中看到,当我遍历 Airflow Web UI 时,它正在发送休息调用

161.179.215.170 - - [17/Sep/2018:16:39:26 -0400] "GET /admin/ HTTP/1.1" 200 71942 "http://1.2.3.4:8080/admin/airflow/graph?dag_id=ARL_OnDemand" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36"
Run Code Online (Sandbox Code Playgroud)

当我登录时,我看到它告诉我用户名(登录到login这里的功能 …

airflow

6
推荐指数
1
解决办法
3276
查看次数

Python是否具有Java等效的throw new Exception("text here")

我是一名Java开发人员,他是Python的新手,我将Java类重写为Python类.我试图尽可能地模仿Python类中原始类的流程.Java类有几行,

    if(condition)
       throw new Exception("text here")
Run Code Online (Sandbox Code Playgroud)

我一直在查看Python文档中的异常,并且无法找到与Java语法等效的Python.

我已经raise Exception("text here")通过阅读这个StackOverflow帖子尝试了一些东西(我认为很接近),但似乎这是在try except块中使用并且会导致从try块跳转到except块; 而我正试图避免try except阻塞而只是抛出异常.

我认为可行的解决方案是这样的,

    try:
        if(condition):
           raise Exception("text here")
    except:
        ...
Run Code Online (Sandbox Code Playgroud)

但我想知道是否有一种 Java 方法更密切相关的方法,以便我可以尽可能多地维护流程(让它们看起来相似).

python exception

5
推荐指数
2
解决办法
1297
查看次数

气流psycopg2.OperationalError:严重:对不起,已经有太多客户端

我有一个四节点集群的Airflow环境,对我来说,几个月来一直运行良好。

ec2实例

  • 服务器1:Web服务器,调度程序,Redis队列,PostgreSQL数据库
  • 服务器2:Web服务器
  • 服务器3:工作者
  • 服务器4:工作者

最近,我正在开发一个更复杂的DAG,其中包含数十个任务,而我之前从事的工作相对较小。我不确定这是否就是为什么我现在看到此错误弹出或者是什么原因,但是我偶尔会收到此错误:

在Airflow UI上,该任务的日志下:

psycopg2.OperationalError: FATAL: sorry, too many clients already
Run Code Online (Sandbox Code Playgroud)

在Webserver上(运行airflow webserver的输出),我也得到了相同的错误:

[2018-07-23 17:43:46 -0400] [8116] [ERROR] Exception in worker process
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 2158, in _wrap_pool_connect
    return fn()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/pool.py", line 403, in connect
    return _ConnectionFairy._checkout(self)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/pool.py", line 788, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/pool.py", line 532, in checkout
    rec = pool._do_get()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/pool.py", line 1193, in _do_get
    self._dec_overflow()
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 66, in …
Run Code Online (Sandbox Code Playgroud)

postgresql postgresql-9.2 airflow

5
推荐指数
1
解决办法
2862
查看次数

pthread_join()用于异步线程

我编写了一个简单的演示程序,以便我能理解pthread_join()函数.

我知道如何使用pthread_condition_wait()函数来允许异步线程,但我正在尝试理解如何使用pthread_join()函数执行类似的工作.

在下面的程序中,我将Thread 1s ID 传递给Thread 2s函数.在Thread 2s函数中,我调用pthread_join()函数并传入Thread 1s ID.我希望这会导致线程1首先运行,然后线程2运行第二,但我得到的是它们都同时运行.

这是因为一次只有一个线程可以使用pthread_join()函数,并且当我从主线程调用它时我已经在使用pthread_join()函数了吗?

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

void *functionCount1();
void *functionCount2(void*);

int main()
{

    /* 
        How to Compile
         gcc -c foo
         gcc -pthread -o foo foo.o
    */

    printf("\n\n");

    int rc;
    pthread_t thread1, thread2;

    /* Create two thread --I took out error checking for clarity*/
    pthread_create( &thread1, …
Run Code Online (Sandbox Code Playgroud)

c multithreading asynchronous pthreads pthread-join

4
推荐指数
1
解决办法
6989
查看次数

气流 - 未知的蓝色任务状态

我刚刚得到一个蓝色的任务,它没有出现在状态图例中.我很好奇这是一个错误还是一个无证件的状态.

在此输入图像描述

如您所见,右侧的潜在状态列表中未显示蓝色.我刚刚完成了所有过去,未来和上游尝试的清除fyi.

airflow

4
推荐指数
2
解决办法
567
查看次数

maven插件描述符未生成

我正在创建一个maven插件,MVN clean install build成功但是plugin.xml没有生成.

@Mojo( name = "cover",  defaultPhase = LifecyclePhase.POST_INTEGRATION_TEST)
public class RunCoverage extends AbstractMojo
{

    @Parameter( property = "cover.wadl", defaultValue = "test")
    private String wadl;

    @Parameter( property = "cover.endpoints",defaultValue = "test")
    private String endpoints;

    @Override
    public void execute() throws MojoExecutionException
    {
        <somecode>
    }
}
Run Code Online (Sandbox Code Playgroud)

而pom.xml是

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <artifactId>end-point-test-coverage</artifactId>
    <version>1</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.maven</groupId>
            <artifactId>maven-plugin-api</artifactId>
            <version>2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.maven.plugin-tools</groupId>
            <artifactId>maven-plugin-annotations</artifactId>
            <version>3.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-plugin-plugin</artifactId>
                <version>3.2</version>
                <executions>
                    <execution>
                        <id>default-descriptor</id>
                        <goals> …
Run Code Online (Sandbox Code Playgroud)

maven-plugin maven

3
推荐指数
2
解决办法
1万
查看次数

气流DAG每隔一分钟运行一次

我正在尝试将DAG安排为每分钟运行一次,但它似乎每秒都在运行。根据我已阅读的所有内容,我只需要schedule_interval='*/1 * * * *', #..every 1 minute在DAG中添加它,就是这样,但是它不起作用。这是我设置的一个简单示例进行测试:

from airflow import DAG
from airflow.operators import SimpleHttpOperator, HttpSensor, EmailOperator, S3KeySensor
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 4),
    'schedule_interval': '*/1 * * * *', #..every 1 minute
    'email': ['airflow@airflow.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    dag_id='airflow_slack_example',
    start_date=datetime(2018, 6, 4),
    max_active_runs=3,
    schedule_interval='*/1 * * * *', #..every 1 minute
    default_args=default_args,
)

test= …
Run Code Online (Sandbox Code Playgroud)

airflow airflow-scheduler

3
推荐指数
2
解决办法
1842
查看次数

气流 - 在任务之间锁定,以便一次只运行一个并行任务?

我有一个DAG有三个任务流(licappts,agent,agentpolicy):

在此输入图像描述

为简单起见,我称这三个不同的流.流是独立的,因为agentpolicy失败并不意味着其他两个(liceappts和agent)应该受到其他流失败的影响.

但是对于sourceType _emr_task_1任务(即licappts_emr_task_1,agents_emr_task_1和agentpolicy_emr_task_1),我一次只能运行其中一个任务.例如,我不能同时运行agents_emr_task_1和agentpolicy_emr_task_1,即使它们是两个不一定关心的独立任务.

如何在Airflow中实现此功能?现在我唯一可以想到的是将该任务包装在一个以某种方式锁定全局变量的脚本中,然后如果该变量被锁定,我将让脚本执行Thread.sleep(60秒)或其他东西,然后重试.但这似乎非常hacky,我很好奇Airflow为此提供了解决方案.

如果需要实现这一点,我愿意重组我的DAG的订单.我想做的一件事是做一个硬编码的排序

Dag Starts -> ... -> licappts_emr_task_1 -> agents_emr_task_1 -> agentpolicy_emr_task_1 -> DAG Finished
Run Code Online (Sandbox Code Playgroud)

但我不认为以这种方式组合流,因为例如agentpolicy_emr_task_1必须等待其他两个完成才能启动,并且有时agentpolicy_emr_task_1准备好在其他两个完成其他任务之前完成.

理想情况下,我希望任何sourceType _emr_task_1任务首先启动它,然后阻止其他任务运行其sourceType _emr_task_1任务,直到它完成为止.

更新:

我刚才想到的另一个解决方案是,如果我有办法检查另一个任务的状态,我可以为sourceType _emr_task_1 创建一个脚本,检查其他两个sourceType _emr_task_1任务是否有任何运行状态,以及如果他们这样做,它会睡觉,并定期检查是否其他人都没有运行,在这种情况下,它将启动它的过程.我不是这种方式的忠实粉丝,因为我觉得它可能会导致竞争条件,其中两者都在读取(同时)没有运行并且都开始运行.

python-3.x airflow

3
推荐指数
1
解决办法
959
查看次数

气流获取重试次数

在我的Airflow DAG中,我有一个任务需要知道它是第一次运行还是重试运行.如果是重试尝试,我需要在任务中调整逻辑.

我对如何存储任务的重试次数有一些想法,但我不确定它们中的任何一个是否合法,或者是否有更容易的内置方式来获取任务中的这些信息.

  • 我想知道我是否可以在每次任务运行时附加的dag中都有一个整数变量.然后,如果任务重新我可以检查变量的值,看它是否大于1,因此将是重试运行.但我不确定可变全局变量是否在Airflow中以这种方式工作,因为可以有多个工作人员执行不同的任务(虽然我不确定).

  • 把它写在XCOM变量中?

airflow

3
推荐指数
2
解决办法
2141
查看次数