我正在寻找一种简单的python方法来比较SQLAlchemy中的列类型和基类型.例如,如果我的列类型是任何长度的VARCHAR,我想将其作为字符串读取.
我可以读取列类型没关系,但我不确定一种简单的方法来验证它的基本类型...如果我可以使用"if isinstance(mycolumn,int)"这样的东西会很好 - 但我是新的到python,不知道这将如何工作.
这是我到目前为止所拥有的:
from sqlalchemy import MetaData
from sqlalchemy import create_engine, Column, Table
engine = create_engine('mysql+mysqldb://user:pass@localhost:3306/mydb', pool_recycle=3600)
meta = MetaData()
meta.bind = engine
meta.reflect()
datatable = meta.tables['my_data_table']
[c.type for c in datatable.columns]
Run Code Online (Sandbox Code Playgroud)
输出:
[INTEGER(display_width=11), DATE(), VARCHAR(length=127), DOUBLE(precision=None, scale=None, asdecimal=True)]
Run Code Online (Sandbox Code Playgroud)
我的最终目的是双重的,首先是因为当我将它加载到我的jQuery jqGrid中时,我希望根据类型格式化输出.第二,我正在慢慢地将非规范化数据表转换为规范化结构,并且希望确保我保持我的类型一致 - (以确保我在前一个表中的数字存储为数字而不是字符串...... )
只需使用python_type
所有AQLAlchemy类型中可用的属性,例如:
[c.type.python_type for c in datatable.columns]
Run Code Online (Sandbox Code Playgroud)
我在使用默认 sql 类型动态创建 SQL 表的问题上苦苦挣扎。我最终得到了以下方便的函数,用于我所有的 python 类型到 sql 类型的转换需求。从 sql 类型到 python 类型是微不足道的,将在下一节中解释。
import sqlalchemy
import numpy as np
import datetime
import decimal
_type_py2sql_dict = {
int: sqlalchemy.sql.sqltypes.BigInteger,
str: sqlalchemy.sql.sqltypes.Unicode,
float: sqlalchemy.sql.sqltypes.Float,
decimal.Decimal: sqlalchemy.sql.sqltypes.Numeric,
datetime.datetime: sqlalchemy.sql.sqltypes.DateTime,
bytes: sqlalchemy.sql.sqltypes.LargeBinary,
bool: sqlalchemy.sql.sqltypes.Boolean,
datetime.date: sqlalchemy.sql.sqltypes.Date,
datetime.time: sqlalchemy.sql.sqltypes.Time,
datetime.timedelta: sqlalchemy.sql.sqltypes.Interval,
list: sqlalchemy.sql.sqltypes.ARRAY,
dict: sqlalchemy.sql.sqltypes.JSON
}
def type_py2sql(pytype):
'''Return the closest sql type for a given python type'''
if pytype in _type_py2sql_dict:
return _type_py2sql_dict[pytype]
else:
raise NotImplementedError(
"You may add custom `sqltype` to `"+str(pytype)+"` assignment in `_type_py2sql_dict`.")
def type_np2py(dtype=None, arr=None):
'''Return the closest python type for a given numpy dtype'''
if ((dtype is None and arr is None) or
(dtype is not None and arr is not None)):
raise ValueError(
"Provide either keyword argument `dtype` or `arr`: a numpy dtype or a numpy array.")
if dtype is None:
dtype = arr.dtype
#1) Make a single-entry numpy array of the same dtype
#2) force the array into a python 'object' dtype
#3) the array entry should now be the closest python type
single_entry = np.empty([1], dtype=dtype).astype(object)
return type(single_entry[0])
def type_np2sql(dtype=None, arr=None):
'''Return the closest sql type for a given numpy dtype'''
return type_py2sql(type_np2py(dtype=dtype, arr=arr))
Run Code Online (Sandbox Code Playgroud)
一些用例:
>>> sqlalchemy.Column(type_py2sql(int))
Column(None, BigInteger(), table=None)
>>> type_py2sql(type('hello'))
sqlalchemy.sql.sqltypes.Unicode
>>> type_np2sql(arr=np.array([1.,2.,3.]))
sqlalchemy.sql.sqltypes.Float
Run Code Online (Sandbox Code Playgroud)
我所做的是将所有 sql 类型映射到它们等效的 python 类型。然后我打印了哪种 python 类型对应于哪些 sql-types,并为每个 python 类型选择了最好的 sql-type。这是我用来生成此映射的代码:
#********** SQL to Python: one to one **********
type_sql2py_dict = {}
for key in sqlalchemy.types.__dict__['__all__']:
sqltype = getattr(sqlalchemy.types, key)
if 'python_type' in dir(sqltype) and not sqltype.__name__.startswith('Type'):
try:
typeinst = sqltype()
except TypeError as e: #List/array wants inner-type
typeinst = sqltype(None)
try:
type_sql2py_dict[sqltype] = typeinst.python_type
except NotImplementedError:
pass
#********** Python to SQL: one to many **********
type_py2sql_dict = {}
for key, val in type_sql2py_dict.items():
if not val in type_py2sql_dict:
type_py2sql_dict[val] = [key]
else:
type_py2sql_dict[val].append(key)
Run Code Online (Sandbox Code Playgroud)
这是type_py2sql_dict
sqlalchemy 1.3.5 版下的输出:
{int: [sqlalchemy.sql.sqltypes.INTEGER,
sqlalchemy.sql.sqltypes.BIGINT,
sqlalchemy.sql.sqltypes.SMALLINT,
sqlalchemy.sql.sqltypes.Integer,
sqlalchemy.sql.sqltypes.SmallInteger,
sqlalchemy.sql.sqltypes.BigInteger],
str: [sqlalchemy.sql.sqltypes.CHAR,
sqlalchemy.sql.sqltypes.VARCHAR,
sqlalchemy.sql.sqltypes.NCHAR,
sqlalchemy.sql.sqltypes.NVARCHAR,
sqlalchemy.sql.sqltypes.TEXT,
sqlalchemy.sql.sqltypes.Text,
sqlalchemy.sql.sqltypes.CLOB,
sqlalchemy.sql.sqltypes.String,
sqlalchemy.sql.sqltypes.Unicode,
sqlalchemy.sql.sqltypes.UnicodeText,
sqlalchemy.sql.sqltypes.Enum],
float: [sqlalchemy.sql.sqltypes.FLOAT,
sqlalchemy.sql.sqltypes.REAL,
sqlalchemy.sql.sqltypes.Float],
decimal.Decimal: [sqlalchemy.sql.sqltypes.NUMERIC,
sqlalchemy.sql.sqltypes.DECIMAL,
sqlalchemy.sql.sqltypes.Numeric],
datetime.datetime: [sqlalchemy.sql.sqltypes.TIMESTAMP,
sqlalchemy.sql.sqltypes.DATETIME,
sqlalchemy.sql.sqltypes.DateTime],
bytes: [sqlalchemy.sql.sqltypes.BLOB,
sqlalchemy.sql.sqltypes.BINARY,
sqlalchemy.sql.sqltypes.VARBINARY,
sqlalchemy.sql.sqltypes.LargeBinary,
sqlalchemy.sql.sqltypes.Binary],
bool: [sqlalchemy.sql.sqltypes.BOOLEAN, sqlalchemy.sql.sqltypes.Boolean],
datetime.date: [sqlalchemy.sql.sqltypes.DATE, sqlalchemy.sql.sqltypes.Date],
datetime.time: [sqlalchemy.sql.sqltypes.TIME, sqlalchemy.sql.sqltypes.Time],
datetime.timedelta: [sqlalchemy.sql.sqltypes.Interval],
list: [sqlalchemy.sql.sqltypes.ARRAY],
dict: [sqlalchemy.sql.sqltypes.JSON]}
Run Code Online (Sandbox Code Playgroud)
一种解决方案是手动进行转换 - 例如,这有效:
def convert(self, saType):
type = "Unknown"
if isinstance(saType,sqlalchemy.types.INTEGER):
type = "Integer"
elif isinstance(saType,sqlalchemy.types.VARCHAR):
type = "String"
elif isinstance(saType,sqlalchemy.types.DATE):
type = "Date"
elif isinstance(saType,sqlalchemy.dialects.mysql.base._FloatType):
type = "Double"
return type
Run Code Online (Sandbox Code Playgroud)
不确定这是否是一种正常的 Python 做事方式......我仍然像一个 Java 程序员一样思考。
归档时间: |
|
查看次数: |
7665 次 |
最近记录: |