Mor*_*_DK 8 · Tags: python, sql, sql-server, sqlalchemy, pandas
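I have an inventory table in SQL Server called dbo.inventory that contains Year, Month, Material and Stock_quantity. Every day I receive a new inventory count as a CSV file and need to load it into the dbo.inventory table. However, if the Year and Month from the CSV file already exist in the database, I need to delete those records first, to avoid loading several stock counts for the same month.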
In SQL, I would do it like this:
Delete t1
FROM dbo.inventory t1
JOIN csv t2 ON t1.Year = t2.Year and t1.Month = t2.Month
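I don't know how to do this in a Python script without uploading my CSV file to the data warehouse as a temporary table, so that I can simply delete the existing rows matching Year and Month and then load the new ones.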
I have used the following in another setup:
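from sqlalchemy import text

delete_date = sales.Date.max()
# engine.begin() commits the DELETE when the block exits; the date is passed
# as a bound parameter instead of being formatted into the SQL string
with engine.begin() as connection:
    connection.execute(text("DELETE FROM sales WHERE Date = :d"), {"d": delete_date})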
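But that doesn't work here, because the input that determines what should be deleted is a DataFrame, which in theory can contain several years and months if it is a correction of previously loaded figures.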
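For comparison, the staging-table route mentioned in the question can be scripted entirely from Python as well. The sketch below is a hedged illustration under several assumptions: it runs against an in-memory SQLite database rather than SQL Server, `inventory_staging` is a hypothetical scratch-table name, and the T-SQL `DELETE ... JOIN` is replaced by a portable `DELETE ... WHERE EXISTS`, which both SQLite and SQL Server accept.

```python
import pandas as pd
import sqlalchemy as sa


def load_via_staging(df: pd.DataFrame, engine: sa.engine.Engine) -> None:
    """Replace whole (Year, Month) periods in `inventory` with the rows in df.

    Assumes `inventory` has the columns Year, Month, Material, Stock_quantity;
    `inventory_staging` is a hypothetical scratch table.
    """
    with engine.begin() as conn:  # one transaction: commit on success, rollback on error
        # 1. Upload the CSV rows into a scratch table (recreated on every run).
        df.to_sql("inventory_staging", conn, if_exists="replace", index=False)
        # 2. Delete every existing row whose (Year, Month) occurs in the new data.
        conn.execute(sa.text(
            "DELETE FROM inventory WHERE EXISTS ("
            " SELECT 1 FROM inventory_staging s"
            " WHERE s.Year = inventory.Year AND s.Month = inventory.Month)"
        ))
        # 3. Copy the new rows across, then drop the scratch table.
        conn.execute(sa.text(
            "INSERT INTO inventory (Year, Month, Material, Stock_quantity)"
            " SELECT Year, Month, Material, Stock_quantity FROM inventory_staging"
        ))
        conn.execute(sa.text("DROP TABLE inventory_staging"))


# Demo on in-memory SQLite (an assumption; the question targets SQL Server).
engine = sa.create_engine("sqlite://")
old = pd.DataFrame({"Year": [2020, 2020], "Month": [1, 2],
                    "Material": ["A", "A"], "Stock_quantity": [10, 20]})
old.to_sql("inventory", engine, index=False)
new = pd.DataFrame({"Year": [2020], "Month": [2],
                    "Material": ["A"], "Stock_quantity": [25]})
load_via_staging(new, engine)
result = pd.read_sql("SELECT * FROM inventory ORDER BY Year, Month", engine)
print(result)
```

Because all three statements share one transaction, a failure during the insert leaves the previously loaded months untouched.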
Pandas doesn't support deleting SQL rows based on specific conditions. You have to tell SQL Server which rows you want to delete:
import sqlalchemy as sa

engine = sa.create_engine('mssql+pyodbc://...')
meta = sa.MetaData()

# Map the dbo.inventory table in your database to a SQLAlchemy Table object
inventory = sa.Table('inventory', meta, schema='dbo', autoload_with=engine)

# Build the WHERE clause of your DELETE statement from the distinct
# (Year, Month) pairs in the dataframe (df holds today's CSV, e.g. read with pd.read_csv).
# Equivalent T-SQL:
# WHERE (Year = ... AND Month = ...) OR (Year = ... AND Month = ...) OR (Year = ... AND Month = ...)
pairs = df[['Year', 'Month']].drop_duplicates()
cond = sa.or_(*[
    sa.and_(inventory.c['Year'] == year, inventory.c['Month'] == month)
    for year, month in pairs.itertuples(index=False, name=None)
])

# Define the DELETE and run it in the same transaction as the insert;
# engine.begin() commits on success and rolls back if either step fails
delete = inventory.delete().where(cond)
with engine.begin() as conn:
    conn.execute(delete)
    # Now you can insert the new data
    df.to_sql('inventory', conn, schema='dbo', if_exists='append', index=False)
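If the CSV covers many periods, a single OR clause grows with it, and SQL Server rejects statements with more than 2,100 parameters. One alternative, sketched here as an assumption rather than part of the answer above, is to send one parameterised DELETE per distinct (Year, Month) pair: passing a list of parameter dicts to `Connection.execute()` makes SQLAlchemy use the driver's executemany. The function name `replace_periods` is hypothetical, and the demo uses in-memory SQLite instead of SQL Server.

```python
import pandas as pd
import sqlalchemy as sa

DELETE_PERIOD = sa.text("DELETE FROM inventory WHERE Year = :y AND Month = :m")


def replace_periods(df: pd.DataFrame, engine: sa.engine.Engine) -> None:
    """Delete existing rows for every (Year, Month) in df, then append df."""
    # itertuples yields native Python scalars, which every DB-API driver can bind
    periods = [
        {"y": y, "m": m}
        for y, m in df[["Year", "Month"]].drop_duplicates().itertuples(index=False, name=None)
    ]
    with engine.begin() as conn:  # delete and insert commit (or roll back) together
        if periods:
            conn.execute(DELETE_PERIOD, periods)  # list of dicts -> executemany
        df.to_sql("inventory", conn, if_exists="append", index=False)


# Demo on in-memory SQLite (an assumption; the question targets SQL Server).
engine = sa.create_engine("sqlite://")
pd.DataFrame({"Year": [2021, 2021, 2021], "Month": [5, 5, 6],
              "Material": ["A", "B", "A"], "Stock_quantity": [1, 2, 3]}
             ).to_sql("inventory", engine, index=False)
new = pd.DataFrame({"Year": [2021], "Month": [5], "Material": ["A"], "Stock_quantity": [9]})
replace_periods(new, engine)
rows = pd.read_sql(
    "SELECT Year, Month, Material, Stock_quantity FROM inventory ORDER BY Month, Material",
    engine,
)
print(rows)
```

Each DELETE uses only two parameters, so the number of periods in the file no longer affects the statement size.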
I think you have two good options.
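1) Work in pandas. Query the existing table with pandas.read_sql_table(), import the CSV file as a second DataFrame, and merge the old table with the new one to update it. Then write the updated DataFrame back, e.g. with df.to_sql(..., if_exists='replace'), which drops and recreates the table.
2) Use sqlalchemy and work in the database, especially if you want to preserve the schema or other constraints.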
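Option 1 can be sketched end to end as follows. This is a hedged illustration, not the answer's own code: it keeps only the existing rows whose (Year, Month) is absent from the new file, appends the new rows, and writes everything back with `if_exists="replace"`. Table and column names follow the question; the function name `rebuild_inventory` and the in-memory SQLite engine are assumptions for the demo.

```python
import pandas as pd
import sqlalchemy as sa

KEYS = ["Year", "Month"]


def rebuild_inventory(new: pd.DataFrame, engine: sa.engine.Engine) -> pd.DataFrame:
    """Drop old rows for the periods in `new`, add `new`, and rewrite the table."""
    old = pd.read_sql_table("inventory", engine)
    # indicator=True adds a '_merge' column; 'left_only' means the period is not in `new`
    flagged = old.merge(new[KEYS].drop_duplicates(), on=KEYS, how="left", indicator=True)
    kept = old[(flagged["_merge"] == "left_only").to_numpy()]
    combined = pd.concat([kept, new], ignore_index=True)
    # "replace" drops and recreates the table, so indexes/constraints are not kept
    combined.to_sql("inventory", engine, if_exists="replace", index=False)
    return combined


# Demo on in-memory SQLite (an assumption; the question targets SQL Server).
engine = sa.create_engine("sqlite://")
pd.DataFrame({"Year": [2022, 2022], "Month": [1, 2], "Material": ["A", "A"],
              "Stock_quantity": [5, 6]}).to_sql("inventory", engine, index=False)
new = pd.DataFrame({"Year": [2022, 2022], "Month": [2, 2], "Material": ["A", "B"],
                    "Stock_quantity": [7, 8]})
rebuild_inventory(new, engine)
back = pd.read_sql("SELECT * FROM inventory ORDER BY Month, Material", engine)
print(back)
```

Since the whole table is read and rewritten, this suits small tables; the loss of database-side constraints on "replace" is the trade-off that makes option 2 attractive.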
Below are short, generic examples based on both solutions. More specific solutions are probably possible, but these are two starting points.
import sqlalchemy as sa
import sqlalchemy.orm as sa_orm
import pandas as pd
from sqlalchemy import update
from sqlalchemy import and_

# general pandas solution
t1 = pd.DataFrame({'year': [1, 2, 3], 'month': [4, 5, 6], 'value': [2, 2, 2]})
t2 = pd.DataFrame({'year': [1, 5, 3], 'month': [4, 9, 9], 'value': [1, 5, 10]})
c = pd.merge(t1, t2, how='outer', on=['year', 'month'], suffixes=('', '_t2'))
# wherever t2 has a value for a (year, month), it overrides the one from t1
c.loc[c['value_t2'].notnull(), 'value'] = c.loc[c['value_t2'].notnull(), 'value_t2']
c = c.drop('value_t2', axis=1)
print(c)

# pandas using update
t1 = pd.DataFrame({'year': [1, 2, 3], 'month': [4, 5, 6], 'value': [2, 2, 2]})
t2 = pd.DataFrame({'year': [1, 5, 3], 'month': [4, 9, 9], 'value': [1, 5, 10]})
c = pd.merge(t1, t2, how='outer', on=['year', 'month'], suffixes=('', '_t2'))
# DataFrame.update works in place on c; the chained c['value'].update(...)
# does not modify c under pandas copy-on-write
c.update(c[['value_t2']].rename(columns={'value_t2': 'value'}))
c = c.drop('value_t2', axis=1)
print(c)
# then c.to_sql(...)

##### sqlalchemy
Name = 'try.db'
Type = 'sqlite'
Url = sa.engine.url.URL.create(Type, database=Name)
Engine = sa.create_engine(Url)
Base = sa_orm.declarative_base()
Session = sa_orm.sessionmaker(bind=Engine)

class Info(Base):
    __tablename__ = 'Inventory'
    __table_args__ = (sa.UniqueConstraint('Year', 'Month'),)
    id = sa.Column(sa.Integer, primary_key=True)
    Year = sa.Column(sa.String(250))
    Month = sa.Column(sa.String(250))
    Value = sa.Column(sa.Float)

Base.metadata.create_all(Engine)

# change values of year and month to test
t = pd.DataFrame({'Year': [1, 2, 5], 'Month': ['Jun', 'July', 'Dec'], 'Value': [3, 3, 3]})

# this isn't very efficient, but it is here to give you a comprehensive example
# where you have good control over what is happening
for i, r in t.iterrows():
    newdata = Info()
    for col, val in r.items():
        setattr(newdata, col, val)
    session = Session()  # open sqlalchemy-sqlite session
    session.add(newdata)  # add Info instance to session to insert
    try:
        session.flush()  # test insert, to see if there is any error
    except sa.exc.IntegrityError:  # unique constraint violated: (Year, Month) already in db
        print('already in')
        session.rollback()  # rollback to remove the blocked instance
        stmt = update(Info).where(and_(Info.Year == r['Year'], Info.Month == r['Month'])).values(Value=r['Value'])
        session.execute(stmt)
        session.commit()  # commit the update
    else:
        session.commit()  # commit the insert
    finally:
        session.close()  # close the session; a new one is opened for the next row
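The try/except loop above makes one round trip per row. On SQLite 3.24 or later, the same "insert, or update the value on a (Year, Month) clash" can be expressed as a single upsert through SQLAlchemy's SQLite dialect, `sqlalchemy.dialects.sqlite.insert(...).on_conflict_do_update()`. This is a SQLite-specific sketch under that assumption: SQL Server has no ON CONFLICT clause and would need a MERGE statement instead. It reuses a cut-down copy of the `Info` model, and the helper name `upsert` is hypothetical.

```python
import pandas as pd
import sqlalchemy as sa
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Info(Base):
    __tablename__ = "Inventory"
    __table_args__ = (sa.UniqueConstraint("Year", "Month"),)
    id = sa.Column(sa.Integer, primary_key=True)
    Year = sa.Column(sa.String(250))
    Month = sa.Column(sa.String(250))
    Value = sa.Column(sa.Float)


engine = sa.create_engine("sqlite://")  # in-memory demo database
Base.metadata.create_all(engine)


def upsert(df: pd.DataFrame) -> None:
    """Insert all rows of df; on a (Year, Month) conflict overwrite Value instead."""
    stmt = sqlite_insert(Info.__table__).values(df.to_dict(orient="records"))
    stmt = stmt.on_conflict_do_update(
        index_elements=["Year", "Month"],     # columns of the unique constraint
        set_={"Value": stmt.excluded.Value},  # take Value from the incoming row
    )
    with engine.begin() as conn:
        conn.execute(stmt)


upsert(pd.DataFrame({"Year": ["1", "2"], "Month": ["Jun", "July"], "Value": [3.0, 3.0]}))
upsert(pd.DataFrame({"Year": ["2", "5"], "Month": ["July", "Dec"], "Value": [4.0, 4.0]}))

with engine.connect() as conn:
    rows = conn.execute(sa.select(Info.Year, Info.Month, Info.Value).order_by(Info.Year)).all()
print(rows)
```

The second call updates the existing ("2", "July") row and inserts ("5", "Dec") in one statement, so no per-row exception handling is needed.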
I tested both solutions and they seem to work, but they need further testing.