标签: happybase

使用HappyBase连接池的PySpark dataframe.foreach()返回'TypeError:无法pickle thread.lock对象'

我有一个PySpark作业,用于更新HBase中的一些对象(Spark v1.6.0; happybase v0.9).

如果我打开/关闭每行的HBase连接,它会有效:

def process_row(row):
    conn = happybase.Connection(host=[hbase_master])
    # update HBase record with data from row
    conn.close()

my_dataframe.foreach(process_row)
Run Code Online (Sandbox Code Playgroud)

几千次upserts后,我们开始看到这样的错误:

TTransportException: Could not connect to [hbase_master]:9090
Run Code Online (Sandbox Code Playgroud)

显然,为每个upsert打开/关闭连接效率很低.这个函数实际上只是一个适当解决方案的占位符.

然后我尝试创建一个process_row使用连接池的函数版本:

pool = happybase.ConnectionPool(size=20, host=[hbase_master])

def process_row(row):
    with pool.connection() as conn:
        # update HBase record with data from row
Run Code Online (Sandbox Code Playgroud)

由于某种原因,此函数的连接池版本返回错误(请参阅完整的错误消息):

TypeError: can't pickle thread.lock objects
Run Code Online (Sandbox Code Playgroud)

你能看出我做错了什么吗?

更新

我看到这篇文章并怀疑我遇到了同样的问题:Spark尝试序列化pool对象并将其分发给每个执行程序,但是这个连接池对象不能在多个执行程序之间共享.

听起来我需要将数据集拆分为分区,并且每个分区使用一个连接(请参阅使用foreachrdd的设计模式).我根据文档中的示例尝试了这个:

def persist_to_hbase(dataframe_partition):
    hbase_connection = happybase.Connection(host=[hbase_master])
    for row in dataframe_partition:
        # persist data
    hbase_connection.close() …
Run Code Online (Sandbox Code Playgroud)

python happybase apache-spark pyspark

10
推荐指数
1
解决办法
2057
查看次数

Spark无法pickle method_descriptor

我收到这个奇怪的错误消息

15/01/26 13:05:12 INFO spark.SparkContext: Created broadcast 0 from wholeTextFiles at NativeMethodAccessorImpl.java:-2
Traceback (most recent call last):
  File "/home/user/inverted-index.py", line 78, in <module>
    print sc.wholeTextFiles(data_dir).flatMap(update).top(10)#groupByKey().map(store)
  File "/home/user/spark2/python/pyspark/rdd.py", line 1045, in top
    return self.mapPartitions(topIterator).reduce(merge)
  File "/home/user/spark2/python/pyspark/rdd.py", line 715, in reduce
    vals = self.mapPartitions(func).collect()
  File "/home/user/spark2/python/pyspark/rdd.py", line 676, in collect
    bytesInJava = self._jrdd.collect().iterator()
  File "/home/user/spark2/python/pyspark/rdd.py", line 2107, in _jrdd
    pickled_command = ser.dumps(command)
  File "/home/user/spark2/python/pyspark/serializers.py", line 402, in dumps
    return cloudpickle.dumps(obj, 2)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 816, in dumps
    cp.dump(obj)
  File "/home/user/spark2/python/pyspark/cloudpickle.py", line 133, …
Run Code Online (Sandbox Code Playgroud)

python hbase pickle happybase apache-spark

7
推荐指数
1
解决办法
4692
查看次数

如何处理BigTable Scan InvalidChunk异常?

我试图扫描BigTable数据,其中一些行是"脏" - 但这取决于扫描失败,导致(序列化?)InvalidChunk异常.代码如下:

from google.cloud import bigtable
from google.cloud import happybase
client = bigtable.Client(project=project_id, admin=True)
instance = client.instance(instance_id)
connection = happybase.Connection(instance=instance)
table = connection.table(table_name)

for key, row in table.scan(limit=5000):  #BOOM!
    pass
Run Code Online (Sandbox Code Playgroud)

省略一些列或将行限制为更少或指定开始和停止键,允许扫描成功.我无法从堆栈跟踪中检测出哪些值存在问题 - 它在列之间变化 - 扫描失败.这使得在源头清理数据成为问题.

当我利用python调试器时,我看到chunk(类型为google.bigtable.v2.bigtable_pb2.CellChunk)没有值(它是NULL/undefined):

ipdb> pp chunk.value
b''
ipdb> chunk.value_size
0
Run Code Online (Sandbox Code Playgroud)

我可以使用rowkey中的HBase shell来确认这一点(我从self._row.row_key获得)

所以问题就变成了:BigTable如何扫描过滤掉具有未定义/空/空值的列?

我从两个谷歌云API中得到同样的问题,这些API返回生成器,这些生成器在内部将数据作为块通过gRPC传输:

  • google.cloud.happybase .table.Table# 扫描()
  • google.cloud.bigtable .table.Table #read_rows().consume_all()

缩写的stacktrace如下:

---------------------------------------------------------------------------
InvalidChunk                              Traceback (most recent call last)
<ipython-input-48-922c8127f43b> in <module>()
      1 row_gen …
Run Code Online (Sandbox Code Playgroud)

python happybase google-cloud-bigtable

6
推荐指数
1
解决办法
222
查看次数

Happybase与hbase连接时出错

>>>import happybase
>>>cnx=happybase.Connection('localhost')

Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/seyf/envname/local/lib/python2.7/site-packages/happybase/api.py", line 121, in __init__
self.open()
File "/home/seyf/envname/local/lib/python2.7/site-packages/happybase/api.py", line 138, in open
self.transport.open()
File "/home/seyf/envname/local/lib/python2.7/site-packages/thrift/transport/TTransport.py", line 149, in open
return self.__trans.open()
File "/home/seyf/envname/local/lib/python2.7/site-packages/thrift/transport/TSocket.py", line 99, in open
message=message)
thrift.transport.TTransport.TTransportException: Could not connect to localhost:9090
Run Code Online (Sandbox Code Playgroud)

python hbase thrift happybase

4
推荐指数
1
解决办法
2636
查看次数

使用 python 连接到 Hbase 失败

我正在尝试使用 python 示例代码连接到 Hbase

import happybase
connection = happybase.Connection(myhost,port, autoconnect=True)

# before first use:
connection.open()
print(connection.tables())
Run Code Online (Sandbox Code Playgroud)

这是给出如下错误

打印(connection.tables())回溯(最近一次调用最后一次):文件“”,第1行,在文件“/usr/local/lib/python2.7/dist-packages/happybase/connection.py”,第242行, 在表中名称 = self.client.getTableNames() 文件“/usr/local/lib/python2.7/dist-packages/thriftpy/thrift.py”,第 198 行,在 _req 中返回 self._recv(_api) 文件“ /usr/local/lib/python2.7/dist-packages/thriftpy/thrift.py”,第 210 行,在 _recv fname, mtype, rseqid = self._iprot.read_message_begin() 文件“thriftpy/protocol/cybin/cybin. pyx", line 439, in cybin.TCyBinaryProtocol.read_message_begin (thriftpy/protocol/cybin/cybin.c:6470) cybin.ProtocolError: No protocol version header

操作系统:Ubuntu 16.04 我使用的是 python 2.7 Hbase 版本 1.1 帮助我理解这个问题。除了happybase 有没有更好的方法连接到Hbase

谢谢

python hadoop hbase happybase

4
推荐指数
1
解决办法
3864
查看次数

从python(happybase)写入hbase表

我正在运行map-reduce作业,现在我想在hbase中输入值.我通过stdin从map-reduce作业中传输值,并有一个python脚本,在happybase上插入(放置)行.

我遇到了各种各样的问题,从python执行put.根据我的理解,最近的问题似乎与库兼容性问题有关.错误日志显示iteritems的问题.该happybase手册是指对排序的查询需要额外的Python库,这是没有必要从Python版本2.7(我运行2.7.6)开始.

有没有人遇到类似的问题?它们可以轻松修复,还是建议使用不同的界面?

更多细节

我在独立配置中安装并运行了hadoop(2.6.0)和hbase(0.98.10 - 2/5/2015).他们开始了.我可以通过shell与hbase接口,创建表,输入值并扫描它们.

我可以通过happybase扫描并打印python中的表格,至少显示连接是否有效.但总是失败.这个简短的例子说明了问题:

为了这个例子,我的表叫做test(在hbase shell中创建).它有一列f1.

hbase(main)> create 't1','f1'
hbase(main)> put 't1','1','f1','hello'
Run Code Online (Sandbox Code Playgroud)

现在python:

>>> import happybase
>>> connection = happybase.Connection('localhost')
>>> table = connection.table('t1')
>>> print(table.row('1')) # {'f1:': 'hello'}
>>> table.put('2',{'f1','hey'}) # fails, see log
Run Code Online (Sandbox Code Playgroud)

更多细节:

节俭正在运行.

# hbase thrift start -threadpool


hduser@box> hbase -version
Run Code Online (Sandbox Code Playgroud)

java版"1.8.0_31"Java(TM)SE运行时环境(版本1.8.0_31-b13)Java HotSpot(TM)64位服务器VM(版本25.31-b07,混合模式)

错误日志:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-15-56dab4cd31ef> in <module>()
----> 1 table.put('2',{'f1','hey'})

/usr/local/lib/python2.7/dist-packages/happybase/table.pyc in put(self, row, data, timestamp, wal) …
Run Code Online (Sandbox Code Playgroud)

python hadoop hbase thrift happybase

2
推荐指数
1
解决办法
7178
查看次数

Cloudera/CDH v6.1.x + Python HappyBase v1.1.0: TTransportException(type=4, message='TSocket 读取 0 字节')

编辑:此问题和答案适用于遇到主题行中所述异常的任何人:TTransportException(type=4, message='TSocket read 0 bytes');无论是否涉及 Cloudera 和/或 HappyBase。

根本问题(事实证明)源于正在实现的内容的不匹配和protocol/或transport格式,并且任何客户端/服务器配对都可能发生这种情况。我的恰好是Cloudera 和 HappyBase,但你的不一定是,你也可能会遇到同样的问题。client-sideserver-side

最近有没有人尝试过使用Python 包与onhappybase v1.1.0 (latest)进行交互?HbaseCloudera CDH v6.1.x

我正在尝试各种选项,但不断出现异常:

thriftpy.transport.TTransportException:
TTransportException(type=4, message='TSocket read 0 bytes')
Run Code Online (Sandbox Code Playgroud)

以下是我如何启动会话并提交一个简单的调用来获取表列表(使用Python v3.6.7

import happybase

CDH6_HBASE_THRIFT_VER='0.92'

hbase_cnxn = happybase.Connection(
    host='vps00', port=9090,
    table_prefix=None,
    compat=CDH6_HBASE_THRIFT_VER,
    table_prefix_separator=b'_',
    timeout=None,
    autoconnect=True,
    transport='buffered',
    protocol='binary'
)

print('tables:', hbase_cnxn.tables()) # Exception happens here.
Run Code Online (Sandbox Code Playgroud)

以下是Cloudera CDH v6.1.x启动Hbase Thrift服务器的方式(为简洁起见,进行了截断):

/usr/java/jdk1.8.0_141-cloudera/bin/java [... snip ... ] \
    org.apache.hadoop.hbase.thrift.ThriftServer start …
Run Code Online (Sandbox Code Playgroud)

python hbase thrift happybase cloudera-cdh

1
推荐指数
1
解决办法
2816
查看次数