我有一个 python 包,如下所示:
\n\npackage/\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 __init__.py\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 PyMySQL-0.7.6-py2.7.egg\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 pymysql\n\xe2\x94\x9c\xe2\x94\x80\xe2\x94\x80 PyMySQL-0.7.x.pth \n\xe2\x94\x94\xe2\x94\x80\xe2\x94\x80 tests.py\nRun Code Online (Sandbox Code Playgroud)\n\n文件夹结构无法更改,因为它来自第三方库。
\n\n.pth 文件的内容是
\n\nimport sys; sys.__plen = len(sys.path)\n./PyMySQL-0.7.6-py2.7.egg\nimport sys; new=sys.path[sys.__plen:]; del sys.path[sys.__plen:]; p=getattr(sys,'__egginsert',0); sys.path[p:p]=new; sys.__egginsert = p+len(new)\nRun Code Online (Sandbox Code Playgroud)\n\n将pymysql包含在tests.py中的最佳方法是什么
\n\n我显然无法使用,from PyMySQL-0.7.6-py2.7.egg因为文件夹名称包含点。
PS 绝对路径未知,因为此代码应该部署到 AWS lambda
\n我有以下DAG和两个SSHExecuteOperator任务.第一个任务执行返回参数的存储过程.第二个任务需要此参数作为输入.
请问如何从task1中推送的XCom中提取值,以便在task2中使用它?
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.now(),
'email': ['my@email.com'],
'email_on_failure': True,
'retries': 0
}
#server must be changed to point to the correct environment, to do so update DataQualitySSHHook variable in Airflow admin
DataQualitySSHHook = Variable.get('DataQualitySSHHook')
print('Connecting to: ' + DataQualitySSHHook)
sshHookEtl = SSHHook(conn_id=DataQualitySSHHook)
sshHookEtl.no_host_key_check = True
#create dag
dag = DAG(
'ed_data_quality_test-v0.0.3', #update version …Run Code Online (Sandbox Code Playgroud) 我正在使用 Python 包 Flask、SQLAlchemy、Graphene 和 Graphene-SQLAlchemy 构建 GraphQL API。我遵循了SQLAlchemy + Flask 教程。我能够执行查询和更改来创建记录。现在我想知道更新现有记录的最佳方法是什么。
这是我当前的脚本schema.py:
from graphene_sqlalchemy import SQLAlchemyObjectType
from database.batch import BatchOwner as BatchOwnerModel
import api_utils # Custom methods to create records in database
import graphene
class BatchOwner(SQLAlchemyObjectType):
"""Batch owners."""
class Meta:
model = BatchOwnerModel
interfaces = (graphene.relay.Node,)
class CreateBatchOwner(graphene.Mutation):
"""Create batch owner."""
class Arguments:
name = graphene.String()
# Class attributes
ok = graphene.Boolean()
batch_owner = graphene.Field(lambda: BatchOwner)
def mutate(self, info, name):
record = {'name': name}
api_utils.create('BatchOwner', …Run Code Online (Sandbox Code Playgroud) 我有一组Python脚本,它们以嵌套方式调用函数。对于这些功能中的每一个,我都有一个尝试,除了语句以捕获每个异常并打印它们。我想发送一封电子邮件警报,其中包含执行期间遇到的异常的完整序列。例:
import sys
def SendAlert(ErrorMessage):
try:
#send email alert with error message
#[...]
except:
print(str(sys.exc_info()))
return(sys.exc_info())
def ParentFunction():
try:
#call ChildFunction
ChildResult = ChildFunction()
#do stuff with ChildResult
#[...]
return ParentResult
except:
ErrorMessage = str(sys.exc_info())
print(ErrorMessage)
SendAlert(ErrorMessage)
def ChildFunction():
try:
#do stuff
#[...]
return ChildResult
except:
print(str(sys.exc_info()))
return(sys.exc_info())
#main
if __name__ == '__main__':
Result = ParentFunction()
Run Code Online (Sandbox Code Playgroud)
上面的代码在发生错误的情况下将表现如下,ChildFunction其中错误是嵌套最多的函数:
ChildFunction 遇到异常,它将打印它并将错误消息返回给 ParentFunctionParentFunction将失败,因为ChildResult包含错误消息而不是有效值ParentFunction 将触发和异常并在电子邮件警报中发送自己的错误消息除了来自的错误消息外ParentFunction,我还希望电子邮件警报中包含来自的错误消息ChildFunction。请注意,我想避免在except语句中将ChildResult变量传递给SendAlert函数,ParentFunction因为在现实生活中,我的程序有很多嵌套函数,这意味着将每个函数的结果变量传递给except语句。
您将如何实现?有没有办法访问整个程序触发的完整错误序列? …
我目前为我的项目定义了以下突变:
我的类PlanetAttribute 用于定义用作突变输入的石墨烯字段
class PlanetAttribute:
name = graphene.String(required=True, description="Name of the planet.")
rotation_period = graphene.String(default_value="unknown", description="Rotation period of the planet.")
orbital_period = graphene.String(default_value="unknown", description="Orbital period of the planet.")
diameter = graphene.String(default_value="unknown", description="Diameter of the planet.")
climate = graphene.String(default_value="unknown", description="Climate period of the planet.")
gravity = graphene.String(default_value="unknown", description="Gravity of the planet.")
terrain = graphene.String(default_value="unknown", description="Terrain of the planet.")
surface_water = graphene.String(default_value="unknown", description="Surface water of the planet.")
population = graphene.String(default_value="unknown", description="Population of the planet.")
url = graphene.String(default_value="unknown", description="URL of the planet …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用JayDeBeApi包在Python 3 中连接到Teradata数据库。这是使用Flask和Flask-Restplus的 API 项目的一部分
这是重现该问题的最小工作示例。在终端窗口中,键入以下命令来设置您的工作站:
# Install JVM
sudo apt-get install default-jre
# Create Python virtual environment
sudo apt-get install python3-venv
python3 -m venv jdbc
source jdbc/bin/activate
# Install Python packages in virtual environment
pip3 install --upgrade pip
pip3 install jaydebeapi
pip3 install flask
Run Code Online (Sandbox Code Playgroud)
使用以下内容创建文件 app.py:
from flask import Flask
import jaydebeapi
app = Flask(__name__)
def get_jdbc_connection():
connection = jaydebeapi.connect(
'com.teradata.jdbc.TeraDriver',
'jdbc:teradata://edw-dev.company.org',
{'user': 'LOGIN', 'password': 'PASSWORD', 'tmode': 'TERA', …Run Code Online (Sandbox Code Playgroud) 我有一个使用 Python 脚本和 Pyodbc 模块调用的存储过程。代码如下所示:
import pyodbc
pyodbc.pooling=False
oConnexion = pyodbc.connect("driver={Teradata};dbcname=myServer;DefaultDatabase=myDB;uid=myUser;pwd=myPassword;charset=utf8;", autocommit=True)
oConnexion.setdecoding(pyodbc.SQL_CHAR, encoding='utf-8')
oConnexion.setdecoding(pyodbc.SQL_WCHAR, encoding='utf-8')
oConnexion.setencoding(encoding='utf-8')
oCursor = oConnexion.cursor()
oQueryRegisterBatch = "CALL DEV_AUDIT.SP_AUDIT_BATCH('ED_DATA_QUALITY_MANUAL', 'REGISTER', '1900-01-01 00:00:00.000000', '2999-12-31 00:00:00.000000');"
oCursor.execute(oQueryRegisterBatch)
for row in oCursor:
print (row)
Run Code Online (Sandbox Code Playgroud)
存储过程创建一个新记录并返回记录 ID (BATCH_KEY)。当我在 Teradata 中执行存储过程时,它会正确返回 BATCH_KEY,但我无法在 Python 中捕获它。我收到以下错误消息而不是值:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
pyodbc.ProgrammingError: No results. Previous SQL was not a query.
Run Code Online (Sandbox Code Playgroud)
我可以在调用存储过程后通过查询表来检索 BATCH_KEY,但我想避免。您能否建议如何捕获存储过程的输出?
谢谢
您能否描述一下如何在Flask Restplus自动生成的文档中指明基本URL?
我正在运行以下代码,但没有显示在swagger UI中:
from flask import Flask
from flask_restplus import Api
app = Flask(__name__)
api = Api(app,
title='Data Quality Framework API',
version='v0.1',
doc='/data_quality_framework/api/documentation',
contact='me@xxx.com',
base_url='/test')
Run Code Online (Sandbox Code Playgroud)
我的 Vue.js 应用程序的 Vuex 存储正在不断增长,并且由于其中有很多常量而变得有点混乱。我想将这些常量拆分为单独的文件并将它们导入到我的 Vuex store 中store.js。我是 JavaScript 新手,所以我想知道:
store.js?这样做的确切语法是什么?这是我的当前内容store.js
import Vue from 'vue';
import Vuex from 'vuex';
Vue.use(Vuex);
export const store = new Vuex.Store({
state: {
graphqlUrl: 'https://localhost/listof/api/v1/graphql',
errorObject: {
flag: false,
message: ''
},
// Data types queries
queryGetAllDataTypes: `query getAllDataTypes {
allSysDataTypes(orderBy: NAME_ASC) {
nodes {
id
name
}
}
}`,
// Data for linked list & attributes drodpdown in attribute form
// Response labels must be formatted according …Run Code Online (Sandbox Code Playgroud) 我有以下扁平字典,其中包含每个项目的一个条目,每个项目都包含一个父级和子级属性。
{
'a': {
parent: None,
children: ['b', 'c', 'd']
},
'b': {
parent: 'a',
children: ['e', 'f', 'g']
},
'c': {
parent: 'a',
children: []
},
'd': {
parent: 'a',
children: []
},
'e': {
parent: 'b',
children: []
},
'f': {
parent: 'b',
children: ['h']
},
'g': {
parent: 'b',
children: []
},
'h': {
parent: 'f',
children: []
},
}
Run Code Online (Sandbox Code Playgroud)
我怎样才能把它变成一个看起来像这样的嵌套字典?
{
'a': {
'b': {
'e': {},
'f': {
'h':
}
'g': {}
},
'c': {}, …Run Code Online (Sandbox Code Playgroud) python ×7
flask ×2
sqlalchemy ×2
teradata ×2
airflow ×1
aws-lambda ×1
dictionary ×1
graphql ×1
import ×1
javascript ×1
jaydebeapi ×1
jdbc ×1
nested ×1
output ×1
pymysql ×1
pyodbc ×1
python-2.7 ×1
ssh ×1
swagger-ui ×1
vue.js ×1
vuex ×1