Python clickhouse-driver: ValueError: Parameters are expected in dict form

go2*_*ana 4 python clickhouse

I have some ETL that saves data to ClickHouse using clickhouse-driver.

The save function looks like this:

def insert_data(data: Iterable[Dict], table: str, client: Client = None):
    columns = get_table_cols(table)
    client = client or get_ch_client(0)
    query = f"insert into {table} ({', '.join(columns)}) values"
    data = map(lambda row: {key: row[key] for key in columns}, data)
    client.execute(query, data)

The function that calls insert_data looks like this:

def save_data(data: DataFrame, client: Client):

    mapper = get_mapper(my_table_map)
    data = map(lambda x: {col_new: getattr(x, col_old)
                          for col_old, col_new in map_dataframe_to_ch.items()},
               data.collect())
    data = map(mapper, data)
    insert_data(data, 'my_table_name', client)

get_mapper returns a mapping function that looks like this:

def map_row(row: Dict[str, Any]) -> Dict[str, Any]:
    nonlocal map_
    return {key: map_[key](val) for key, val in row.items()}
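
So basically, in the end I have some nested generators that produce dictionaries. To confirm this, if I put print(next(data)) right before client.execute, I get exactly the dict I expect. Here is an example with the sensitive information hidden: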

{'account_currency': '***', 
 'instrument': '***',
 'operation': 'open',
 'event_time': datetime.datetime(2020, 7, 7, 19, 11, 49),
 'country': 'CN',
 'region': 'Asia and Pacific',
 'registration_source': '***',
 'account_type': '***',
 'platform': '***',
 'server_key': '***'}

The table schema is as follows:

"account_currency": "String",
"instrument": "String",
"operation": "String",
"event_time": "DateTime",
"country": "String",
"region": "String",
"registration_source": "String",
"account_type": "String",
"platform": "String",
"server_key": "String"

But for whatever reason I get this error:

  File "src/etl/usd_volume/prepare_users.py", line 356, in <module>
    main()
  File "src/etl/usd_volume/prepare_users.py", line 348, in main
    save_data(data, client)
  File "src/etl/usd_volume/prepare_users.py", line 302, in save_data
    insert_data(data, 'report_traded_volume_usd', client)
  File "/root/data/src/common/clickhouse_helper.py", line 14, in insert_data
    client.execute(query, data)
  File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py", line 224, in execute
    columnar=columnar
  File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py", line 341, in process_ordinary_query
    query = self.substitute_params(query, params)
  File "/usr/local/lib/python3.6/dist-packages/clickhouse_driver/client.py", line 422, in substitute_params
    raise ValueError('Parameters are expected in dict form')
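
According to the documentation:

:param params: substitution parameters for SELECT queries and data for INSERT queries. Data for INSERT can be `list`, `tuple` or :data:`~types.GeneratorType`. Defaults to ``None`` (no parameters or data).

Clearly my data meets these requirements.

But in the source code there is only this check: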

def substitute_params(self, query, params):
    if not isinstance(params, dict):
        raise ValueError('Parameters are expected in dict form')

    escaped = escape_params(params)
    return query % escaped

I really can't find where they check whether it is a generator. The clickhouse-driver version is 0.1.4.

Any help with this problem is much appreciated.

go2*_*ana 6

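Well, further study of the source code revealed the root cause.

The function that raises the error, substitute_params, is called in the process_ordinary_query method of the Client class. Basically, this method is called for any query other than an INSERT.

Whether a query is treated as an INSERT or as anything else is decided by this part of the execute method: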

is_insert = isinstance(params, (list, tuple, types.GeneratorType))

if is_insert:
    rv = self.process_insert_query(
        query, params, external_tables=external_tables,
        query_id=query_id, types_check=types_check,
        columnar=columnar
    )
else:
    rv = self.process_ordinary_query(
        query, params=params, with_column_types=with_column_types,
        external_tables=external_tables,
        query_id=query_id, types_check=types_check,
        columnar=columnar
    )

The crux is isinstance(params, (list, tuple, types.GeneratorType)).

types.GeneratorType is defined as follows:

def _g():
    yield 1
GeneratorType = type(_g())

Which gives:

>>> GeneratorType
<class 'generator'>

My data, however, is a map object, so the check fails:

>>> type(map(...))
<class 'map'>
>>> isinstance(map(...), GeneratorType)
False
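The same check can be reproduced without clickhouse-driver installed. The sketch below copies only the isinstance test from the execute snippet quoted above; the helper name looks_like_insert_data and the sample rows are made up for illustration and are not part of the library. It suggests that other lazy iterables, such as filter objects and plain iterators, would be rejected in the same way as map.

```python
import types


def looks_like_insert_data(params):
    # Hypothetical helper: mirrors the isinstance test quoted from
    # Client.execute in clickhouse-driver 0.1.4, nothing more.
    return isinstance(params, (list, tuple, types.GeneratorType))


def gen_rows():
    # A function containing `yield` returns a real generator object.
    yield {"country": "CN"}


rows = [{"country": "CN"}]  # made-up sample data

results = {
    "list": looks_like_insert_data(rows),
    "tuple": looks_like_insert_data(tuple(rows)),
    "generator function": looks_like_insert_data(gen_rows()),
    "generator expression": looks_like_insert_data(r for r in rows),
    "map": looks_like_insert_data(map(dict, rows)),
    "filter": looks_like_insert_data(filter(None, rows)),
    "iter": looks_like_insert_data(iter(rows)),
}

for name, accepted in results.items():
    print(f"{name}: {accepted}")
```

Only the first four entries pass the test; map, filter and iter objects are iterators but not instances of types.GeneratorType.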

So the simplest way to avoid this problem is to convert data to a generator with a generator expression. This solves the problem completely.

>>> data = (i for i in data)
>>> isinstance(data, GeneratorType)
True
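If the conversion is needed in more than one place, it can be wrapped in a small helper. This is only a sketch: as_generator, the column list and the sample row are hypothetical names and data, and the map call merely imitates the one in insert_data from the question.

```python
import types
from typing import Any, Dict, Iterable, Iterator


def as_generator(rows: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    # Hypothetical helper: because this function uses `yield from`,
    # calling it returns a real generator object while staying lazy.
    yield from rows


columns = ["country", "region"]  # made-up column list
source = [{"country": "CN", "region": "Asia and Pacific", "extra": 1}]

# Same shape as the map call in insert_data: keep only the table columns.
lazy = map(lambda row: {key: row[key] for key in columns}, source)
data = as_generator(lazy)

print(isinstance(lazy, types.GeneratorType))  # the map object itself
print(isinstance(data, types.GeneratorType))  # the wrapped version
```

Calling list(data) would also satisfy the isinstance test, since list is one of the accepted types, but it loads every row into memory at once, whereas the generator keeps the pipeline lazy.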

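Alternatively, if you intend to run INSERT queries exclusively, you can call process_insert_query directly, which removes the need to convert the data to a generator.

I think this is a somewhat ambiguous type check on clickhouse-driver's part, but that's what we have.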