Writing a JSON column to Postgres using Pandas .to_sql

per*_*lmq 10 postgresql json etl sqlalchemy pandas

As part of an ETL process, I need to extract a JSON column from one Postgres database and load it into another. We use Pandas for this because it has so many ways of reading from and writing to different sources/destinations, and all of our transformations can be written in Python and Pandas. We're quite happy with this approach, to be honest... but we ran into a problem.

Usually, reading and writing the data is very easy. You just use pandas.read_sql_table to read from the source and pandas.to_sql to write to the destination. However, since one of the source tables has a column of type JSON (from Postgres), the to_sql function crashes with the following error message.

    df.to_sql(table_name, analytics_db)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/pandas/core/generic.py", line 1201, in to_sql
    chunksize=chunksize, dtype=dtype)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/pandas/io/sql.py", line 470, in to_sql
    chunksize=chunksize, dtype=dtype)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/pandas/io/sql.py", line 1147, in to_sql
    table.insert(chunksize)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/pandas/io/sql.py", line 663, in insert
    self._execute_insert(conn, keys, chunk_iter)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/pandas/io/sql.py", line 638, in _execute_insert
    conn.execute(self.insert_statement(), data)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 945, in execute
    return meth(self, multiparams, params)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/sql/elements.py", line 263, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1053, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1189, in _execute_context
    context)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1393, in _handle_dbapi_exception
    exc_info
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 1159, in _execute_context
    context)
  File "/home/ec2-user/python-virtual-environments/etl/local/lib64/python2.7/site-packages/sqlalchemy/engine/default.py", line 459, in do_executemany
    cursor.executemany(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.ProgrammingError) can't adapt type 'dict'

per*_*lmq 21

I have been searching the web for a solution but could not find one, so here is what we came up with (there may be better ways, but at least it's a start if someone else runs into this).

Specify the dtype parameter in to_sql.

We went from df.to_sql(table_name, analytics_db) to df.to_sql(table_name, analytics_db, dtype={'name_of_json_column_in_source_table': sqlalchemy.types.JSON}) and it just works.
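To make that fix reproducible without a database server, here is a minimal sketch using an in-memory SQLite engine as a stand-in for the analytics_db Postgres engine from the answer; with Postgres you would pass your own create_engine(...) URL instead, and the table/column names below are made up for illustration:

```python
import pandas as pd
import sqlalchemy

# in-memory SQLite as a stand-in for the Postgres engine
engine = sqlalchemy.create_engine("sqlite://")

# a frame with a dict-valued column, like the source table's JSON column
df = pd.DataFrame({"id": [1, 2],
                   "payload": [{"a": 1}, {"b": 2, "c": "text"}]})

# without dtype=... this is where "can't adapt type 'dict'" would occur;
# sqlalchemy.types.JSON tells SQLAlchemy how to serialize the dicts
df.to_sql("target_table", engine, index=False,
          dtype={"payload": sqlalchemy.types.JSON})

# reading the table back deserializes the column into dicts again
out = pd.read_sql_table("target_table", engine)
```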


rum*_*bin 7

If you (re-)create the JSON column using json.dumps(), you're all set. This way the data can be written using pandas' .to_sql() method, but also with PostgreSQL's much faster COPY method (via psycopg2's copy_expert() or sqlalchemy's raw_connection()).

For the sake of simplicity, let's assume that we have a column of dictionaries that should be written into a JSON(B) column:

import json
import pandas as pd

df = pd.DataFrame([['row1',{'a':1, 'b':2}],
                   ['row2',{'a':3,'b':4,'c':'some text'}]],
                  columns=['r','kv'])

# conversion function:
def dict2json(dictionary):
    return json.dumps(dictionary, ensure_ascii=False)

# overwrite the dict column with json-strings
df['kv'] = df.kv.map(dict2json)
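The COPY route mentioned above can be sketched as follows. The copy_to_postgres() helper, its table name, and the connection are hypothetical placeholders: the function needs a live Postgres server with a matching table, so only the CSV buffer preparation actually runs here.

```python
import io
import json
import pandas as pd

df = pd.DataFrame([['row1', {'a': 1, 'b': 2}],
                   ['row2', {'a': 3, 'b': 4, 'c': 'some text'}]],
                  columns=['r', 'kv'])

# same conversion as above: dicts -> JSON strings
df['kv'] = df['kv'].map(lambda d: json.dumps(d, ensure_ascii=False))

# dump the frame into an in-memory CSV buffer that COPY can consume
buf = io.StringIO()
df.to_csv(buf, index=False, header=False)
buf.seek(0)

def copy_to_postgres(engine, table):
    """Stream the buffer into Postgres via psycopg2's copy_expert()."""
    conn = engine.raw_connection()  # unwrap the raw psycopg2 connection
    try:
        with conn.cursor() as cur:
            cur.copy_expert(
                "COPY {} (r, kv) FROM STDIN WITH (FORMAT CSV)".format(table),
                buf)
        conn.commit()
    finally:
        conn.close()

# usage (requires a live server and an existing table):
# copy_to_postgres(sqlalchemy.create_engine("postgresql://..."), "my_table")
```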


小智 6

I cannot comment on peralmq's answer, but in case of a postgresql JSONB column you can use

from sqlalchemy import dialects
dataframe.to_sql(..., dtype={"json_column": dialects.postgresql.JSONB})