使用HappyBase连接池的PySpark dataframe.foreach()返回'TypeError:无法pickle thread.lock对象'

Ale*_*ord 10 python happybase apache-spark pyspark

我有一个PySpark作业,用于更新HBase中的一些对象(Spark v1.6.0; happybase v0.9).

如果我打开/关闭每行的HBase连接,它会有效:

def process_row(row):
    conn = happybase.Connection(host=[hbase_master])
    # update HBase record with data from row
    conn.close()

my_dataframe.foreach(process_row)
Run Code Online (Sandbox Code Playgroud)

几千次upserts后,我们开始看到这样的错误:

TTransportException: Could not connect to [hbase_master]:9090
Run Code Online (Sandbox Code Playgroud)

显然,为每个upsert打开/关闭连接效率很低.这个函数实际上只是一个适当解决方案的占位符.

然后我尝试创建一个process_row使用连接池的函数版本:

pool = happybase.ConnectionPool(size=20, host=[hbase_master])

def process_row(row):
    with pool.connection() as conn:
        # update HBase record with data from row
Run Code Online (Sandbox Code Playgroud)

由于某种原因,此函数的连接池版本返回错误(请参阅完整的错误消息):

TypeError: can't pickle thread.lock objects
Run Code Online (Sandbox Code Playgroud)

你能看出我做错了什么吗?

更新

我看到这篇文章并怀疑我遇到了同样的问题:Spark尝试序列化pool对象并将其分发给每个执行程序,但是这个连接池对象不能在多个执行程序之间共享.

听起来我需要将数据集拆分为分区,并且每个分区使用一个连接(请参阅使用foreachrdd的设计模式).我根据文档中的示例尝试了这个:

def persist_to_hbase(dataframe_partition):
    hbase_connection = happybase.Connection(host=[hbase_master])
    for row in dataframe_partition:
        # persist data
    hbase_connection.close()

my_dataframe.foreachPartition(lambda dataframe_partition: persist_to_hbase(dataframe_partition))
Run Code Online (Sandbox Code Playgroud)

不幸的是,它仍然返回"无法解决thread.lock对象"错误.

wou*_*lee 1

从根本上来说,happybase 连接只是 tcp 连接,因此它们不能在进程之间共享。连接池主要对多线程应用程序有用,而且对单线程应用程序也很有用,可以将池用作具有连接重用的全局“连接工厂”,这可以简化代码,因为不需要传递“连接”对象大约。它还使错误恢复变得更容易一些。

在任何情况下,池(只是一组连接)都不能在进程之间共享。因此尝试将其序列化是没有意义的。(池使用锁会导致序列化失败,但这只是一个症状。)

也许您可以使用一个助手来有条件地创建一个池(或连接)并将其存储为模块局部变量,而不是在导入时实例化它,例如

_pool = None

def get_pool():
    global _pool
    if _pool is None:
        _pool = happybase.ConnectionPool(size=1, host=[hbase_master])
    return pool

def process(...)
    with get_pool().connection() as connection:
        connection.table(...).put(...)
Run Code Online (Sandbox Code Playgroud)

这会在第一次使用时实例化池/连接,而不是在导入时实例化。