Jas*_*ant 9 python sql postgresql join sqlalchemy
Models:
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class Team(Base):
    __tablename__ = "team"
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    players = relationship("Player", backref="team")

class Player(Base):
    __tablename__ = "player"
    id = Column(Integer, primary_key=True)
    name = Column(String(255), unique=True)
    team_id = Column(Integer, ForeignKey("team.id"))
    positions = relationship("Position", backref="player")

class Position(Base):
    __tablename__ = "position"
    id = Column(Integer(), primary_key=True)
    name = Column(String(255), unique=True)
    player_id = Column(Integer, ForeignKey("player.id"))
    goals = relationship("Goal", backref="position")

class Goal(Base):
    __tablename__ = "goal"
    id = Column(Integer(), primary_key=True)
    distance = Column(Integer)
    position_id = Column(Integer, ForeignKey("position.id"))
# Query to get all goals of all players of a team
query = (
    select(Team)
    .select_from(Player, Position, Goal)
    .options(joinedload(Team.players))
    .options(
        joinedload(
            Team.players,
            Player.positions,
        )
    )
    .options(
        joinedload(
            Team.players,
            Player.positions,
            Position.goals,
        )
    )
)
result = await db.execute(query)
response = result.scalar()
Sample JSON output of the above query:
{
    "id": 3,
    "players": [
        {"id": 3, "positions": []},
        {
            "id": 5,
            "positions": [
                {"id": 7, "goals": [{"id": 13}]}
            ]
        },
        {
            "id": 1,
            "positions": [
                {"id": 1, "goals": [{"id": 16}, {"id": 15}, {"id": 14}]},
                {"id": 2, "goals": [{"id": 4}]}
            ]
        }
    ]
}
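The sample JSON clearly shows that multiple goals are returned for the player with id=1.
Now I need to limit the query to the last goal of each player instead of all of that player's goals.
So I tried: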
subquery = (
    select(Goal)
    .order_by(Goal.id.desc())
    .limit(1)
    .subquery()
    .lateral()
)
query = (
    select(Team)
    .select_from(Player, Position, Goal)
    .options(joinedload(Team.players))
    .options(
        joinedload(
            Team.players,
            Player.positions,
        )
    )
    .outerjoin(subquery)
    .options(
        contains_eager(
            Team.players,
            Player.positions,
            Position.goals,
            alias=subquery,
        )
    )
)
result = await db.execute(query)
response = result.scalar()
Sample JSON output of the above query:
{
    "id": 3,
    "players": [
        {"id": 3, "positions": []},
        {
            "id": 5,
            "positions": [
                {"id": 7, "goals": [{"id": 16}]}
            ]
        },
        {
            "id": 1,
            "positions": [
                {"id": 1, "goals": [{"id": 16}]},
                {"id": 2, "goals": [{"id": 16}]}
            ]
        }
    ]
}
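This fetches the last goal scored by any player, not the last goal of each individual player.
Filters such as Goal.position_id == Position.id, whether in the outerjoin or in the subquery, either do not work or cause errors.
Edit:
It looks like I need populate_existing(), but it is not available on the new select() construct.
Edit 2:
To simplify these queries, I am also considering adding a last_goal_id column to the position table and updating it to store the id of the last inserted goal. Is it normal for two tables to hold foreign keys to each other? goal would have position_id and position would have last_goal_id.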
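Preface
First, I think the line below should not be part of the query, because it creates a cartesian product. Watch for SQLAlchemy warnings when executing the query: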
.select_from(Player, Position, Goal) # DELETE this as it creates cartesian product
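To make the cartesian product concrete, here is a minimal sketch that uses plain sqlite3 instead of SQLAlchemy. The trimmed three-table schema and the sample rows are assumptions modelled on the question, not the asker's real data. It compares listing extra tables in FROM without a join condition against joining along the foreign keys.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE team (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
    CREATE TABLE player (id INTEGER PRIMARY KEY, name TEXT, team_id INTEGER REFERENCES team(id));
    CREATE TABLE position (id INTEGER PRIMARY KEY, name TEXT, player_id INTEGER REFERENCES player(id));
    INSERT INTO team VALUES (3, 'demo');
    INSERT INTO player VALUES (1, 'p1', 3), (3, 'p3', 3), (5, 'p5', 3);
    INSERT INTO position VALUES (1, 'a', 1), (2, 'b', 1), (7, 'c', 5);
""")

# Extra FROM entries without a join condition: every combination of rows is returned.
cross_rows = conn.execute(
    "SELECT count(*) FROM team, player, position"
).fetchone()[0]

# Joins along the foreign keys: rows are only paired where they belong together.
joined_rows = conn.execute("""
    SELECT count(*)
    FROM team
    LEFT OUTER JOIN player ON team.id = player.team_id
    LEFT OUTER JOIN position ON player.id = position.player_id
""").fetchone()[0]

print(cross_rows, joined_rows)
```

With one team, three players and three positions, the unjoined FROM list yields 1 x 3 x 3 rows, while the joined version yields one row per player/position pair plus one row for the player without positions.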
Second, you can simplify the original query a little. The following produces a query equivalent to the one in your question:
# Query to get all goals of all players of a team
query1 = (
    select(Team)
    # .select_from(Player, Position, Goal)  # DELETE this as it creates cartesian product
    .options(
        joinedload(Team.players)
        .joinedload(Player.positions)
        .joinedload(Position.goals)
    )
)
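contains_eager as an alternative to joinedload
The query above can also be written differently by a) joining the related tables explicitly, and b) hinting to SQLAlchemy that the query already contains the desired relationships: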
query2 = (
    select(Team)
    .outerjoin(Team.players)
    .outerjoin(Player.positions)
    .outerjoin(Position.goals)
    .options(contains_eager(
        Team.players,
        Player.positions,
        Position.goals,
    ))
)
Solution:
Given that we can now be more explicit about the relationship join conditions, one way to implement the query is as follows:
# subquery to use in the join for getting only the last 1 goal for each Position
subq = (
    select(Goal.id.label("last_goal_id"))
    .filter(Goal.position_id == Position.id)
    .order_by(Goal.id.desc())
    .limit(1)
    .scalar_subquery()
    .correlate(Position)
)
query3 = (
    select(Team)
    .outerjoin(Team.players)
    .outerjoin(Player.positions)
    .outerjoin(Goal, Goal.id == subq)  # use the JOIN which includes ONLY last Goal, but ...
    .options(contains_eager(
        Team.players,
        Player.positions,
        Position.goals,  # ... tell sqlalchemy that we actually loaded ALL `.goals`
    ))
)
This generates the following SQL (SQLite):
SELECT goal.id,
goal.distance,
goal.position_id,
position.id AS id_1,
position.name,
position.player_id,
player.id AS id_2,
player.name AS name_1,
player.team_id,
team.id AS id_3,
team.name AS name_2
FROM team
LEFT OUTER JOIN player ON team.id = player.team_id
LEFT OUTER JOIN position ON player.id = position.player_id
LEFT OUTER JOIN goal ON goal.id =
(SELECT goal.id AS last_goal_id
FROM goal
WHERE goal.position_id = position.id
ORDER BY goal.id DESC
LIMIT 1)
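The correlated join can also be checked outside of SQLAlchemy. The sketch below runs the same ON clause with plain sqlite3. The trimmed schema is an assumption, the goal ids are taken from the sample output in the question, and position 9 is a hypothetical extra row added to show what happens for a position without goals.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE position (id INTEGER PRIMARY KEY, name TEXT, player_id INTEGER);
    CREATE TABLE goal (id INTEGER PRIMARY KEY, distance INTEGER,
                       position_id INTEGER REFERENCES position(id));
    INSERT INTO position VALUES (1, 'a', 1), (2, 'b', 1), (7, 'c', 5), (9, 'd', 5);
    INSERT INTO goal (id, position_id) VALUES (4, 2), (13, 7), (14, 1), (15, 1), (16, 1);
""")

# Same shape as the generated SQL: the subquery is re-evaluated for each
# position row because it references position.id from the outer query.
rows = conn.execute("""
    SELECT position.id, goal.id
    FROM position
    LEFT OUTER JOIN goal ON goal.id = (
        SELECT goal.id
        FROM goal
        WHERE goal.position_id = position.id
        ORDER BY goal.id DESC
        LIMIT 1)
    ORDER BY position.id
""").fetchall()

last_goal_by_position = dict(rows)
print(last_goal_by_position)
```

Each position is paired with at most one goal row, and a position without goals still appears with a NULL goal because of the outer join.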
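Alternative solution:
You can also create a hybrid_property on Position that computes the Goal.id of the last goal, and use it to define a relationship that contains only the last goal of the list: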
from sqlalchemy import and_
from sqlalchemy.ext.hybrid import hybrid_property

class Position(Base):
    __tablename__ = "position"
    id = Column(Integer(), primary_key=True)
    name = Column(String(255), unique=True)
    player_id = Column(Integer, ForeignKey("player.id"))
    goals = relationship("Goal", backref="position")

    @hybrid_property
    def last_goal_id(self):
        ...

    @last_goal_id.expression
    def last_goal_id(cls):
        stmt = (
            select(Goal.id.label("last_goal_id"))
            # .filter(Goal.position_id == Position.id)
            .filter(Goal.position_id == cls.id)
            .order_by(Goal.id.desc())
            .limit(1)
            .scalar_subquery()
            .correlate(cls)
            # .correlate_except(Goal)
        )
        return stmt

    last_goals = relationship(
        lambda: Goal,
        primaryjoin=lambda: and_(
            Goal.position_id == Position.id,
            Goal.id == Position.last_goal_id,
        ),
        viewonly=True,
        uselist=True,
    )
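In this case you can use the following query, but you should not navigate the Position.goals relationship, because doing so will load the whole list. The names of the JSON keys will also be different.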
query1 = (
    select(Team)
    .options(
        joinedload(Team.players)
        .joinedload(Player.positions)
        .joinedload(Position.last_goals)  # use `.last_goals` instead of `.goals`
    )
)
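Note: I personally like this one best because it is clean and explicit.
You can even mix the techniques to get the best of both: join via the .last_goals relationship, but trick SA into believing that .goals has been fully loaded: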
query2 = (
    select(Team)
    .outerjoin(Team.players)
    .outerjoin(Player.positions)
    .outerjoin(Position.last_goals)  # join via `.last_goals` relationship join, but ...
    .options(contains_eager(
        Team.players,
        Player.positions,
        Position.goals,  # ... tell sqlalchemy that we actually loaded `.goals`
    ))
)
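Take a look at using RANK. It may do what you need, although it takes a few queries/subqueries rather than one big joinedload.
I would use a subquery that ranks the goals by date, partitions them by position or player, and filters down to the rows where the rank equals 1. That gives you the latest goal for each position, which you can put into a dict. In your main query you can then use the position ID to look up the latest goal in that dict.
Something like this: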
from sqlalchemy import func, select

# Rank goals by id within each position
subquery = select(
    Goal.id.label("goal_id"),
    Goal.position_id,
    func.rank().over(order_by=Goal.id.desc(), partition_by=Goal.position_id).label("rank"),
).subquery()
# Create dict of {position_id: latest_goal_id} to use as a lookup
latest_goal_query = (
    select(subquery.c.goal_id, subquery.c.position_id)
    .where(subquery.c.rank == 1)
)
latest_goal_ids = {pos_id: goal_id for goal_id, pos_id in session.execute(latest_goal_query).fetchall()}
# Get goal objects from the IDs
goal_query = select(Goal).where(Goal.id.in_(list(latest_goal_ids.values())))
goals = {goal.id: goal for goal in session.execute(goal_query).scalars()}
# Map position ID to the latest goal object
latest_goals = {pos_id: goals[goal_id] for pos_id, goal_id in latest_goal_ids.items()}
# Read the team and position, and you can use the position_id to get the latest goal
query = ...
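For reference, the ranking step boils down to a window function in SQL. Here is a minimal sketch with plain sqlite3 that shows what the ranked subquery returns. It assumes a SQLite build with window function support (3.25 or newer), a trimmed goal table, and the goal ids from the sample output in the question.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE goal (id INTEGER PRIMARY KEY, distance INTEGER, position_id INTEGER);
    INSERT INTO goal (id, position_id) VALUES (4, 2), (13, 7), (14, 1), (15, 1), (16, 1);
""")

# Rank goals inside each position, newest id first, then keep only rank 1.
rows = conn.execute("""
    SELECT position_id, goal_id
    FROM (
        SELECT id AS goal_id,
               position_id,
               rank() OVER (PARTITION BY position_id ORDER BY id DESC) AS rnk
        FROM goal
    ) AS ranked
    WHERE rnk = 1
""").fetchall()

# {position_id: latest_goal_id}, the same lookup dict as in the answer.
latest_goal_ids = dict(rows)
print(latest_goal_ids)
```

Because the ids are unique, rank() never produces ties here. If you rank by a column that can repeat, such as a date, row_number() may be the safer choice.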
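As an aside, I used to joinedload everything until the author of SQLAlchemy told me that I should use selectinload wherever possible, because it only fetches the data you need, whereas joins can carry a lot of duplicated data (for example, if your team has 20 players, each with 5 positions, and each position has 20 goals, then I think joining everything results in the team name being sent 2000 times and each player name being sent 100 times).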
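The numbers in that example can be checked with a small sketch. It uses plain sqlite3, and the schema and generated rows are assumptions that only reproduce the 20 x 5 x 20 shape. The per-table queries at the end are a rough stand-in for what selectinload does, not the exact statements SQLAlchemy emits.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE team (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE player (id INTEGER PRIMARY KEY, name TEXT, team_id INTEGER);
    CREATE TABLE position (id INTEGER PRIMARY KEY, player_id INTEGER);
    CREATE TABLE goal (id INTEGER PRIMARY KEY, position_id INTEGER);
    INSERT INTO team VALUES (1, 'demo');
""")

# 1 team, 20 players, 5 positions per player, 20 goals per position.
for p in range(20):
    player_id = conn.execute(
        "INSERT INTO player (name, team_id) VALUES (?, 1)", (f"player-{p}",)
    ).lastrowid
    for _ in range(5):
        position_id = conn.execute(
            "INSERT INTO position (player_id) VALUES (?)", (player_id,)
        ).lastrowid
        conn.executemany(
            "INSERT INTO goal (position_id) VALUES (?)", [(position_id,)] * 20
        )

join_sql = """
    FROM team
    JOIN player ON player.team_id = team.id
    JOIN position ON position.player_id = player.id
    JOIN goal ON goal.position_id = position.id
"""
# One big join: the team name is repeated on every row.
team_name_copies = conn.execute("SELECT count(team.name)" + join_sql).fetchone()[0]
# Rows that repeat the columns of a single player (the first one inserted).
copies_per_player = conn.execute(
    "SELECT count(*)" + join_sql + " WHERE player.id = 1"
).fetchone()[0]
# One query per table: every row is sent exactly once.
separate_rows = sum(
    conn.execute(f"SELECT count(*) FROM {table}").fetchone()[0]
    for table in ("team", "player", "position", "goal")
)
print(team_name_copies, copies_per_player, separate_rows)
```

The per-table approach does not return fewer rows in total here. The saving is that the team, player and position columns are no longer repeated on every goal row.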
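Edit: column_property just came to mind as an alternative solution. Unfortunately I have not been able to figure out how to map the actual Goal model, so this is not perfect, but here is an example of how the ID of the latest goal could be added directly to the Player model.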
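class Player(Base):
    ...
    # Assumes Goal and Position are already mapped and the Goal.position backref is configured
    latest_goal_id = column_property(
        select(Goal.id)
        .where(Goal.position.has(Position.player_id == id))
        .order_by(Goal.id.desc())
        .limit(1)
        .scalar_subquery()
    )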
From the query's point of view it is treated as just another column, so you can select and filter by it.