Delete rows from SQL Server based on the contents of a DataFrame

Mor*_*_DK 8 python sql sql-server sqlalchemy pandas

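I have an inventory table in SQL Server called dbo.inventory, which contains Year, Month, Material and Stock_quantity. Every day I receive a new inventory count as a csv file and need to load it into the dbo.inventory table. However, if a Year and Month from the csv file already exist in the database, I need to delete those records first, to avoid loading multiple inventory counts for the same month.

In SQL, I would do it like this: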

Delete t1 
FROM dbo.inventory t1
JOIN csv t2 ON t1.Year = t2.Year and t1.Month = t2.Month

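I don't know how to do this in a Python script without loading my CSV file into the data warehouse as a temporary table. I just want to delete the existing rows that match on Year and Month and then load the new ones.

I have used the following in another setup: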

delete_date = sales.Date.max()
connection = engine.connect()
connection.execute(f"""delete from sales where Date = '{delete_date}'""")
connection.close()

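But that doesn't work here, because the input that determines what should be deleted is a dataframe, which in theory can contain multiple years and months if it is a correction to figures loaded earlier.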

Cod*_*ent 6

Pandas doesn't support deleting SQL rows based on specific conditions. You have to tell SQL Server which rows you want to delete:

import sqlalchemy as sa

engine = sa.create_engine('mssql+pyodbc://...')
meta = sa.MetaData()

# Map the Inventory table in your database to a SQLAlchemy object.
# autoload_with alone triggers reflection; the autoload=True flag was removed in SQLAlchemy 2.0.
inventory = sa.Table('Inventory', meta, autoload_with=engine)

# Build the WHERE clause of your DELETE statement from rows in the dataframe.
# Equivalence in T-SQL
#      WHERE (Year = ... AND Month = ...) OR (Year = ... AND Month = ...) OR (Year = ... AND Month = ...)
cond = df.apply(lambda row: sa.and_(inventory.c['Year'] == row['Year'], inventory.c['Month'] == row['Month']), axis=1)
cond = sa.or_(*cond)

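# Define and execute the DELETE. engine.begin() commits on success,
# which engine.connect() no longer does implicitly in SQLAlchemy 2.0.
delete = inventory.delete().where(cond)
with engine.begin() as conn:
    conn.execute(delete)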

# Now you can insert the new data
df.to_sql('Inventory', engine, if_exists='append', index=False)
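If the file covers many distinct periods, the chain of OR conditions can get long. One possible alternative, sketched here under assumptions, is to run a parameterized DELETE once per distinct (Year, Month) pair and do the insert in the same transaction. The sketch uses the standard-library sqlite3 module as a stand-in for SQL Server; the sample CSV text, the table layout and the helper name reload_inventory are all hypothetical. pyodbc uses the same `?` placeholder style, but this has not been run against SQL Server.

```python
import csv
import io
import sqlite3

# Hypothetical CSV content standing in for the daily file.
CSV_TEXT = """Year,Month,Material,Stock_quantity
2024,1,A100,7
2024,1,B200,3
2024,2,A100,9
"""


def reload_inventory(conn, csv_text):
    """Replace every (Year, Month) present in the CSV with the CSV's rows."""
    rows = [
        (int(r["Year"]), int(r["Month"]), r["Material"], int(r["Stock_quantity"]))
        for r in csv.DictReader(io.StringIO(csv_text))
    ]
    # Distinct periods only, so each DELETE runs once per period.
    periods = sorted({(year, month) for year, month, _, _ in rows})
    with conn:  # one transaction: commit on success, roll back on error
        conn.executemany(
            "DELETE FROM inventory WHERE Year = ? AND Month = ?", periods
        )
        conn.executemany(
            "INSERT INTO inventory (Year, Month, Material, Stock_quantity) "
            "VALUES (?, ?, ?, ?)",
            rows,
        )
    return periods


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE inventory "
    "(Year INTEGER, Month INTEGER, Material TEXT, Stock_quantity INTEGER)"
)
# Existing data: an outdated January 2024 count and an untouched December 2023 count.
conn.executemany(
    "INSERT INTO inventory VALUES (?, ?, ?, ?)",
    [(2024, 1, "A100", 5), (2023, 12, "A100", 4)],
)
conn.commit()

replaced = reload_inventory(conn, CSV_TEXT)
remaining = conn.execute(
    "SELECT Year, Month, Material, Stock_quantity "
    "FROM inventory ORDER BY Year, Month, Material"
).fetchall()
```

Because the delete and the insert share one transaction, a failed load should leave the previous counts in place rather than an emptied month.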


Per*_*ruz 0

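I think you have two good options.

1) Work in pandas. Query the existing table with pandas.read_sql_table(), import the csv file as a second DataFrame, and merge-update the old table with the new one. Then write the updated DataFrame back, for example with df.to_sql(..., if_exists='replace').

2) Use sqlalchemy and work in the database, especially if you want to preserve the schema or other constraints.

Below is a short, generic example of both approaches. More specific solutions are probably possible, but these are two starting points.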

import sqlalchemy as sa
import sqlalchemy.ext.declarative as sa_dec
import sqlalchemy.orm as sa_orm
import pandas as pd
from sqlalchemy import update
from sqlalchemy import and_

#con = sqlite3.connect('hyp.db')
#cur = con.cursor()

# general pandas solution
t1 = pd.DataFrame({'year': [1, 2, 3], 'month': [4, 5, 6], 'value': [2, 2, 2]})
t2 = pd.DataFrame({'year': [1, 5, 3], 'month': [4, 9, 9], 'value': [1, 5, 10]})
c = pd.merge(t1, t2, how='outer', on=['year', 'month'], suffixes=['', '_t2'])
c.loc[c['value_t2'].notnull(), 'value'] = c.loc[c['value_t2'].notnull(), 'value_t2']
c = c.drop('value_t2', axis=1)
print(c)

# pandas using update
t1 = pd.DataFrame({'year': [1, 2, 3], 'month': [4, 5, 6], 'value': [2, 2, 2]})
t2 = pd.DataFrame({'year': [1, 5, 3], 'month': [4, 9, 9], 'value': [1, 5, 10]})
c = pd.merge(t1, t2, how='outer', on=['year', 'month'], suffixes=['', '_t2'])
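# update the frame itself; c['value'].update(...) acts on a temporary under copy-on-write
c.update(c[['value_t2']].rename(columns={'value_t2': 'value'}))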
c = c.drop('value_t2', axis=1)
print(c)

# the c.to_sql(...)

##### sqlalchemy

Name = 'try.db'
Type = 'sqlite'
Url = sa.engine.URL.create(Type, database=Name)  # URL.create() requires SQLAlchemy 1.4+
Engine = sa.engine.create_engine(Url)
Base = sa_dec.declarative_base()
Session = sa_orm.sessionmaker(bind=Engine)

class Info(Base):
    __tablename__ = 'Inventory'
    id = sa.Column(sa.Integer, primary_key=True)
    __table_args__ = (sa.UniqueConstraint('Year', 'Month'),)
    Year = sa.Column(sa.String(250))
    Month = sa.Column(sa.String(250))
    Value = sa.Column(sa.Float)

Base.metadata.create_all(Engine)

# change values of year and month to test
t = pd.DataFrame({'Year': [1, 2, 5], 'Month': ['Jun', 'July', 'Dec'], 'Value': [3, 3, 3]})


# this isn't very efficient but it is here to give you a comprehensive example
# where you have good control on what is happening
for i, r in t.iterrows():
    newdata = Info()
    for col, val in r.items():
        setattr(newdata, col, val)
    session = Session()  # open sqlalchemy-sqlite session
    session.add(newdata)  # add Info instance to session to insert
    try:
        session.flush()  # test insert, to see if there is any error
    except sa.exc.IntegrityError:  # here catch unique constraint error if already in db
        print('already in')
        session.rollback()  # rollback to remove the blocked instance
        # update the existing row instead, matching on both Year and Month
        stmt = update(Info).where(and_(Info.Year == r['Year'], Info.Month == r['Month'])).values(Value=r['Value'])
        session.execute(stmt)
        session.commit()  # commit the update
    else:
        session.commit()  # commit changes to db
    finally:
        session.close()  # close session to keep clean, a new one is opened for the next row

I tested both solutions and they seem to work, but further testing is needed.
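For the question's case, where whole months are replaced rather than single values, option 1 could look roughly like the sketch below. It is a minimal illustration, not a tested SQL Server solution: an in-memory SQLite connection stands in for the real database, the sample rows are made up, and pd.read_sql_query is used instead of read_sql_table because the latter needs a SQLAlchemy connectable.

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(":memory:")  # stand-in for the real database

# Existing table content (hypothetical sample data).
pd.DataFrame(
    {
        "Year": [2023, 2024],
        "Month": [12, 1],
        "Material": ["A100", "A100"],
        "Stock_quantity": [4, 5],
    }
).to_sql("inventory", conn, index=False)

# New count, as it would come from pd.read_csv(...).
new = pd.DataFrame(
    {
        "Year": [2024, 2024],
        "Month": [1, 2],
        "Material": ["A100", "A100"],
        "Stock_quantity": [7, 9],
    }
)

old = pd.read_sql_query("SELECT * FROM inventory", conn)

# Keep only the old rows whose (Year, Month) does not occur in the new data.
periods = new[["Year", "Month"]].drop_duplicates()
marked = old.merge(periods, on=["Year", "Month"], how="left", indicator=True)
kept = marked.loc[marked["_merge"] == "left_only"].drop(columns="_merge")

# Old rows for untouched periods plus all new rows, written back in one go.
result = pd.concat([kept, new], ignore_index=True)
result.to_sql("inventory", conn, if_exists="replace", index=False)
```

Note that if_exists="replace" drops and recreates the table, so column types, keys and constraints defined in the database are not preserved. That is the main reason to prefer the in-database approach for a table with a fixed schema.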