I have some ETL jobs that save data to ClickHouse using clickhouse-driver.
The save function looks like this:
def insert_data(data: Iterable[Dict], table: str, client: Client = None):
    columns = get_table_cols(table)
    client = client or get_ch_client(0)
    query = f"insert into {table} ({', '.join(columns)}) values"
    data = map(lambda row: {key: row[key] for key in columns}, data)
    client.execute(query, data)
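To make the column-projection step in insert_data easier to reason about in isolation, here is a minimal sketch that does not touch ClickHouse. The helper name project_rows, the column list, and the sample rows are all hypothetical stand-ins (for get_table_cols(table) and the ETL output); only the map/dict-comprehension expression is taken from the function above. It shows that the projection is lazy and that the resulting iterator can be consumed only once.

```python
from typing import Dict, Iterable, Iterator, List


def project_rows(data: Iterable[Dict], columns: List[str]) -> Iterator[Dict]:
    """Keep only the keys listed in `columns` (hypothetical helper)."""
    # map() is lazy: no row is touched until the result is iterated.
    return map(lambda row: {key: row[key] for key in columns}, data)


# Hypothetical columns and rows, standing in for get_table_cols(table)
# and for the data produced by the ETL job.
columns = ["id", "name"]
rows = [
    {"id": 1, "name": "a", "extra": "dropped"},
    {"id": 2, "name": "b", "extra": "dropped"},
]

projected = project_rows(rows, columns)
first_pass = list(projected)   # consumes the iterator
second_pass = list(projected)  # the iterator is exhausted, so this is empty
```

Because the result is a single-pass iterator, anything that iterates over it before the insert (logging, counting, debugging) leaves nothing for the insert itself. Also note that row[key] raises KeyError if a row lacks one of the table's columns.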
The function that calls insert_data looks like this:
def save_data(data: DataFrame, client: Client):
    mapper = get_mapper(my_table_map)
    data = map(lambda x: {col_new: getattr(x, col_old)
                          for col_old, col_new in map_dataframe_to_ch.items()},
               data.collect())
    data = map(mapper, data)
    insert_data(data, 'my_table_name', client)
get_mapper returns a mapping function that looks like this:
def map_row(row: Dict[str, …