for post in db.datasets.find({"test_set":"abc"}).sort("abc",pymongo.DESCENDING).skip((page-1)*num).limit(num):
This is my current code.
How do I get the count()?
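One way (a sketch, not from the question: assuming PyMongo 3.7+, where Cursor.count() is deprecated and later removed) is to get the total with Collection.count_documents() using the same filter, and paginate the cursor separately. The helper name get_page_with_total and the db/page/num parameters are illustrative.

```python
# Sketch, assuming PyMongo 3.7+: count matches with count_documents(),
# then paginate with skip/limit as in the question.
# get_page_with_total is a made-up helper name; -1 equals pymongo.DESCENDING.

def get_page_with_total(db, page, num):
    filt = {"test_set": "abc"}
    total = db.datasets.count_documents(filt)   # total matching documents
    posts = list(db.datasets.find(filt)
                 .sort("abc", -1)               # -1 == pymongo.DESCENDING
                 .skip((page - 1) * num)        # pages are 1-based
                 .limit(num))
    return total, posts
```

The count is computed against the filter only, so it reflects all matches regardless of the current page.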
When I run this code in Python 3.7:
import asyncio

sem = asyncio.Semaphore(2)

async def work():
    async with sem:
        print('working')
        await asyncio.sleep(1)

async def main():
    await asyncio.gather(work(), work(), work())

asyncio.run(main())
It fails with a runtime error:
$ python3 demo.py
working
working
Traceback (most recent call last):
File "demo.py", line 13, in <module>
asyncio.run(main())
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "demo.py", line 11, in main
await asyncio.gather(work(), work(), work())
File "demo.py", line 6, in work
async with sem:
File …

I have a strange problem with backups of my MongoDB replica set. I have two servers (one primary and one secondary), and I run a backup task every two hours.
I use this: mongodump.exe --db MyBase --out "d:\Backups"
But when I run this backup, my client application (C#) throws an error like this:
MongoDB.Driver.MongoConnectionException: Unable to connect to a member of the replica set matching the read preference Primary
I didn't think mongodump would have this kind of impact on the client application, but apparently it does.
I want to force the backup operation to run only on the secondary server. How can I do that? What command should I run?
Thanks for your help.
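A sketch of two common ways to point mongodump at the secondary (the replica-set name "rs0" and the host names server1/server2 are placeholders for your setup; --readPreference is a real mongodump option):

```shell
# Option 1: connect through the replica set, but read from a secondary
mongodump.exe --host "rs0/server1:27017,server2:27017" --readPreference=secondary --db MyBase --out "d:\Backups"

# Option 2: connect directly to the secondary member
mongodump.exe --host server2 --port 27017 --db MyBase --out "d:\Backups"
```

Either way, the dump's read load is moved off the primary while the backup runs.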
I'm trying to learn how to run tasks concurrently using Python's asyncio module. In the code below, I have a mock "web crawler" as an example. Basically, I'm trying to have at most two active fetch() requests at any given time, and I want process() to be called during the sleep() period.
import asyncio

class Crawler():
    urlq = ['http://www.google.com', 'http://www.yahoo.com',
            'http://www.cnn.com', 'http://www.gamespot.com',
            'http://www.facebook.com', 'http://www.evergreen.edu']
    htmlq = []
    MAX_ACTIVE_FETCHES = 2
    active_fetches = 0

    def __init__(self):
        pass

    async def fetch(self, url):
        self.active_fetches += 1
        print("Fetching URL: " + url)
        await asyncio.sleep(2)
        self.active_fetches -= 1
        self.htmlq.append(url)

    async def crawl(self):
        while self.active_fetches < self.MAX_ACTIVE_FETCHES:
            if self.urlq:
                url = self.urlq.pop()
                task = asyncio.create_task(self.fetch(url))
                await task
            else:
                print("URL queue empty")
                break

    def process(self, page):
        print("processed page: " …

My conftest.py code is as follows:
def pytest_collection_modifyitems(config, items):
    items.sort(key=lambda x: 2 if x.get_marker('slow') else 1)
Recently, it started raising the following exception:
$ venv/bin/py.test -vv --tb=short tests
============================================================================ test session starts ============================================================================
platform darwin -- Python 3.5.6, pytest-4.1.1, py-1.7.0, pluggy-0.8.1 -- /Users/.../venv/bin/python3.5
cachedir: .pytest_cache
rootdir: /Users/..., inifile:
collecting ... INTERNALERROR> Traceback (most recent call last):
INTERNALERROR> File "/Users/.../venv/lib/python3.5/site-packages/_pytest/main.py", line 203, in wrap_session
...
INTERNALERROR> File "/Users/.../venv/lib/python3.5/site-packages/pluggy/callers.py", line 187, in _multicall
INTERNALERROR> res = hook_impl.function(*args)
INTERNALERROR> File "/Users/.../tests/conftest.py", line 14, in pytest_collection_modifyitems
INTERNALERROR>     items.sort(key=lambda x: 2 if x.get_marker('slow') else …

I have this code:
done, pending = asyncio.wait(
    [a, b],
    return_when=asyncio.FIRST_COMPLETED)
But it fails:
Traceback (most recent call last):
...
File "/.../api.py", line 83, in websockets_handler
return_when=asyncio.FIRST_COMPLETED)
TypeError: cannot unpack non-iterable coroutine object
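The usual fix, as a sketch (the coroutines a() and b() below stand in for the question's a and b): asyncio.wait() is itself a coroutine, so its result must be awaited before unpacking; on Python 3.11+ the awaitables must also be wrapped in tasks first.

```python
import asyncio

async def a():
    await asyncio.sleep(0.01)
    return "a"

async def b():
    await asyncio.sleep(1)
    return "b"

async def main():
    # wrap in tasks: passing bare coroutines to wait() is removed in 3.11+
    t1 = asyncio.create_task(a())
    t2 = asyncio.create_task(b())
    done, pending = await asyncio.wait(      # note the await before unpacking
        [t1, t2],
        return_when=asyncio.FIRST_COMPLETED)
    for t in pending:                        # cancel the slower task
        t.cancel()
    return [t.result() for t in done]

results = asyncio.run(main())
print(results)  # the faster coroutine finishes first
```

Without the await, the right-hand side is a coroutine object, which is exactly what the "cannot unpack non-iterable coroutine object" TypeError is complaining about.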
Imagine I have an aiohttp-based web application:
from aiohttp import web
import asyncio
import logging

logger = logging.getLogger(__name__)

async def hello(request):
    logger.info('Started processing request')
    await asyncio.sleep(1)
    logger.info('Doing something')
    await asyncio.sleep(1)
    return web.Response(text="Hello, world!\n")

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(name)-14s %(levelname)s: %(message)s')

app = web.Application()
app.add_routes([web.get('/', hello)])
web.run_app(app)
Its output is (for example):
2019-11-11 13:37:14,757 __main__ INFO: Started processing request
2019-11-11 13:37:14,757 __main__ INFO: Started processing request
2019-11-11 13:37:15,761 __main__ INFO: Doing something
2019-11-11 13:37:15,761 __main__ INFO: Doing something
2019-11-11 13:37:16,765 aiohttp.access INFO: 127.0.0.1 [11/Nov/2019:12:37:14 +0000] "GET / HTTP/1.1" …

I can't find a way to send user credentials with aiohttp. I want behavior similar to cURL's
curl --user "USER:PASSWORD"
but in aiohttp. In the reference documentation I can't find this option; I can find query parameters, headers, and a body, but not user credentials.
I'm using aiohttp instead of curl to get asynchronous behavior.
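A minimal sketch of the aiohttp counterpart to curl --user: pass aiohttp.BasicAuth either to the ClientSession (applies to all its requests) or to a single request via its auth parameter. The URL and the USER/PASSWORD credentials below are placeholders.

```python
# Sketch: HTTP Basic auth in aiohttp via aiohttp.BasicAuth.
# Nothing here performs a request at import time; fetch() is just the shape.
import asyncio
import aiohttp

async def fetch():
    auth = aiohttp.BasicAuth("USER", "PASSWORD")      # curl --user "USER:PASSWORD"
    async with aiohttp.ClientSession(auth=auth) as session:
        # per-request alternative: session.get(url, auth=auth)
        async with session.get("https://example.com/") as resp:
            return resp.status
```

Under the hood this sends the same Authorization: Basic <base64> header that curl --user produces.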
The recommended way to use asyncio for a socket server is:
import asyncio

async def handle_client(reader, writer):
    request = (await reader.read(100)).decode()
    response = "Data received."
    writer.write(response.encode())

async def main():
    loop.create_task(asyncio.start_server(handle_client, 'localhost', 15555))

loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
This works fine, but now I need to receive the client's request properly and then fetch data from a third party's RESTful API using the aiohttp library.
That requires creating a session variable, like this:
from aiohttp import ClientSession
session = ClientSession()
But that should also happen inside a coroutine, so I put it in main:
async def main():
    session = ClientSession()
    loop.create_task(asyncio.start_server(handle_client, '', 55555))
Now I need to pass the session variable to the aiohttp get coroutine to fetch the REST API data:
async with session.get(url, params=params) as r:
    try:
        return await r.json(content_type='application/json')
    except aiohttp.client_exceptions.ClientResponseError:
        ....
My question is: how can I pass the session variable into the handle_client coroutine, when asyncio.start_server insists on a handler that takes only the reader and writer arguments, and a global variable doesn't help because the session must be created inside a coroutine?
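One common answer, sketched here with the stdlib only (a plain dict stands in for the aiohttp ClientSession, and every name besides handle_client is illustrative): bind the extra argument with functools.partial, so start_server still receives a callable of (reader, writer).

```python
# Sketch: pass extra state to an asyncio.start_server handler via
# functools.partial. A dict plays the role of ClientSession so the
# sketch runs without aiohttp; in real code, create the session in
# main() and bind it the same way.
import asyncio
from functools import partial

async def handle_client(session, reader, writer):
    request = (await reader.read(100)).decode()
    # here `session` would be used for the aiohttp call
    writer.write(f"Data received via {session['name']}.".encode())
    await writer.drain()
    writer.close()

async def main():
    session = {"name": "fake-session"}        # placeholder for ClientSession()
    server = await asyncio.start_server(
        partial(handle_client, session),      # extra arg bound up front
        '127.0.0.1', 0)                       # port 0: pick a free port
    port = server.sockets[0].getsockname()[1]

    # exercise the server once from a client connection
    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    writer.write(b"hello")
    await writer.drain()
    writer.write_eof()
    data = await reader.read(100)
    writer.close()
    server.close()
    await server.wait_closed()
    return data.decode()

result = asyncio.run(main())
print(result)
```

A closure or a lambda (lambda r, w: handle_client(session, r, w)) works the same way; partial just keeps the binding explicit.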