我正在编写多进程代码,它在 Python 3.7 中完美运行。然而,我希望并行进程之一执行 IO 进程永远使用 AsyncIO 我为了获得更好的性能,但一直无法让它运行。
Ubuntu 18.04、Python 3.7、AsyncIO、pipenv(已安装所有 pip 库)
该方法特别是使用多线程按预期运行,这是我想用 AsyncIO 替换的内容。
我已经用谷歌搜索并尝试在 main() 函数中循环,现在只在预期的 cor-routine 中,查看了示例并阅读了这种新的异步方式来解决问题,但到目前为止没有结果。
以下是运行的 app.py 代码:python app.py
import sys
import traceback
import logging
import asyncio
from config import DEBUG
from config import log_config
from <some-module> import <some-class>
if DEBUG:
logging.config.dictConfig(log_config())
else:
logging.basicConfig(
level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
logger = logging.getLogger(__name__)
def main():
try:
<some> = <some-class>([
'some-data1.csv',
'some-data2.csv'
])
<some>.run()
except:
traceback.print_exc()
pdb.post_mortem()
sys.exit(0)
if __name__ == '__main__':
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
这是我定义给定类的代码
_sql_client = …
Run Code Online (Sandbox Code Playgroud) 我正在使用 Python 3.4.6。
这里是工厂:
def create_parser():
""" Create argument parser """
# Input configuration parameters
_parser = argparse.ArgumentParser(description='Segments Engine')
# Application only parameters
_parser.add_argument(
'-V', '--version', version='%(prog)s ' + __version__,
action='version')
_pasrser.add_argument(
'-d', '--debug', action='store_true')
return _parser
Run Code Online (Sandbox Code Playgroud)
测试用例如下:
class TestCli(unittest.TestCase):
""" Test the cli.py module """
def __init__(self, *args, **kwargs):
""" Initialize Unit Test """
super(TestCli, self).__init__(*args, **kwargs)
def setup(self):
""" Setup before each test case """
setup_config(app)
setup_logging(app)
def teardown(self):
""" Tear down after each test case """
pass …
Run Code Online (Sandbox Code Playgroud)