Suppose I have an async generator like this:
async def event_publisher(connection, queue):
    while True:
        if not await connection.is_disconnected():
            event = await queue.get()
            yield event
        else:
            return
I consume it like this:
published_events = event_publisher(connection, queue)

async for event in published_events:
    # do event processing here
This works fine, but when the connection drops and no new events are published, the async for loop waits forever. Ideally, I would like to forcibly close the generator, like this:
if await connection.is_disconnected():
    await published_events.aclose()
But I get the following error:
RuntimeError: aclose(): asynchronous generator is already running
Is there a way to stop a generator that is already running?
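One way to avoid the hang altogether (a minimal sketch, assuming plain asyncio; the 5-second timeout is an arbitrary choice) is to give queue.get() a timeout so the generator periodically re-checks the connection and returns on its own, instead of being closed from the outside while it is suspended on an await:

import asyncio

async def event_publisher(connection, queue):
    # Re-check the connection state on every iteration instead of
    # blocking forever on an empty queue.
    while not await connection.is_disconnected():
        try:
            # Wake up periodically so the disconnect check above runs again.
            event = await asyncio.wait_for(queue.get(), timeout=5.0)
        except asyncio.TimeoutError:
            continue
        yield event

With this shape the consumer's async for loop simply ends when the connection drops, and no external aclose() call is needed while the generator is running.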
I want to create a function signature in which an undefined value and None are distinct from one another.
So, for example:
undefined = object()

def update(id: str, title: str = undefined, description: str = undefined):
    data = {
        "title": title,
        "description": description,
    }
    # remove undefined key-value pairs from data
    for key in list(data):
        if data[key] is undefined:
            del data[key]
    # send data to the server for further processing
Obviously, this gives me conflicting types and the following error: Incompatible default for argument "description" (default has type "object", argument has type "str")
As the example above shows, passing None is perfectly valid, since it will be translated to JSON's null value, whereas a value passed as undefined is simply not included in the data at all.
I tried defining a type for the undefined value, for example:
UndefinedStr = typing.Union[str, undefined] …
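A Union needs a type, not an instance, which is why the attempt above fails. One pattern that type checkers generally handle well (a sketch, not taken from the question: the Undefined enum and UNDEFINED name are illustrative) is to make the sentinel a single-member enum so it can participate in a Union:

import enum
from typing import Union

class Undefined(enum.Enum):
    """Single-member enum used as a type-checkable sentinel."""
    token = 0

UNDEFINED = Undefined.token

def update(id: str,
           title: Union[str, Undefined] = UNDEFINED,
           description: Union[str, None, Undefined] = UNDEFINED) -> None:
    data = {"title": title, "description": description}
    # Drop keys that were never supplied; an explicit None survives.
    data = {k: v for k, v in data.items() if v is not UNDEFINED}
    # send data to the server for further processing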
I need a cron job that runs every 5 minutes. If an earlier run of the job is still in progress, another one should not be started. I tried setting the concurrency policy to Forbid, but then the cron job does not run at all.

spec:
  concurrencyPolicy: Allow
  schedule: '*/5 * * * *'
spec:
  concurrencyPolicy: Forbid
  schedule: '*/5 * * * *'
spec:
  concurrencyPolicy: Forbid
  schedule: '*/5 * * * *'
  startingDeadlineSeconds: 10
Can anyone help me with this?
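For reference, a complete CronJob manifest (a sketch; the name, image, and command are placeholders). One thing worth checking: with concurrencyPolicy: Forbid, a startingDeadlineSeconds of 10 means any run that cannot start within 10 seconds of its scheduled time is skipped outright, so a more generous deadline (or omitting the field) is usually safer:

apiVersion: batch/v1
kind: CronJob
metadata:
  name: example-job              # placeholder name
spec:
  schedule: '*/5 * * * *'
  concurrencyPolicy: Forbid      # skip a run while the previous one is still active
  startingDeadlineSeconds: 300   # allow late starts instead of silently skipping
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: worker       # placeholder
              image: busybox     # placeholder
              args: ['sh', '-c', 'echo running']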
I tried the following code, but when I open the .env file it is still empty.
import os
from os.path import join, dirname
from dotenv import load_dotenv
dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)
os.environ['ORI'] = '123'
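Note that assigning to os.environ only changes the environment of the running process; load_dotenv reads the file, but nothing in the snippet above ever writes to it. python-dotenv's set_key does persist a value to the file. A sketch of that approach:

from os.path import join, dirname
from dotenv import load_dotenv, set_key

dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)

# Unlike assigning to os.environ, set_key rewrites the .env file on disk.
set_key(dotenv_path, 'ORI', '123')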
I want to create a scheduler script that runs the same spider multiple times in sequence.
So far, I have the following:
#!/usr/bin/python3
"""Scheduler for spiders."""
import time

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

from my_project.spiders.deals import DealsSpider


def crawl_job():
    """Job to start spiders."""
    settings = get_project_settings()
    process = CrawlerProcess(settings)
    process.crawl(DealsSpider)
    process.start()  # the script will block here until the end of the crawl


if __name__ == '__main__':
    while True:
        crawl_job()
        time.sleep(30)  # wait 30 seconds then crawl again
Currently, the spider runs fine the first time; then, after the delay, the spider starts up again, but just before it begins scraping I get the following error message:
Traceback (most recent call last):
  File "scheduler.py", line 27, in <module>
    crawl_job()
  File "scheduler.py", line 17, in crawl_job …