Ale*_*ord 10 python happybase apache-spark pyspark
我有一个PySpark作业,用于更新HBase中的一些对象(Spark v1.6.0; happybase v0.9).
如果我打开/关闭每行的HBase连接,它会有效:
def process_row(row):
conn = happybase.Connection(host=[hbase_master])
# update HBase record with data from row
conn.close()
my_dataframe.foreach(process_row)
Run Code Online (Sandbox Code Playgroud)
几千次upserts后,我们开始看到这样的错误:
Run Code Online (Sandbox Code Playgroud)TTransportException: Could not connect to [hbase_master]:9090
显然,为每个upsert打开/关闭连接效率很低.这个函数实际上只是一个适当解决方案的占位符.
然后我尝试创建一个process_row
使用连接池的函数版本:
pool = happybase.ConnectionPool(size=20, host=[hbase_master])
def process_row(row):
with pool.connection() as conn:
# update HBase record with data from row
Run Code Online (Sandbox Code Playgroud)
由于某种原因,此函数的连接池版本返回错误(请参阅完整的错误消息):
Run Code Online (Sandbox Code Playgroud)TypeError: can't pickle thread.lock objects
你能看出我做错了什么吗?
我看到这篇文章并怀疑我遇到了同样的问题:Spark尝试序列化pool
对象并将其分发给每个执行程序,但是这个连接池对象不能在多个执行程序之间共享.
听起来我需要将数据集拆分为分区,并且每个分区使用一个连接(请参阅使用foreachrdd的设计模式).我根据文档中的示例尝试了这个:
def persist_to_hbase(dataframe_partition):
hbase_connection = happybase.Connection(host=[hbase_master])
for row in dataframe_partition:
# persist data
hbase_connection.close()
my_dataframe.foreachPartition(lambda dataframe_partition: persist_to_hbase(dataframe_partition))
Run Code Online (Sandbox Code Playgroud)
不幸的是,它仍然返回"无法解决thread.lock对象"错误.
从根本上来说,happybase 连接只是 tcp 连接,因此它们不能在进程之间共享。连接池主要对多线程应用程序有用,而且对单线程应用程序也很有用,可以将池用作具有连接重用的全局“连接工厂”,这可以简化代码,因为不需要传递“连接”对象大约。它还使错误恢复变得更容易一些。
在任何情况下,池(只是一组连接)都不能在进程之间共享。因此尝试将其序列化是没有意义的。(池使用锁会导致序列化失败,但这只是一个症状。)
也许您可以使用一个助手来有条件地创建一个池(或连接)并将其存储为模块局部变量,而不是在导入时实例化它,例如
_pool = None
def get_pool():
global _pool
if _pool is None:
_pool = happybase.ConnectionPool(size=1, host=[hbase_master])
return pool
def process(...)
with get_pool().connection() as connection:
connection.table(...).put(...)
Run Code Online (Sandbox Code Playgroud)
这会在第一次使用时实例化池/连接,而不是在导入时实例化。
归档时间: |
|
查看次数: |
2057 次 |
最近记录: |