Zel*_*lid 169 python json sqlalchemy
Django有一些从DB到JSON格式返回的ORM模型的自动序列化.
如何将SQLAlchemy查询结果序列化为JSON格式?
我试过,jsonpickle.encode但它编码查询对象本身.我试过了,json.dumps(items)但它回来了
TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable
Run Code Online (Sandbox Code Playgroud)
是否真的很难将SQLAlchemy ORM对象序列化为JSON/XML?它没有默认的序列化器吗?现在序列化ORM查询结果是非常常见的任务.
我需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示.
需要在javascript datagird中使用SQLAlchemy对象的JSON/XML格式的查询结果(JQGrid http://www.trirand.com/blog/)
cha*_*lax 241
您可以将对象输出为dict:
class User:
def as_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}
Run Code Online (Sandbox Code Playgroud)
然后使用User.as_dict()来序列化您的对象.
如将sqlalchemy行对象转换为python dict中所述
小智 116
你可以使用这样的东西:
from sqlalchemy.ext.declarative import DeclarativeMeta
class AlchemyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
# an SQLAlchemy class
fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
data = obj.__getattribute__(field)
try:
json.dumps(data) # this will fail on non-encodable values, like other classes
fields[field] = data
except TypeError:
fields[field] = None
# a json-encodable dict
return fields
return json.JSONEncoder.default(self, obj)
Run Code Online (Sandbox Code Playgroud)
然后使用以下命令转换为JSON:
c = YourAlchemyClass()
print json.dumps(c, cls=AlchemyEncoder)
Run Code Online (Sandbox Code Playgroud)
它将忽略不可编码的字段(将它们设置为"无").
它不会自动扩展关系(因为这可能会导致自我引用,并永远循环).
但是,如果你宁愿循环,你可以使用:
from sqlalchemy.ext.declarative import DeclarativeMeta
def new_alchemy_encoder():
_visited_objs = []
class AlchemyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
# don't re-visit self
if obj in _visited_objs:
return None
_visited_objs.append(obj)
# an SQLAlchemy class
fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
fields[field] = obj.__getattribute__(field)
# a json-encodable dict
return fields
return json.JSONEncoder.default(self, obj)
return AlchemyEncoder
Run Code Online (Sandbox Code Playgroud)
然后使用以下方法编码对象
print json.dumps(e, cls=new_alchemy_encoder(), check_circular=False)
Run Code Online (Sandbox Code Playgroud)
这将编码所有孩子,他们所有的孩子,以及他们所有的孩子...基本上可能编码整个数据库.当它达到之前编码的内容时,它会将其编码为"无".
另一种可能更好的替代方法是能够指定要扩展的字段:
def new_alchemy_encoder(revisit_self = False, fields_to_expand = []):
_visited_objs = []
class AlchemyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
# don't re-visit self
if revisit_self:
if obj in _visited_objs:
return None
_visited_objs.append(obj)
# go through each field in this SQLalchemy class
fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata']:
val = obj.__getattribute__(field)
# is this field another SQLalchemy object, or a list of SQLalchemy objects?
if isinstance(val.__class__, DeclarativeMeta) or (isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
# unless we're expanding this field, stop here
if field not in fields_to_expand:
# not expanding this field: set it to None and continue
fields[field] = None
continue
fields[field] = val
# a json-encodable dict
return fields
return json.JSONEncoder.default(self, obj)
return AlchemyEncoder
Run Code Online (Sandbox Code Playgroud)
你现在可以用以下方式调用它:
print json.dumps(e, cls=new_alchemy_encoder(False, ['parents']), check_circular=False)
Run Code Online (Sandbox Code Playgroud)
例如,仅扩展名为"parents"的SQLAlchemy字段.
Nic*_*ins 50
你可以将RowProxy转换为这样的dict:
d = dict(row.items())
Run Code Online (Sandbox Code Playgroud)
然后将其序列化为JSON(您必须为datetime值等事物指定编码器)如果您只想要一条记录(而不是相关记录的完整层次结构),那就不难了.
json.dumps([(dict(row.items())) for row in rs])
Run Code Online (Sandbox Code Playgroud)
Yas*_*ush 41
我建议使用最近浮出水面的图书馆棉花糖.它允许您创建序列化程序来表示模型实例,并支持关系和嵌套对象.
看看他们的SQLAlchemy示例.
kol*_*pto 13
Flask-JsonTools包为您的模型提供了JsonSerializableBase Base类的实现.
用法:
from sqlalchemy.ext.declarative import declarative_base
from flask.ext.jsontools import JsonSerializableBase
Base = declarative_base(cls=(JsonSerializableBase,))
class User(Base):
#...
Run Code Online (Sandbox Code Playgroud)
现在这个User模型是神奇的可序列化的.
如果你的框架不是Flask,你可以抓住代码
Tjo*_*rie 13
出于安全原因,您永远不应该返回所有模型的字段.我更愿意有选择地选择它们.
Flask的json编码现在支持UUID,日期时间和关系(query以及query_class为flask_sqlalchemy db.Model类添加的).我更新了编码器如下:
应用程序/ json_encoder.py
from sqlalchemy.ext.declarative import DeclarativeMeta
from flask import json
class AlchemyEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o.__class__, DeclarativeMeta):
data = {}
fields = o.__json__() if hasattr(o, '__json__') else dir(o)
for field in [f for f in fields if not f.startswith('_') and f not in ['metadata', 'query', 'query_class']]:
value = o.__getattribute__(field)
try:
json.dumps(value)
data[field] = value
except TypeError:
data[field] = None
return data
return json.JSONEncoder.default(self, o)
Run Code Online (Sandbox Code Playgroud)
app/__init__.py
# json encoding
from app.json_encoder import AlchemyEncoder
app.json_encoder = AlchemyEncoder
Run Code Online (Sandbox Code Playgroud)
有了这个,我可以选择添加一个__json__属性,返回我想编码的字段列表:
app/models.py
class Queue(db.Model):
id = db.Column(db.Integer, primary_key=True)
song_id = db.Column(db.Integer, db.ForeignKey('song.id'), unique=True, nullable=False)
song = db.relationship('Song', lazy='joined')
type = db.Column(db.String(20), server_default=u'audio/mpeg')
src = db.Column(db.String(255), nullable=False)
created_at = db.Column(db.DateTime, server_default=db.func.now())
updated_at = db.Column(db.DateTime, server_default=db.func.now(), onupdate=db.func.now())
def __init__(self, song):
self.song = song
self.src = song.full_path
def __json__(self):
return ['song', 'src', 'type', 'created_at']
Run Code Online (Sandbox Code Playgroud)
我将@jsonapi添加到我的视图中,返回结果列表,然后我的输出如下:
[
{
"created_at": "Thu, 23 Jul 2015 11:36:53 GMT",
"song":
{
"full_path": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
"id": 2,
"path_name": "Audioslave/Audioslave [2002]/1 Cochise.mp3"
},
"src": "/static/music/Audioslave/Audioslave [2002]/1 Cochise.mp3",
"type": "audio/mpeg"
}
]
Run Code Online (Sandbox Code Playgroud)
phi*_*ico 10
您可以使用SqlAlchemy的内省作为:
mysql = SQLAlchemy()
from sqlalchemy import inspect
class Contacts(mysql.Model):
__tablename__ = 'CONTACTS'
id = mysql.Column(mysql.Integer, primary_key=True)
first_name = mysql.Column(mysql.String(128), nullable=False)
last_name = mysql.Column(mysql.String(128), nullable=False)
phone = mysql.Column(mysql.String(128), nullable=False)
email = mysql.Column(mysql.String(128), nullable=False)
street = mysql.Column(mysql.String(128), nullable=False)
zip_code = mysql.Column(mysql.String(128), nullable=False)
city = mysql.Column(mysql.String(128), nullable=False)
def toDict(self):
return { c.key: getattr(self, c.key) for c in inspect(self).mapper.column_attrs }
@app.route('/contacts',methods=['GET'])
def getContacts():
contacts = Contacts.query.all()
contactsArr = []
for contact in contacts:
contactsArr.append(contact.toDict())
return jsonify(contactsArr)
@app.route('/contacts/<int:id>',methods=['GET'])
def getContact(id):
contact = Contacts.query.get(id)
return jsonify(contact.toDict())
Run Code Online (Sandbox Code Playgroud)
从这里的答案中获得灵感: 将sqlalchemy行对象转换为python dict
tom*_*tom 10
from dataclasses import dataclass
from datetime import datetime
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
db = SQLAlchemy(app)
@dataclass
class User(db.Model):
id: int
email: str
id = db.Column(db.Integer, primary_key=True, auto_increment=True)
email = db.Column(db.String(200), unique=True)
@app.route('/users/')
def users():
users = User.query.all()
return jsonify(users)
if __name__ == "__main__":
users = User(email="user1@gmail.com"), User(email="user2@gmail.com")
db.create_all()
db.session.add_all(users)
db.session.commit()
app.run()
Run Code Online (Sandbox Code Playgroud)
/users/现在,该路线将返回用户列表。
[
{"email": "user1@gmail.com", "id": 1},
{"email": "user2@gmail.com", "id": 2}
]
Run Code Online (Sandbox Code Playgroud)
@dataclass
class Account(db.Model):
id: int
users: User
id = db.Column(db.Integer)
users = db.relationship(User) # User model would need a db.ForeignKey field
Run Code Online (Sandbox Code Playgroud)
来自的回应jsonify(account)就是这样。
{
"id":1,
"users":[
{
"email":"user1@gmail.com",
"id":1
},
{
"email":"user2@gmail.com",
"id":2
}
]
}
Run Code Online (Sandbox Code Playgroud)
from flask.json import JSONEncoder
class CustomJSONEncoder(JSONEncoder):
"Add support for serializing timedeltas"
def default(o):
if type(o) == datetime.timedelta:
return str(o)
else:
return super().default(o)
app.json_encoder = CustomJSONEncoder
Run Code Online (Sandbox Code Playgroud)
更详细的解释。在您的模型中,添加:
def as_dict(self):
return {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}
Run Code Online (Sandbox Code Playgroud)
适用str()于 python 3,因此如果使用 python 2,请使用unicode(). 它应该有助于反序列化日期。如果不处理这些,您可以将其删除。
您现在可以像这样查询数据库
some_result = User.query.filter_by(id=current_user.id).first().as_dict()
Run Code Online (Sandbox Code Playgroud)
First()需要避免奇怪的错误。as_dict()现在将反序列化结果。反序列化完成后,就可以转成json了
jsonify(some_result)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
138738 次 |
| 最近记录: |