我有这个讨厌的错误:
Traceback (most recent call last):
File "/home/ubuntu/.local/lib/python3.5/site-packages/urllib3/connectionpool.py", line 600, in urlopen
chunked=chunked)
File "/home/ubuntu/.local/lib/python3.5/site-packages/urllib3/connectionpool.py", line 384, in _make_request
six.raise_from(e, None)
File "<string>", line 2, in raise_from
File "/home/ubuntu/.local/lib/python3.5/site-packages/urllib3/connectionpool.py", line 380, in _make_request
httplib_response = conn.getresponse()
File "/usr/lib/python3.5/http/client.py", line 1197, in getresponse
response.begin()
File "/usr/lib/python3.5/http/client.py", line 297, in begin
version, status, reason = self._read_status()
File "/usr/lib/python3.5/http/client.py", line 258, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/usr/lib/python3.5/socket.py", line 575, in readinto
return self._sock.recv_into(b)
ConnectionResetError: [Errno 104] Connection reset by …Run Code Online (Sandbox Code Playgroud) 我在气流中有一个操作员:
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders',
mysql_conn_id='con1',
google_cloud_storage_conn_id='con2',
provide_context=True,
sql="""SELECT * FROM orders where orderid>{0}""".format(parameter),
bucket=GCS_BUCKET_ID,
filename=file_name,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
现在,我需要运行的实际查询有 24 行。我想将它保存在一个文件中,并为操作员提供 SQL 文件的路径。操作员支持这一点,但我不确定如何处理需要 SQL 的参数。
建议?
编辑:这是我的代码:
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders',
mysql_conn_id='con1',
google_cloud_storage_conn_id='con2',
provide_context=True,
templates_dict={'sql': '/home/ubuntu/airflow/.../orders_op.sql'},
sql = '{{ templates_dict.sql }}',
params={'last_imported_id': LAST_IMPORTED_ORDER_ID, 'table_name' : TABLE_NAME},
bucket=GCS_BUCKET_ID,
filename=file_name,
dag=dag)
Run Code Online (Sandbox Code Playgroud)
这给出:
jinja2.exceptions.UndefinedError: 'templates_dict' 未定义
我想将Json的压缩gzip上传到Google Storage。
我有以下代码:
import datalab.storage as storage
import gzip
path = prefix + '/orders_newline.json.gz'
storage.Bucket('orders').item(path).write_to(gzip.compress(bytes(create_jsonlines(source)),encoding='utf8'), 'application/json')
Run Code Online (Sandbox Code Playgroud)
该create_jsonlines(source)是返回的Json换行符分隔的功能。
运行这段代码可以得到:
TypeError: string argument without an encoding
Run Code Online (Sandbox Code Playgroud)
在Python文档说,格式是:bytes([source[, encoding[, errors]]])我不知道我把它理解为不存在如何使用它的例子。
我也尝试过
bytes([(create_jsonlines(source))[,encoding='utf8']])
Run Code Online (Sandbox Code Playgroud)
这给出了:
SyntaxError: invalid syntax
Run Code Online (Sandbox Code Playgroud)
我正在运行Python 3.5
python google-cloud-storage google-cloud-platform google-cloud-datalab
该文档指定了集成说明
我想要的是,每次调度程序停止工作时,它将自己重新启动。通常,我使用手动启动它,airflow scheduler -D但有时在不可用时停止。
阅读文档时,我不确定配置。
在GitHub上包含以下文件:
airflow
airflow-scheduler.service
airflow.conf
Run Code Online (Sandbox Code Playgroud)
我正在运行Ubuntu 16.04
气流安装在:
home/ubuntu/airflow
Run Code Online (Sandbox Code Playgroud)
我有以下路径:
etc/systemd
Run Code Online (Sandbox Code Playgroud)
该文档说:
将它们复制(或链接)到/ usr / lib / systemd / system
将airflow.conf复制到/etc/tmpfiles.d/
什么是tmpfiles.d?
什么是# AIRFLOW_CONFIG=在气流文件?
或者换句话说...关于如何做到这一点的更多“脚踏实地”指南?
我有以下内容:
EXEC_DATE1 = '{{ macros.ds_add(ds, 1) }}'
EXEC_DATE2 = '{{ execution_date }}'
Run Code Online (Sandbox Code Playgroud)
我想创建如下所示的路径变量:
path1 = EXEC_DATE1 + '/' + HH:MM (of EXEC_DATE1)
path2 = EXEC_DATE2 + '/' + HH:MM (of EXEC_DATE2)
Run Code Online (Sandbox Code Playgroud)
最终它应该是这样的:
2018-09-16/10:41
Run Code Online (Sandbox Code Playgroud)
我怎样才能做到这一点?
我试过:
EXEC_DATE = '{{ execution_date }}'
EXEC_DATE = EXEC_DATE.strftime('%Y-%m-%d/%H:%M')
Run Code Online (Sandbox Code Playgroud)
但它给出了:
'str' object has no attribute 'strftime'
Run Code Online (Sandbox Code Playgroud)
编辑: 我的代码:
EXEC_TIMESTAMP_PATH = "{{ execution_date.strftime('%Y-%m-%d/%H:%M') }}"
EXEC_DATE = "{{ execution_date.strftime('%H:%M') }}"
EXEC_TIME = "{{ mexecution_date.strftime('%Y-%m-%d') }}"
task3_op= BashOperator(
task_id='task3',
params={'EXEC_DATE':EXEC_DATE, 'EXEC_TIME':EXEC_TIME},
bash_command="""python3 script.py '{{ var.value.task3_variable }}' '{{ params.EXEC_DATE }}' …Run Code Online (Sandbox Code Playgroud) 我有一个IAM具有角色的用户:BigQuery Data Editor
在我的数据集中,我确实Share dataset添加了具有Can Edit权限的用户。
但是,当我运行访问 BigQuery 的脚本时,出现错误 403
当我添加到我的IAM用户角色BigQuery User脚本工作。
脚本仅从SELECT该数据集中的表运行查询。
我不明白为什么我必须授予BigQuery User它才能起作用。
根据文档https://cloud.google.com/bigquery/docs/access-control
基本原理:dataEditor 角色通过为数据集中的表颁发创建、更新、删除权限来扩展 bigquery.dataViewer
roles/bigquery.dataViewerhas bigquery.tables.getDatawhich 获取表数据
我在这里做错了什么?
这是我的操作员:
bigquery_check_op = BigQueryOperator(
task_id='bigquery_check',
bql=SQL_QUERY,
use_legacy_sql = False,
bigquery_conn_id=CONNECTION_ID,
trigger_rule='all_success',
xcom_push=True,
dag=dag
)
Run Code Online (Sandbox Code Playgroud)
当我检查 UI 中的渲染页面时。那里什么也没有出现。当我在控制台中运行 SQL 时,它返回1400正确的值。为什么运营商不推XCOM?
我无法使用BigQueryValueCheckOperator。该运算符被设计为在值检查时失败。我不想让任何事情失败。我只是想根据查询的返回值对代码进行分支。
我有以下代码:
tries = 10
for n in range(tries):
try:
....
responsedata = requests.get(url, data=data, headers=self.hed, verify=False)
responsedata.raise_for_status()
..
if .... :
break #exit loop condition
except (ChunkedEncodingError, requests.exceptions.HTTPError) as e:
print ("page #{0} run #{1} failed. Returned status code {2}. Msg: {3}. Retry.".format(page, n, responsedata.status_code, sys.exc_info()[0]))
if n == tries - 1:
raise e # exit the process
Run Code Online (Sandbox Code Playgroud)
我看到的照片是:
page #53 run #0 failed. Returned status code 502. Msg: <class 'requests.exceptions.HTTPError'>. Retry.
page #1 run #1 failed. Returned status code …Run Code Online (Sandbox Code Playgroud) 我读过这篇How to use airflow xcoms with MySqlOperator,虽然它有一个类似的标题,但它并没有真正解决我的问题。
我有以下代码:
def branch_func_is_new_records(**kwargs):
ti = kwargs['ti']
xcom = ti.xcom_pull(task_ids='query_get_max_order_id')
string_to_print = 'Value in xcom is: {}'.format(xcom)
logging.info(string_to_print)
if int(xcom) > int(LAST_IMPORTED_ORDER_ID)
return 'import_orders'
else:
return 'skip_operation'
query_get_max_order_id = 'SELECT COALESCE(max(orders_id),0) FROM warehouse.orders where orders_id>1 limit 10'
get_max_order_id = MySqlOperator(
task_id='query_get_max_order_id',
sql= query_get_max_order_id,
mysql_conn_id=MyCon,
xcom_push=True,
dag=dag)
branch_op_is_new_records = BranchPythonOperator(
task_id='branch_operation_is_new_records',
provide_context=True,
python_callable=branch_func_is_new_records,
dag=dag)
get_max_order_id >> branch_op_is_new_records >> import_orders
branch_op_is_new_records >> skip_operation
Run Code Online (Sandbox Code Playgroud)
MySqlOperator根据选择下一个任务的编号返回一个编号BranchPythonOperator。保证MySqlOperator返回的值大于0。
我的问题是,当我转到“我什么也没看到”时,“在用户界面上”没有推送XCOM任何 …
BigQuery支持以下策略:
WRITE_APPEND -指定可以将行追加到现有表中。
WRITE_EMPTY -指定输出表必须为空。
WRITE_TRUNCATE -指定写应替换表。
它们都不适合UPSERT操作目的。
我正在将订单Json文件导入Google Storage,并希望将其加载到BigQuery中。逻辑提示,某些记录将是新记录,而其他记录已从以前的装载中获取并且需要更新(例如,更新订单状态(新/处于保留状态/已发送/退款等...)
我正在使用Airflow,但我的问题很普遍:
update_bigquery = GoogleCloudStorageToBigQueryOperator(
dag=dag,
task_id='load_orders_to_BigQuery',
bucket=GCS_BUCKET_ID,
destination_project_dataset_table=table_name_template,
source_format='NEWLINE_DELIMITED_JSON',
source_objects=[gcs_export_uri_template],
schema_fields=dc(),
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
skip_leading_rows = 1,
google_cloud_storage_conn_id=CONNECTION_ID,
bigquery_conn_id=CONNECTION_ID
)
Run Code Online (Sandbox Code Playgroud)
此代码使用表示WRITE_TRUNCATE这意味着删除整个表并加载请求的文件。
我如何修改它以提供支持UPSERT?
我唯一的选择是查询表搜索以找到json中出现的现有订单LOAD吗?删除它们,然后执行?
我的函数是用线程执行的:
def getdata(self, page, ...):
tries = 10
for n in range(tries):
try:
...
datarALL = []
url = 'http://website/...'.format(...)
responsedata = requests.get(url, data=data, headers=self.hed, verify=False)
responsedata.raise_for_status()
if responsedata.status_code == 200: # 200 for successful call
...
if ...
break
except (ChunkedEncodingError, requests.exceptions.HTTPError) as e:
print ("page #{0} run #{1} failed. Returned status code {2}. Reason: {3}. Msg: {4}. Retry.".format(page, n, responsedata.status_code, responsedata.reason, sys.exc_info()[0]))
if n == tries - 1:
print ("page {0} could not be imported. Max retried reached.".format(page)) …Run Code Online (Sandbox Code Playgroud) 找到了,但这不能在标准 SQL 上运行
我试图做:
WHERE datew > DATE_SUB(CURRENT_TIMESTAMP(),INTERVAL 5 DAY )
Run Code Online (Sandbox Code Playgroud)
但这不起作用。根据文档DATE_SUB仅支持
DAY
WEEK. Equivalent to 7 DAYs.
MONTH
QUARTER
YEAR
Run Code Online (Sandbox Code Playgroud)
如何time stamp - 15 minutes使用 BigQuery获取标准 SQL 的最新信息?
我在做(Python 3.5):
sys.stdout.write('NoNewRecords')
Run Code Online (Sandbox Code Playgroud)
它告诉我:
NoNewRecords12
Run Code Online (Sandbox Code Playgroud)
这是为什么?
我用它来推动价值Apache-Airflow.当我推动时,我需要具有确切的值.
>>> import sys
>>> sys.stdout.write('NoNewRecords')
NoNewRecords12
Run Code Online (Sandbox Code Playgroud)