I have a PySpark job that updates some objects in HBase (Spark v1.6.0; happybase v0.9).
It works if I open and close an HBase connection for each row:
def process_row(row):
    conn = happybase.Connection(host=[hbase_master])
    # update HBase record with data from row
    conn.close()

my_dataframe.foreach(process_row)
After a few thousand upserts, we start seeing errors like this:
TTransportException: Could not connect to [hbase_master]:9090
Obviously, opening and closing a connection for every upsert is inefficient. This function is really just a placeholder for a proper solution.
I then tried to create a version of process_row that uses a connection pool:
pool = happybase.ConnectionPool(size=20, host=[hbase_master])

def process_row(row):
    with pool.connection() as conn:
        pass  # update HBase record with data from row
For some reason, the connection-pool version of this function returns an error (see the full error message):
TypeError: can't pickle thread.lock objects
Can you see what I'm doing wrong?
I saw this post and suspect I'm running into the same problem: Spark tries to serialize the pool object and distribute it to each executor, but this connection pool object cannot be shared across multiple executors.
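The error message itself is easy to reproduce outside Spark, since connection pools hold thread locks, which the pickle protocol refuses to serialize; a minimal illustration (Python 2 wording of the error, matching the trace above):

import pickle
import threading

# happybase.ConnectionPool guards its connections with locks like this one;
# pickling any object that holds such a lock fails, which is what cloudpickle runs into.
pickle.dumps(threading.Lock())  # TypeError: can't pickle thread.lock objects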
It sounds like I need to split the dataset into partitions and use one connection per partition (see Design Patterns for using foreachRDD). I tried this, based on the example in the docs:
def persist_to_hbase(dataframe_partition):
    hbase_connection = happybase.Connection(host=[hbase_master])
    for row in dataframe_partition:
        pass  # persist data
    hbase_connection.close()
…
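For context, the per-partition pattern referenced above is normally driven through foreachPartition, so that one connection serves every row in a partition; a minimal sketch (my_dataframe and the [hbase_master] placeholder come from the snippets above, the table name is illustrative):

import happybase

def persist_to_hbase(partition_iter):
    # One connection per partition, reused for every row in that partition.
    conn = happybase.Connection(host=[hbase_master])
    table = conn.table('my_table')  # illustrative table name
    try:
        for row in partition_iter:
            pass  # table.put(...) with data from row
    finally:
        conn.close()

my_dataframe.foreachPartition(persist_to_hbase)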
I'm getting this strange error message:

15/01/26 13:05:12 INFO spark.SparkContext: Created broadcast 0 from wholeTextFiles at NativeMethodAccessorImpl.java:-2
Traceback (most recent call last):
File "/home/user/inverted-index.py", line 78, in <module>
print sc.wholeTextFiles(data_dir).flatMap(update).top(10)#groupByKey().map(store)
File "/home/user/spark2/python/pyspark/rdd.py", line 1045, in top
return self.mapPartitions(topIterator).reduce(merge)
File "/home/user/spark2/python/pyspark/rdd.py", line 715, in reduce
vals = self.mapPartitions(func).collect()
File "/home/user/spark2/python/pyspark/rdd.py", line 676, in collect
bytesInJava = self._jrdd.collect().iterator()
File "/home/user/spark2/python/pyspark/rdd.py", line 2107, in _jrdd
pickled_command = ser.dumps(command)
File "/home/user/spark2/python/pyspark/serializers.py", line 402, in dumps
return cloudpickle.dumps(obj, 2)
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 816, in dumps
cp.dump(obj)
File "/home/user/spark2/python/pyspark/cloudpickle.py", line 133, …Run Code Online (Sandbox Code Playgroud) 我试图扫描BigTable数据,其中一些行是"脏" - 但这取决于扫描失败,导致(序列化?)InvalidChunk异常.代码如下:
from google.cloud import bigtable
from google.cloud import happybase

client = bigtable.Client(project=project_id, admin=True)
instance = client.instance(instance_id)
connection = happybase.Connection(instance=instance)
table = connection.table(table_name)

for key, row in table.scan(limit=5000):  # BOOM!
    pass
Leaving out some columns, limiting the rows to fewer, or specifying start and stop keys allows the scan to succeed. I cannot tell from the stack trace which values are the problem - it varies across columns - the scan just fails. This makes cleaning the data at the source problematic.
When I use the Python debugger, I see that the chunk (of type google.bigtable.v2.bigtable_pb2.CellChunk) has no value (it is NULL/undefined):
ipdb> pp chunk.value
b''
ipdb> chunk.value_size
0
I can confirm this with the HBase shell using the rowkey (which I got from self._row.row_key).
So the question becomes: how can a BigTable scan filter out columns with undefined/null/empty values?
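For reference, skipping empty cells can be expressed as a value filter when the scan is issued through the lower-level client instead of the happybase layer; a minimal sketch, assuming the google.cloud.bigtable API (project_id, instance_id and table_name are reused from the snippet above; exact iteration details vary slightly between client versions):

from google.cloud import bigtable
from google.cloud.bigtable import row_filters

client = bigtable.Client(project=project_id, admin=True)
instance = client.instance(instance_id)
table = instance.table(table_name)

# Only return cells whose value is at least one byte long,
# i.e. drop cells with empty/undefined values like the one shown in the debugger output.
non_empty = row_filters.ValueRegexFilter(b'.+')

for row in table.read_rows(limit=5000, filter_=non_empty):
    pass  # row.cells holds the surviving column data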
I get the same problem from two Google Cloud APIs that return generators, which internally stream the data over gRPC as chunks:
The abbreviated stacktrace follows:
---------------------------------------------------------------------------
InvalidChunk Traceback (most recent call last)
<ipython-input-48-922c8127f43b> in <module>()
      1 row_gen …

>>> import happybase
>>> cnx = happybase.Connection('localhost')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/seyf/envname/local/lib/python2.7/site-packages/happybase/api.py", line 121, in __init__
self.open()
File "/home/seyf/envname/local/lib/python2.7/site-packages/happybase/api.py", line 138, in open
self.transport.open()
File "/home/seyf/envname/local/lib/python2.7/site-packages/thrift/transport/TTransport.py", line 149, in open
return self.__trans.open()
File "/home/seyf/envname/local/lib/python2.7/site-packages/thrift/transport/TSocket.py", line 99, in open
message=message)
thrift.transport.TTransport.TTransportException: Could not connect to localhost:9090
I am trying to connect to HBase using this Python sample code:
import happybase

connection = happybase.Connection(myhost, port, autoconnect=True)

# before first use:
connection.open()

print(connection.tables())
This gives the following error:
print(connection.tables())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/dist-packages/happybase/connection.py", line 242, in tables
    names = self.client.getTableNames()
  File "/usr/local/lib/python2.7/dist-packages/thriftpy/thrift.py", line 198, in _req
    return self._recv(_api)
  File "/usr/local/lib/python2.7/dist-packages/thriftpy/thrift.py", line 210, in _recv
    fname, mtype, rseqid = self._iprot.read_message_begin()
  File "thriftpy/protocol/cybin/cybin.pyx", line 439, in cybin.TCyBinaryProtocol.read_message_begin (thriftpy/protocol/cybin/cybin.c:6470)
cybin.ProtocolError: No protocol version header
OS: Ubuntu 16.04. I am using Python 2.7 and HBase version 1.1. Help me understand this problem. Also, is there a better way than happybase to connect to HBase?
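For what it's worth, a "No protocol version header" error typically indicates that the client and the Thrift server disagree on protocol or transport; recent happybase versions let you set both explicitly, as in this minimal sketch (host, port and the chosen values are illustrative and must match how the Thrift server was started):

import happybase

# Pick the transport/protocol pairing that matches the server side
# (e.g. framed transport and/or the compact protocol).
connection = happybase.Connection(
    'myhost', 9090,
    transport='framed',   # or 'buffered' (the default)
    protocol='compact',   # or 'binary' (the default)
)
print(connection.tables())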
Thanks.
I am running a map-reduce job and now want to load the values into HBase. I stream the values out of the map-reduce job via stdin and have a Python script that inserts (puts) the rows through happybase.
I am having all sorts of problems executing puts from Python. As far as I understand, the most recent problem seems to be a library compatibility issue. The error log shows a problem with iteritems. The happybase manual mentions an additional Python library needed for ordered queries, which is not necessary from Python 2.7 onward (I am running 2.7.6).
Has anyone run into similar problems? Can they be fixed easily, or would you recommend a different interface?
More details
I have installed and am running hadoop (2.6.0) and hbase (0.98.10 - 2/5/2015) in a standalone configuration. They are up and running. I can interface with hbase through the shell, create tables, enter values and scan them.
I can scan and print the table from Python via happybase, which at least shows that the connection works. But put always fails. This short example illustrates the problem:
For this example, my table is called t1 (created in the hbase shell). It has one column family, f1.
hbase(main)> create 't1','f1'
hbase(main)> put 't1','1','f1','hello'
Now in python:
>>> import happybase
>>> connection = happybase.Connection('localhost')
>>> table = connection.table('t1')
>>> print(table.row('1')) # {'f1:': 'hello'}
>>> table.put('2',{'f1','hey'}) # fails, see log
More details:
Thrift is running.
# hbase thrift start -threadpool
hduser@box> hbase -version
java version "1.8.0_31"
Java(TM) SE Runtime Environment (build 1.8.0_31-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.31-b07, mixed mode)
Error log:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-15-56dab4cd31ef> in <module>()
----> 1 table.put('2',{'f1','hey'})
/usr/local/lib/python2.7/dist-packages/happybase/table.pyc in put(self, row, data, timestamp, wal) …
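As a side note on the failing call: {'f1','hey'} is a set literal, while happybase's Table.put expects a dict mapping column name to value, which is consistent with the iteritems error above; a minimal sketch of the expected form (the 'f1:' column mirrors the row output shown earlier):

>>> # data must be a dict of {'family:qualifier': value}, not a set
>>> table.put('2', {'f1:': 'hey'})
>>> print(table.row('2'))  # {'f1:': 'hey'}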
EDIT: This question and answer apply to anyone who encounters the exception stated in the subject line: TTransportException(type=4, message='TSocket read 0 bytes'); whether or not Cloudera and/or HappyBase are involved.

The root problem (as it turned out) stems from a mismatch between what the client side and the server side implement in terms of protocol and/or transport format, and it can happen with any client/server pairing. Mine happened to be Cloudera and HappyBase, but yours need not be, and you may run into the same issue.
Has anyone recently tried to interact with HBase on Cloudera CDH v6.1.x using the Python happybase v1.1.0 (latest) package?
I'm trying various options but keep getting this exception:
thriftpy.transport.TTransportException:
TTransportException(type=4, message='TSocket read 0 bytes')
Here is how I start a session and submit a simple call to get the table list (using Python v3.6.7):
import happybase

CDH6_HBASE_THRIFT_VER = '0.92'

hbase_cnxn = happybase.Connection(
    host='vps00', port=9090,
    table_prefix=None,
    compat=CDH6_HBASE_THRIFT_VER,
    table_prefix_separator=b'_',
    timeout=None,
    autoconnect=True,
    transport='buffered',
    protocol='binary'
)

print('tables:', hbase_cnxn.tables())  # Exception happens here.
Here is how Cloudera CDH v6.1.x starts the HBase Thrift server (truncated for brevity):
/usr/java/jdk1.8.0_141-cloudera/bin/java [... snip ... ] \
org.apache.hadoop.hbase.thrift.ThriftServer start …
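Given that the EDIT above attributes the exception to a protocol/transport mismatch, one pragmatic way to find the matching client settings is to probe the combinations happybase exposes until one matches the server; a minimal sketch under that assumption (host and port reused from the snippet above):

import happybase

# Try each client-side transport/protocol pairing against the Thrift server;
# the pairing that matches the server configuration should stop raising
# TTransportException(type=4, message='TSocket read 0 bytes').
for transport in ('buffered', 'framed'):
    for protocol in ('binary', 'compact'):
        try:
            cnxn = happybase.Connection(host='vps00', port=9090,
                                        transport=transport, protocol=protocol)
            print(transport, protocol, '->', cnxn.tables())
            cnxn.close()
        except Exception as exc:
            print(transport, protocol, '-> failed:', exc)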
happybase ×7
python ×7
hbase ×5
thrift ×3
apache-spark ×2
hadoop ×2
cloudera-cdh ×1
pickle ×1
pyspark ×1