我正在编写一个用于序列化和发送protobuf3消息的 Python 应用程序。我想制作某种交互式用户界面,允许选择消息并动态分配它。我有一大堆这样的消息,因此我不想get为每条消息创建一个函数,而是创建一个可以处理所有消息的函数。
对于get所有消息字段,我可以简单地获取消息的所有属性并选择其字段,这很容易。然后,为了知道属性是什么类型,我使用type(getattr(my_message, current_field)). 现在问题来了。假设这些是我的消息:
message myMess1 {
//some fields
}
message myMess2 {
string some_string = 1
repeated myMess1 myMess1Field = 2
}
Run Code Online (Sandbox Code Playgroud)
现在,分配 some_string 字段就没有问题了。
type(getattr(myMess2Instance, someStringFieldName))return string,所以我知道用字符串来填充它。
但是重复的 myMess1 字段该怎么办呢?
type(getattr(MyMess2Instance, myMess1FieldName))实际上返回google.protobuf.pyext._message.RepeatedCompositeContainer,它没有说明其中包含什么类型。我怎么才能得到它?
我正在编写 python 上下文管理器,它运行异步任务。如果我的经理的任何任务抛出异常,我希望我的经理终止。这是示例代码:
class MyClass:
def __init__(self):
if asyncio.get_event_loop().is_closed():
asyncio.set_event_loop(asyncio.new_event_loop())
self.loop = asyncio.get_event_loop()
def __enter__(self):
return self
def __exit__(self, excType, excValue, tb):
try:
self.loop.run_until_complete(self._exit_loop())
finally:
self.loop.close()
if excType is not None:
print(excType.__name__, ':', excValue)
traceback.print_tb(tb)
async def _exit_loop(self):
tasks = [task for task in asyncio.all_tasks(self.loop) if
task is not asyncio.current_task(self.loop)]
list(map(lambda task: task.cancel(), tasks))
results = await asyncio.gather(*tasks, return_exceptions=True)
self.loop.stop()
async def func1(self):
while True:
print('func1')
await asyncio.sleep(1)
async def func2(self):
i = 5
while i > 0:
print('func2')
await …Run Code Online (Sandbox Code Playgroud)