使用 py2neo 上传数据的最佳方式

Iva*_*nko 7 neo4j py2neo

我尝试过使用 py2neo 上传中等大小数据集的方法。就我而言,每天需要加载大约 80 K 个节点和 400 K 个边。我想分享我的经验,并向社区询问是否还有我没有遇到的更好的方法。

A. py2neo 的“本机”命令。

使用 创建节点graph.merge_one()并使用 设置属性push()。我很快就忽略了这一点,因为它非常慢,甚至在几分钟内都不会超过 10 K 记录。毫不奇怪,py2neo 的文档和这里的一些帖子推荐使用 Cypher。

B. 不分区的密码

py2neo.cypher.CypherTransaction append()在循环中和commit()最后使用。

# query sent to MSSQL. Returns ~ 80K records
result = engine.execute(query) 
statement = "MERGE (e:Entity {myid: {ID}}) SET e.p = 1"
# begin new Cypher transaction
tx = neoGraph.cypher.begin()
for row in result:
    tx.append(statement, {"ID": row.id_field})
tx.commit()
Run Code Online (Sandbox Code Playgroud)

这会超时并使 Neo4j 服务器崩溃。我知道问题是所有 80 K Cypher 语句都试图一次性执行。

C. 具有分区和一次提交的 Cypher

我使用计数器和process()命令一次运行 1000 个语句。

# query sent to MSSQL. Returns ~ 80K records
result = engine.execute(query) 
statement = "MERGE (e:Entity {myid: {ID}}) SET e.p = 1"
counter = 0
tx = neoGraph.cypher.begin()
for row in result:
    counter += 1
    tx.append(statement, {"ID": row.id_field})
    if (counter == 1000):
        tx.process()    # process 1000 statements
        counter = 0
tx.commit()
Run Code Online (Sandbox Code Playgroud)

一开始运行速度很快,但在处理 1000 个事务时速度会变慢。最终,它因堆栈溢出而超时。这很令人惊讶,因为我预计process()每次都会重置堆栈。

D. 带有分区的 Cypher 以及每个分区的提交

这是唯一运行良好的版本。对每个包含 1000 个事务的分区进行操作commit(),然后使用 重新启动一个新事务begin()

# query sent to MSSQL. Returns ~ 80K records
result = engine.execute(query) 
statement = "MERGE (e:Entity {myid: {ID}}) SET e.p = 1"
counter = 0
tx = neoGraph.cypher.begin()
for row in result:
    counter += 1
    tx.append(statement, {"ID": row.id_field})
    if (counter == 1000):
        tx.commit()                   # commit 1000 statements
        tx = neoGraph.cypher.begin()  # reopen transaction
        counter = 0
tx.commit()
Run Code Online (Sandbox Code Playgroud)

这运行得又快又好。

任何意见?

Wil*_*yon 2

正如您通过反复试验发现的,单个事务在操作次数不超过 10K-50K 时性能最佳。您在D中描述的方法效果最好,因为您每 1000 个语句就提交一次事务。您也许可以安全地增加批量大小。

您可能想要尝试的另一种方法是将值数组作为参数传递,并使用 Cypher 的UNWIND命令来迭代它们。例如:

WITH {id_array} AS ids // something like [1,2,3,4,5,6]
UNWIND ids AS ident
MERGE (e:Entity {myid: ident})
SET e.p = 1
Run Code Online (Sandbox Code Playgroud)