如何在sqlalchemy中编写自己的方言来适应HTTP API?

She*_*Lei 6 python sqlalchemy dialect apache-superset

我正在尝试向 Superset(数据探索平台)添加一个特殊的数据源。该数据库仅支持HTTP API,返回json格式的数据;例如:

> http://localhost/api/sql/query?q="select * from table"
< [{"id": 1, "value":10}, {"id": 2, "value": 30} ...]
Run Code Online (Sandbox Code Playgroud)

因此,我必须在 python SQLAlchemy 中为 Superset 编写自己的适配器。我已经阅读了文档和部分源代码,但仍然需要好的示例来遵循。

sna*_*erb 5

(OP将解决方案编辑到问题中)

我已经解决了这个问题。这就是我所做的。

  1. ./site-packages/sqlalchemy/dialects

  2. 将任何具体方言复制到新方言(例如:名为 zeta)作为起点。更好的方法是使用

    from sqlalchemy.engine.default import DefaultDialect
    class ZetaDialect(DefaultDialect):
        ...
    
    Run Code Online (Sandbox Code Playgroud)
  3. 将 zeta 添加到__all__以下部分./site-packages/sqlalchemy/dialects/__init__.py

  4. 创建一个测试程序:

    from sqlalchemy import create_engine
    engine = create_engine('zeta://XXX')
    result = engine.execute("select * from table_name")
    for row in result:
        print(row)
Run Code Online (Sandbox Code Playgroud)
  1. 运行它并得到错误。使用pdb查找原因。大多数情况下,原因是未实现某些接口。一一解决。

  2. 当测试程序给出正确答案时,几乎已经完成了 90%。为了完整起见,我们还应该实现检查器使用的几个接口:

    class ZetaDialect(DefaultDialect):
        # default_paramstyle = 'qmark'
        name = 'zeta'

        def __init__(self, **kwargs):
            DefaultDialect.__init__(self, **kwargs)

        @classmethod
        def dbapi(cls):
            return zeta_dbapi

        @reflection.cache
        def get_table_names(self, connection, schema=None, **kw):
            return [u'table_1', u'table_2', ...]

        @reflection.cache
        def get_pk_constraint(self, connection, table_name, schema=None, **kw):
            return []

        @reflection.cache
        def get_foreign_keys(self, connection, table_name, schema=None, **kw):
            return []

        @reflection.cache
        def get_unique_constraints(self, connection, table_name,
                                   schema=None, **kw):
            return []

        @reflection.cache
        def get_indexes(self, connection, table_name, schema=None, **kw):
            return []

        @reflection.cache
        def get_schema_names(self, connection, **kw):
            return []

        @reflection.cache
        def get_columns(self, connection, table_name, schema=None, **kw):
            # just an example of the column structure
            result = connection.execute('select * from %s limit 1' % table_name)
            return [{'default': None, 'autoincrement': False, 'type': TEXT, 'name': colname, 'nullable': False} for colname, coltype in result.cursor.description]
Run Code Online (Sandbox Code Playgroud)

  • 而不是 3. 而是使用 ```from sqlalchemy.dialects importregistryregistry.register("zeta", "myapp.dialect", "ZetaDialect")``` (2认同)