小编Pro*_*120的帖子

requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))

我有这个讨厌的错误:

Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.5/site-packages/urllib3/connectionpool.py", line 600, in urlopen
    chunked=chunked)
  File "/home/ubuntu/.local/lib/python3.5/site-packages/urllib3/connectionpool.py", line 384, in _make_request
    six.raise_from(e, None)
  File "<string>", line 2, in raise_from
  File "/home/ubuntu/.local/lib/python3.5/site-packages/urllib3/connectionpool.py", line 380, in _make_request
    httplib_response = conn.getresponse()
  File "/usr/lib/python3.5/http/client.py", line 1197, in getresponse
    response.begin()
  File "/usr/lib/python3.5/http/client.py", line 297, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python3.5/http/client.py", line 258, in _read_status
    line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
  File "/usr/lib/python3.5/socket.py", line 575, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [Errno 104] Connection reset by …
Run Code Online (Sandbox Code Playgroud)

python

18
推荐指数
2
解决办法
3万
查看次数

如何将 SQL 作为带参数的文件传递给 Airflow Operator

我在气流中有一个操作员:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    sql="""SELECT * FROM orders where orderid>{0}""".format(parameter),
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 
Run Code Online (Sandbox Code Playgroud)

现在,我需要运行的实际查询有 24 行。我想将它保存在一个文件中,并为操作员提供 SQL 文件的路径。操作员支持这一点,但我不确定如何处理需要 SQL 的参数。

建议?

编辑:这是我的代码:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    templates_dict={'sql': '/home/ubuntu/airflow/.../orders_op.sql'},
    sql = '{{ templates_dict.sql }}',
    params={'last_imported_id': LAST_IMPORTED_ORDER_ID, 'table_name' :  TABLE_NAME},
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 
Run Code Online (Sandbox Code Playgroud)

这给出:

jinja2.exceptions.UndefinedError: 'templates_dict' 未定义

airflow

10
推荐指数
1
解决办法
9978
查看次数

TypeError:不带编码的字符串参数

我想将Json的压缩gzip上传到Google Storage。

我有以下代码:

import datalab.storage as storage
import gzip
path = prefix + '/orders_newline.json.gz'
storage.Bucket('orders').item(path).write_to(gzip.compress(bytes(create_jsonlines(source)),encoding='utf8'), 'application/json')
Run Code Online (Sandbox Code Playgroud)

create_jsonlines(source)是返回的Json换行符分隔的功能。

运行这段代码可以得到:

TypeError: string argument without an encoding
Run Code Online (Sandbox Code Playgroud)

Python文档说,格式是:bytes([source[, encoding[, errors]]])我不知道我把它理解为不存在如何使用它的例子。

我也尝试过

bytes([(create_jsonlines(source))[,encoding='utf8']])
Run Code Online (Sandbox Code Playgroud)

这给出了:

SyntaxError: invalid syntax
Run Code Online (Sandbox Code Playgroud)

我正在运行Python 3.5

python google-cloud-storage google-cloud-platform google-cloud-datalab

9
推荐指数
2
解决办法
2万
查看次数

如何在Systemd中使用Airflow Scheduler?

该文档指定了集成说明

我想要的是,每次调度程序停止工作时,它将自己重新启动。通常,我使用手动启动它,airflow scheduler -D但有时在不可用时停止。

阅读文档时,我不确定配置。

GitHub上包含以下文件:

airflow
airflow-scheduler.service
airflow.conf
Run Code Online (Sandbox Code Playgroud)

我正在运行Ubuntu 16.04

气流安装在:

home/ubuntu/airflow
Run Code Online (Sandbox Code Playgroud)

我有以下路径:

etc/systemd
Run Code Online (Sandbox Code Playgroud)

该文档说:

将它们复制(或链接)到/ usr / lib / systemd / system

  1. 复制哪个文件?

将airflow.conf复制到/etc/tmpfiles.d/

  1. 什么是tmpfiles.d?

  2. 什么是# AIRFLOW_CONFIG=在气流文件?

或者换句话说...关于如何做到这一点的更多“脚踏实地”指南?

airflow

7
推荐指数
1
解决办法
1628
查看次数

如何在 Airflow 中格式化宏?

我有以下内容:

EXEC_DATE1 = '{{ macros.ds_add(ds, 1) }}'


EXEC_DATE2 = '{{ execution_date }}'
Run Code Online (Sandbox Code Playgroud)

我想创建如下所示的路径变量:

path1 = EXEC_DATE1 + '/' + HH:MM (of EXEC_DATE1)
path2 = EXEC_DATE2 + '/' + HH:MM (of EXEC_DATE2)
Run Code Online (Sandbox Code Playgroud)

最终它应该是这样的:

2018-09-16/10:41
Run Code Online (Sandbox Code Playgroud)

我怎样才能做到这一点?

我试过:

EXEC_DATE = '{{ execution_date }}'
EXEC_DATE = EXEC_DATE.strftime('%Y-%m-%d/%H:%M')
Run Code Online (Sandbox Code Playgroud)

但它给出了:

'str' object has no attribute 'strftime'
Run Code Online (Sandbox Code Playgroud)

编辑: 我的代码:

EXEC_TIMESTAMP_PATH = "{{  execution_date.strftime('%Y-%m-%d/%H:%M') }}"
EXEC_DATE = "{{  execution_date.strftime('%H:%M') }}"
EXEC_TIME = "{{  mexecution_date.strftime('%Y-%m-%d') }}"

task3_op= BashOperator(
    task_id='task3',
    params={'EXEC_DATE':EXEC_DATE, 'EXEC_TIME':EXEC_TIME},
    bash_command="""python3 script.py '{{ var.value.task3_variable }}' '{{ params.EXEC_DATE }}' …
Run Code Online (Sandbox Code Playgroud)

airflow

5
推荐指数
1
解决办法
2万
查看次数

无法针对 BigQuery 运行查询 - 权限错误 403

我有一个IAM具有角色的用户:BigQuery Data Editor 在我的数据集中,我确实Share dataset添加了具有Can Edit权限的用户。

但是,当我运行访问 BigQuery 的脚本时,出现错误 403

当我添加到我的IAM用户角色BigQuery User脚本工作。

脚本仅从SELECT该数据集中的表运行查询。

我不明白为什么我必须授予BigQuery User它才能起作用。

根据文档https://cloud.google.com/bigquery/docs/access-control

基本原理:dataEditor 角色通过为数据集中的表颁发创建、更新、删除权限来扩展 bigquery.dataViewer

roles/bigquery.dataViewerhas bigquery.tables.getDatawhich 获取表数据

我在这里做错了什么?

google-bigquery google-cloud-platform

5
推荐指数
1
解决办法
5906
查看次数

Airflow:如何从 BigQueryOperator 推动 xcom 价值?

这是我的操作员:

bigquery_check_op = BigQueryOperator(
    task_id='bigquery_check',
    bql=SQL_QUERY,
    use_legacy_sql = False,
    bigquery_conn_id=CONNECTION_ID,
    trigger_rule='all_success',
    xcom_push=True,
    dag=dag
)
Run Code Online (Sandbox Code Playgroud)

当我检查 UI 中的渲染页面时。那里什么也没有出现。当我在控制台中运行 SQL 时,它返回1400正确的值。为什么运营商不推XCOM?

我无法使用BigQueryValueCheckOperator。该运算符被设计为在值检查时失败。我不想让任何事情失败。我只是想根据查询的返回值对代码进行分支。

airflow

5
推荐指数
1
解决办法
6034
查看次数

如何从请求响应中提取HTTP错误文本?

我有以下代码:

    tries = 10
    for n in range(tries):
        try:
            ....
            responsedata = requests.get(url, data=data, headers=self.hed, verify=False)
            responsedata.raise_for_status()
            ..
            if .... : 
                break   #exit loop condition

        except (ChunkedEncodingError, requests.exceptions.HTTPError) as e:
            print ("page #{0} run #{1} failed. Returned status code {2}. Msg: {3}. Retry.".format(page, n, responsedata.status_code, sys.exc_info()[0]))
            if n == tries - 1:
               raise e  # exit the process
Run Code Online (Sandbox Code Playgroud)

我看到的照片是:

page #53 run #0 failed. Returned status code 502. Msg: <class 'requests.exceptions.HTTPError'>. Retry.
page #1 run #1 failed. Returned status code …
Run Code Online (Sandbox Code Playgroud)

python python-requests

3
推荐指数
2
解决办法
2763
查看次数

如何在 Airflow 中将 MySqlOperator 与 xcom 一起使用?

我读过这篇How to use airflow xcoms with MySqlOperator,虽然它有一个类似的标题,但它并没有真正解决我的问题。

我有以下代码:

def branch_func_is_new_records(**kwargs):
    ti = kwargs['ti']
    xcom = ti.xcom_pull(task_ids='query_get_max_order_id')
    string_to_print = 'Value in xcom is: {}'.format(xcom)
    logging.info(string_to_print)
    if int(xcom) > int(LAST_IMPORTED_ORDER_ID)
        return 'import_orders'
    else:
        return 'skip_operation'

query_get_max_order_id  = 'SELECT COALESCE(max(orders_id),0) FROM warehouse.orders where orders_id>1 limit 10'
get_max_order_id = MySqlOperator(
        task_id='query_get_max_order_id',
        sql= query_get_max_order_id,
        mysql_conn_id=MyCon,
        xcom_push=True,
        dag=dag)

branch_op_is_new_records = BranchPythonOperator(
    task_id='branch_operation_is_new_records',
    provide_context=True,
    python_callable=branch_func_is_new_records,
    dag=dag)

get_max_order_id >> branch_op_is_new_records >> import_orders
branch_op_is_new_records >> skip_operation
Run Code Online (Sandbox Code Playgroud)

MySqlOperator根据选择下一个任务的编号返回一个编号BranchPythonOperator。保证MySqlOperator返回的值大于0

我的问题是,当我转到“我什么也没看到”时,“在用户界面上”没有推送XCOM任何 …

airflow

3
推荐指数
1
解决办法
6563
查看次数

将数据从Google Storage加载到BigQuery时如何执行UPSERT?

BigQuery支持以下策略:

WRITE_APPEND -指定可以将行追加到现有表中。

WRITE_EMPTY -指定输出表必须为空。

WRITE_TRUNCATE -指定写应替换表。

它们都不适合UPSERT操作目的。

我正在将订单Json文件导入Google Storage,并希望将其加载到BigQuery中。逻辑提示,某些记录将是新记录,而其他记录已从以前的装载中获取并且需要更新(例如,更新订单状态(新/处于保留状态/已发送/退款等...)

我正在使用Airflow,但我的问题很普遍:

update_bigquery = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_orders_to_BigQuery',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template,
    source_format='NEWLINE_DELIMITED_JSON',
    source_objects=[gcs_export_uri_template],
    schema_fields=dc(),
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    skip_leading_rows = 1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID
)
Run Code Online (Sandbox Code Playgroud)

此代码使用表示WRITE_TRUNCATE这意味着删除整个表并加载请求的文件。

我如何修改它以提供支持UPSERT

我唯一的选择是查询表搜索以找到json中出现的现有订单LOAD吗?删除它们,然后执行?

google-bigquery

2
推荐指数
1
解决办法
910
查看次数

处理来自 python 请求的异常

我的函数是用线程执行的:

def getdata(self, page, ...):
    tries = 10
    for n in range(tries):
        try:
            ...
            datarALL = []
            url = 'http://website/...'.format(...)
            responsedata = requests.get(url, data=data, headers=self.hed, verify=False)
            responsedata.raise_for_status()
            if responsedata.status_code == 200:  # 200 for successful call
                ...
                    if ...
                        break   
        except (ChunkedEncodingError, requests.exceptions.HTTPError) as e:
            print ("page #{0} run #{1} failed. Returned status code {2}. Reason: {3}. Msg: {4}. Retry.".format(page, n, responsedata.status_code, responsedata.reason, sys.exc_info()[0]))
            if n == tries - 1:
                print ("page {0} could not be imported. Max retried reached.".format(page)) …
Run Code Online (Sandbox Code Playgroud)

python python-requests

2
推荐指数
1
解决办法
1万
查看次数

如何使用 BigQuery 获取 15 分钟前的时间戳?

找到了,但这不能在标准 SQL 上运行

我试图做:

WHERE datew > DATE_SUB(CURRENT_TIMESTAMP(),INTERVAL 5 DAY )
Run Code Online (Sandbox Code Playgroud)

但这不起作用。根据文档DATE_SUB仅支持

DAY
WEEK. Equivalent to 7 DAYs.
MONTH
QUARTER
YEAR
Run Code Online (Sandbox Code Playgroud)

如何time stamp - 15 minutes使用 BigQuery获取标准 SQL 的最新信息?

sql google-bigquery

1
推荐指数
1
解决办法
2602
查看次数

sys.stdout显示我没写的东西

我在做(Python 3.5):

sys.stdout.write('NoNewRecords')
Run Code Online (Sandbox Code Playgroud)

它告诉我:

NoNewRecords12
Run Code Online (Sandbox Code Playgroud)

这是为什么?

我用它来推动价值Apache-Airflow.当我推动时,我需要具有确切的值.

>>> import sys
>>> sys.stdout.write('NoNewRecords')
NoNewRecords12
Run Code Online (Sandbox Code Playgroud)

python

0
推荐指数
1
解决办法
47
查看次数