Vim似乎没有正确地对YAML文件中的破折号进行反应,因此打破了格式.
例如,我有一个块应该是这样的:
handlers:
- name: restart exim4
service: name=exim4 state=restarted
Run Code Online (Sandbox Code Playgroud)
当我完成输入restart exim4并输入service:Vim reindents时,我的最后service一行:
handlers:
- name: restart exim4
service: name=exim4 state=restarted
Run Code Online (Sandbox Code Playgroud)
很明显,Vim尝试按列排列句子,但这不是YAML所需要的.我想创建一个包含两个值的数组.
如何解决?
测试功能我需要传递参数并查看输出与预期输出匹配.
当函数的响应只是一个小数组或可以在测试函数中定义的单行字符串时很容易,但假设我测试的函数修改了一个可能很大的配置文件.或者,如果我明确定义它,结果数组就会长4行.我在哪里存储,以便我的测试保持干净,易于维护?
现在如果那是字符串我只是在.py测试附近放一个文件并open()在测试中进行:
def test_if_it_works():
with open('expected_asnwer_from_some_function.txt') as res_file:
expected_data = res_file.read()
input_data = ... # Maybe loaded from a file as well
assert expected_data == if_it_works(input_data)
Run Code Online (Sandbox Code Playgroud)
我发现这种方法存在许多问题,比如维护此文件的最新问题.它看起来也很糟糕.我可以把事情变得更好:
@pytest.fixture
def expected_data()
with open('expected_asnwer_from_some_function.txt') as res_file:
expected_data = res_file.read()
return expected_data
@pytest.fixture
def input_data()
return '1,2,3,4'
def test_if_it_works(input_data, expected_data):
assert expected_data == if_it_works(input_data)
Run Code Online (Sandbox Code Playgroud)
这只是将问题移到另一个地方,通常我需要测试函数是否在空输入,输入单个项目或多个项目的情况下工作,所以我应该创建一个大的夹具,包括所有三个案例或多个灯具.最后代码变得相当混乱.
如果一个函数需要一个复杂的字典作为输入或者给出相同大小的字典测试代码变得丑陋:
@pytest.fixture
def input_data():
# It's just an example
return {['one_value': 3, 'one_value': 3, 'one_value': 3,
'anotherky': 3, 'somedata': 'somestring'],
['login': 3, 'ip_address': …Run Code Online (Sandbox Code Playgroud) 假设项目中有两个包:some_package和another_package.
# some_package/foo.py:
def bar():
print('hello')
Run Code Online (Sandbox Code Playgroud)
# another_package/function.py
from some_package.foo import bar
def call_bar():
# ... code ...
bar()
# ... code ...
Run Code Online (Sandbox Code Playgroud)
我想测试another_package.function.call_bar模拟,some_package.foo.bar因为它有一些I/OI希望避免的网络.
这是一个测试:
# tests/test_bar.py
from another_package.function import call_bar
def test_bar(monkeypatch):
monkeypatch.setattr('some_package.foo.bar', lambda: print('patched'))
call_bar()
assert True
Run Code Online (Sandbox Code Playgroud)
令我惊讶的是它输出hello而不是patched.我尝试调试这个东西,在测试中放置一个IPDB断点.当我some_package.foo.bar在断点后手动导入并调用bar()我得到patched.
在我的真实项目中,情况更加有趣.如果我在项目根目录中调用pytest,我的函数不会被修补,但是当我指定tests/test_bar.py为参数时 - 它可以工作.
据我所知,它与from some_package.foo import bar声明有关.如果在monkeypatching发生之前执行它,那么修补失败.但是在上面的例子中的压缩测试设置中,修补在两种情况下都不起作用.
为什么在遇到断点后它在IPDB REPL中工作?
为了执行测试,我通常运行一个单独的容器:
docker-compose run --rm web /bin/bash
Run Code Online (Sandbox Code Playgroud)
web是django的容器.从shell我不时执行py.test.
为了能够从带有django的容器到达selenium并允许浏览器从selenium容器到达django的liveserver,我决定使用"net"参数,允许容器共享网络.所以我把它添加到yml:
selenium:
image: selenium/standalone-firefox
net: "container:web"
Run Code Online (Sandbox Code Playgroud)
不幸的是,这不起作用.我在django容器中看不到4444端口.
它只适用于net:"container:web"我指定自动生成容器的名称,而不是net:"container:project_web_run_1".
此外,我尝试而不是docker-compose run --rm ....使用docker-compose up --no-deps更改command参数,py.test functional_tests但这也不起作用.
这是将硒与容器一起使用的权利吗?
之间有什么区别:
r = group(some_task.s(i) for i in range(10)).apply_async()
result = r.join()
Run Code Online (Sandbox Code Playgroud)
和:
r = group(some_task.s(i) for i in range(10))()
result = r.get()
Run Code Online (Sandbox Code Playgroud)
芹菜文档使用两个示例,我没有看到任何差异.
我正在使用python3 + falcon组合编写API.
在我可以向客户端发送回复的方法中有很多地方,但由于一些繁重的代码执行DB,i/o操作等,它必须等到重要部分结束.
例如:
class APIHandler:
def on_get(self, req, resp):
response = "Hello"
#Some heavy code
resp.body(response)
Run Code Online (Sandbox Code Playgroud)
我可以在第一行代码发送"Hello".我想要的是在后台运行繁重的代码并发送响应,无论重型部件何时完成.
Falcon没有任何内置的异步功能,但他们提到它可以像gevent一样使用.我还没有找到任何关于如何将这两者结合起来的文档.
我有一个长期运行的芹菜任务,它迭代一系列项目并执行一些操作.
任务应该以某种方式报告它当前处理的项目,以便最终用户知道任务的进度.
目前我的django应用程序和芹菜一起安装在一台服务器上,所以我可以使用Django的模型来报告状态,但我计划添加更多远离Django的工作人员,因此他们无法访问数据库.
现在我看到几个解决方案:
PUT http://django.com/api/task/123/items_processedItem processeddjango更新计数器的事件items proceeded计数的任务,所以当任务完成时它会发出一个项目increase_messages_proceeded_count.delay(task_id).我提到的那些是否有任何解决方案或隐藏的问题?
据我从有关 pytest 夹具参数化的文档中了解到 - 它使用给定的参数创建夹具的副本,从而调用每个需要具有不同副本的夹具的测试。
我的需求有点不同。假设有一个夹具:
@pytest.fixture
def sample_foo():
return Foo(file_path='file/path/test.json')
def test1(sample_foo):
pass
def test2(sample_foo):
pass
Run Code Online (Sandbox Code Playgroud)
问题是 testtest1并test2需要相同的类实例Foo但具有不同的值file_path
所以目前我这样做:
def build_foo(path):
return Foo(file_path=path)
def test1():
sample_foo = build_foo('file/path/some.json')
def test2():
sample_foo = build_foo('file/path/another.json')
Run Code Online (Sandbox Code Playgroud)
这看起来有点像代码重复。我可以为每个文件创建一个单独的装置,但这看起来更糟。看起来每个测试都需要它自己的唯一文件,所以也许可以通过查看请求夹具的测试函数的名称来找出文件的名称来解决这个问题。但这并不能保证。
我有一个 python 应用程序,用户可以在其中启动某个任务。
任务的全部目的也是执行给定数量的 POST/GET 请求,并以特定的时间间隔到给定的 URL。
所以用户给出 N - 请求数,V - 每秒请求数。
考虑到由于 I/O 延迟,实际 r/s 速度可能更大或更小,设计这样的任务如何更好。
首先,我决定将 Celery 与 Eventlet 一起使用,否则我将需要大量无法接受的作品。
我天真的方法:
内部任务我做这样的事情:
@task
def task(number_of_requests, time_period):
for _ in range(number_of_requests):
start = time.time()
params_for_concrete_subtask = ...
# .... do some IO with monkey_patched eventlet requests library
elapsed = (time.time() - start)
# If we completed this subtask to fast
if elapsed < time_period / number_of_requests:
eventlet.sleep(time_period / number_of_requests)
Run Code Online (Sandbox Code Playgroud)一个工作示例是here。
如果我们太快,我们会尝试等待以保持所需的速度。如果我们太慢,从客户的角度来看是可以的。我们不违反请求/第二个要求。但是,如果我重新启动 Celery,这会正确恢复吗?
我认为这应该有效,但我认为有更好的方法。在 …
我使用aioredis编写了异步服务,该服务将在某个频道上侦听并以异步方式运行一些命令。
基本上,我从示例页面中获取了一个代码来编写一个小型测试应用程序,并删除了不必要的部分:
import asyncio
import aioredis
async def reader(ch):
while (await ch.wait_message()):
msg = await ch.get_json()
print('Got Message:', msg)
i = int(msg['sleep_for'])
print('Sleep for {}'.format(i))
await asyncio.sleep(i)
print('End sleep')
async def main():
sub = await aioredis.create_redis(('localhost', 6379))
res = await sub.subscribe('chan:1')
ch1 = res[0]
tsk = await reader(ch1)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
Run Code Online (Sandbox Code Playgroud)
还有另一个测试应用程序,它发布带有sleep_for字段的json blob ,然后在订阅者应用程序中使用该字段reader使用sleep语句来模拟协程内部的某些工作。
我希望“睡眠”以“并行”方式运行,但实际上它们以同步的方式出现在屏幕上,一个接一个。
我的猜测是,只要打到await ch.get_json(..)(或什至await ch.wait_message())行,我就应该能够处理下一条消息。在实践中,它像同步代码一样运行。我哪里错了?这可以使用连接池来处理,但这意味着存在一些不异步的问题,也不知道确切的含义。
python ×8
celery ×3
pytest ×3
django ×2
asynchronous ×1
selenium ×1
unit-testing ×1
vim ×1
yaml ×1