从Pandas向SQLite表添加新列的工作流程

cd9*_*d98 6 python sql database sqlite pandas

建立

两张桌子:schoolsstudents.SQLite中的索引(或多个密钥)将是idtime用于students表和schooltimeschools表.我的数据集是关于不同的东西,但我认为学生的例子更容易理解.

import pandas as pd
import numpy as np
import sqlite3

df_students = pd.DataFrame(
{'id': list(range(0,4)) + list(range(0,4)),
'time': [0]*4 + [1]*4, 'school': ['A']*2 + ['B']*2 + ['A']*2 + ['B']*2,
'satisfaction': np.random.rand(8)} )
df_students.set_index(['id', 'time'], inplace=True)

        satisfaction    school
id  time        
0   0   0.863023    A
1   0   0.929337    A
2   0   0.705265    B
3   0   0.160457    B
0   1   0.208302    A
1   1   0.029397    A
2   1   0.266651    B
3   1   0.646079    B

df_schools = pd.DataFrame({'school': ['A']*2 + ['B']*2, 'time': [0]*2 + [1]*2, 'mean_scores': np.random.rand(4)})
df_schools.set_index(['school', 'time'], inplace=True)
df_schools


               mean_scores
school  time    
A       0     0.358154
A       0     0.142589
B       1     0.260951
B       1     0.683727

## Send to SQLite3

conn = sqlite3.connect('schools_students.sqlite')

df_students.to_sql('students', conn)
df_schools.to_sql('schools', conn)
Run Code Online (Sandbox Code Playgroud)

我需要做什么?

我有一堆函数在pandas数据帧上运行并创建新列,然后应该插入schoolsstudents表中(取决于我正在构建的内容).典型的功能按顺序执行:

  1. 查询两个SQL表中的列
  2. 使用pandas功能,例如groupby,apply自定义功能,rolling_mean等等(其中许多是不可用的SQL,或难写)构造一个新列.返回类型是pd.Seriesnp.array
  3. 将新列添加到适当的数据框(schoolsstudents)

这些函数是在我有一个适合内存的小型数据库时编写的,因此它们是纯粹的pandas.

这是伪代码中的一个例子:

def example_f(satisfaction, mean_scores)
    """Silly function that divides mean satisfaction per school by mean score"""
    #here goes the pandas functions I already wrote
    mean_satisfaction = mean(satisfaction) 
    return mean_satisfaction/mean_scores

satisf_div_score = example_f(satisfaction, mean_scores)
# Here push satisf_div_score to `schools` table
Run Code Online (Sandbox Code Playgroud)

因为我的数据集非常大,所以我无法在内存中调用这些函数.想象一下,学校位于不同的地区.最初我只有一个区,所以我知道这些功能可以分别处理来自每个区的数据.

我认为可行的工作流程是:

  • 查询区的相关数据 i
  • 将函数应用于区域的数据i并生成新列,如np.array或pd.Series
  • 将此列插入相应的表(将填充i该列的区域的数据
  • 对于从i1到1的区域重复K

虽然我的数据集在SQLite中(我更喜欢它保持这种状态!)如果好处很大,我愿意将它迁移到其他东西.


我意识到有不同的合理答案,但听到一些事实证明对你有用和简单的事情会很棒.谢谢!

Eug*_*sky 1

有多种方法,您可以选择更适合您的特定任务的方法:

  1. 将所有数据移至“更大”的数据库。就我个人而言,我更喜欢 PostgreSQL - 它可以很好地处理大数据集。幸运的是 pandas 支持 SQLAlchemy - 跨数据库 ORM,因此您可以对不同的数据库使用相同的查询。

  2. 将数据分割成块并单独计算任何块。我将使用 PostgreSQL 进行演示,但您可以使用任何数据库。

    from sqlalchemy import create_engine
    import psycopg2
    mydb = create_engine('postgresql://user@host.domain:5432/database')
    # lets select some groups of data into first dataframe, 
    # you may use school ids instead of my sections
    df=pd.read_sql_query('''SELECT sections, count(id) FROM table WHERE created_at <'2016-01-01' GROUP BY sections ORDER BY 2 DESC LIMIT 10''', con=mydb)
    print(df)  # don't worry about strange output - sections have type int[] and it's supported well!
    
       sections     count
    0  [121, 227]  104583
    1  [296, 227]   48905
    2  [121]        43599
    3  [302, 227]   29684 
    4  [298, 227]   26814
    5  [294, 227]   24071
    6  [297, 227]   23038
    7  [292, 227]   22019
    8  [282, 227]   20369
    9  [283, 227]   19908
    
    # Now we have some sections and we can select only data related to them
    for section in df['sections']:
       df2 = pd.read_sql_query('''SELECT sections, name, created_at, updated_at, status 
                                  FROM table 
                                  WHERE created_at <'2016-01-01'   
                                      AND sections=%(section)s 
                                  ORDER BY created_at''', 
                               con=mydb, params=dict(section=section))
        print(section, df2.std())
    
    [121, 227] status    0.478194
    dtype: float64
    [296, 227] status    0.544706
    dtype: float64
    [121] status    0.499901
    dtype: float64
    [302, 227] status    0.504573
    dtype: float64
    [298, 227] status    0.518472
    dtype: float64
    [294, 227] status    0.46254
    dtype: float64
    [297, 227] status    0.525619
    dtype: float64
    [292, 227] status    0.627244
    dtype: float64
    [282, 227] status    0.362891
    dtype: float64
    [283, 227] status    0.406112
    dtype: float64
    
    Run Code Online (Sandbox Code Playgroud)

    当然,这个例子是综合的 - 计算文章的平均状态是相当荒谬的:)但它演示了如何分割大量数据并按部分处理它。

  3. 使用特定的 PostgreSQL(或 Oracle 或 MS 或任何您喜欢的)进行统计。这里有关于PostgreSQL 中的窗口函数的优秀文档。幸运的是,您可以在数据库中执行一些计算并将预制数据移动到 DataFrame,如上所述。

更新:如何将信息加载回数据库。

幸运的是,DataFrame 支持to_sql方法使这个过程变得简单:

from sqlalchemy import create_engine
mydb = create_engine('postgresql://user@host.domain:5432/database')
df2.to_sql('tablename', mydb, if_exists='append', chunksize=100)
Run Code Online (Sandbox Code Playgroud)

您可以指定所需的操作:if_exists='append'将行添加到表中,如果您有很多行,您可以将它们拆分为块,以便数据库可以插入它们。