我正在使用芹菜(并发池的并发性= 1),我希望能够在特定任务运行后关闭工作人员.需要注意的是,我希望避免工人在那之后接受任何进一步任务的任何可能性.
这是我在大纲中的尝试:
from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.exceptions import WorkerShutdown
from celery.signals import task_postrun
app = Celery()
app.config_from_object('celeryconfig')
@app.task
def add(x, y):
return x + y
@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
raise WorkerShutdown()
Run Code Online (Sandbox Code Playgroud)
但是,当我运行工人时
celery -A celeryapp worker --concurrency=1 --pool=solo
Run Code Online (Sandbox Code Playgroud)
并运行任务
add.delay(1,4)
Run Code Online (Sandbox Code Playgroud)
我得到以下内容:
-------------- celery@sam-APOLLO-2000 v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Linux-4.4.0-116-generic-x86_64-with-Ubuntu-16.04-xenial 2018-03-18 14:08:37
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: __main__:0x7f596896ce90
- ** …Run Code Online (Sandbox Code Playgroud) 新ReturnType的打字稿2.8是一个非常有用的功能,让你提取特定函数的返回类型.
function foo(e: number): number {
return e;
}
type fooReturn = ReturnType<typeof foo>; // number
Run Code Online (Sandbox Code Playgroud)
但是,我在泛型函数的上下文中使用它时遇到了麻烦.
function foo<T>(e: T): T {
return e;
}
type fooReturn = ReturnType<typeof foo>; // type fooReturn = {}
type fooReturn = ReturnType<typeof foo<number>>; // syntax error
type fooReturn = ReturnType<(typeof foo)<number>>; // syntax error
Run Code Online (Sandbox Code Playgroud)
有没有办法提取泛型函数给出特定类型参数的返回类型?
Django ORM是否提供了有条件地创建对象的方法?
例如,假设您希望使用某种乐观并发控制来插入新对象.
在某个时刻,您知道要在该表中插入的最新对象,并且您只想在此后没有插入新对象时才创建新对象.
如果是更新,您可以根据修订号进行过滤:
updated = Account.objects.filter(
id=self.id,
version=self.version,
).update(
balance=balance + amount,
version=self.version + 1,
)
Run Code Online (Sandbox Code Playgroud)
但是,我找不到任何记录的方式来为a create()或save()call 提供条件.
我正在寻找在SQL查询级别应用这些条件的东西,以避免"读 - 修改 - 写"问题.
我正在阅读的一篇文章将此作为不纯函数的示例(在JavaScript中):
const tipPercentage = 0.15;
const calculateTip = cost => cost * tipPercentage;
Run Code Online (Sandbox Code Playgroud)
这让我感到有些奇怪的例子,因为它tipPercentage是一个具有不可变值的常量.纯函数的常见示例允许在这些常量是函数时依赖于不可变常量.
const mul = (x, y) => x * y
const calculateTip = (cost, tipPercentage) => mul(cost, tipPercentage);
Run Code Online (Sandbox Code Playgroud)
在上面的例子中,如果我错了,请纠正我,calculateTip通常将其归类为纯函数.
所以,我的问题是:在函数式编程中,如果一个函数依赖于具有不可变值的外部定义的常量,当该值不是函数时,它仍然被认为是纯函数吗?
假设您有一个采用联合类型的函数,然后将该类型缩小并委托给其他两个纯函数之一。
function foo(arg: string|number) {
if (typeof arg === 'string') {
return fnForString(arg)
} else {
return fnForNumber(arg)
}
}
Run Code Online (Sandbox Code Playgroud)
假定fnForString()和fnForNumber()也是纯函数,并且它们本身已经过测试。
应该如何进行测试foo()?
fnForString(),并fnForNumber()作为一个实现细节,而且基本上编写测试时,重复测试,他们每个人的foo()?这种重复是否可以接受?foo()授人以fnForString()和fnForNumber()如通过嘲笑出来,并检查其委托给他们呢?当有人询问如何从另一个 celery 任务触发 celery 任务时,通常的答案是使用chain,例如:
workflow = celery.chain(first_task.s(a, b), triggered_task.s())
Run Code Online (Sandbox Code Playgroud)
然而,当触发的任务不改变第一个任务的结果时,这似乎很奇怪。举个例子,您想要进行一些处理,然后在处理完成后发送一些电子邮件通知。您可以使用以下方法执行此操作chain:
@app.task
def processing_task(input):
return process(input)
@app.task
def send_notifications_task(previous_result):
message = create_message(previous_result)
send_to_chat_channel(message)
return previous_result
processing_workflow = chain(
processing_task.s(),
send_notifications_task.s()
)
Run Code Online (Sandbox Code Playgroud)
你这样称呼它:
async_result = workflow.delay(input)
Run Code Online (Sandbox Code Playgroud)
然而,这有一些奇怪的特性:
send_notifications_task作为其返回值直接传递。输入不会以任何方式进行处理或转换。所以,我的第一个问题是:
在这种情况下,是否有任何理由不从第一个任务的主体内触发第二个任务,例如:
@app.task
def processing_task(input):
output = process(input)
send_notifications_task.delay(output)
return output
Run Code Online (Sandbox Code Playgroud)
我的第二个问题是:有没有更好的方法来处理我没有想到的这种情况?
我已经为 javascript/typescript 编写了一个互斥锁实现,但我正在努力解决如何测试它的问题。这是实现:
class Mutex {
private current = Promise.resolve();
async acquire() {
let release: () => void;
const next = new Promise<void>(resolve => {
release = () => { resolve(); };
});
const waiter = this.current.then(() => release);
this.current = next;
return await waiter;
}
}
Run Code Online (Sandbox Code Playgroud)
用法:
const mut = new Mutex();
async function usesMutex() {
const unlock = await mut.acquire();
try {
await doSomeStuff();
await doOtherStuff();
} finally {
unlock();
}
}
Run Code Online (Sandbox Code Playgroud)
如果互斥锁没有按预期工作,我不确定是否有任何简单的方法可以创建会导致测试失败的时序问题。任何建议将不胜感激。
我最近发现了MappingProxyType,它提供了一个
映射的只读代理。它提供了映射条目的动态视图,这意味着当映射更改时,视图会反映这些更改。
例如,如果您有一个带有内部字典的类实例,您希望能够将其提供给实例的用户进行检查,但不允许他们修改它,那么这似乎非常有用。提供一个 mappingproxy 提供了一个更有效的替代方法来提供一个 dict 的副本。用户可以通过 mappingproxy 检查字典,但如果他们试图改变它,则会引发异常。
我突然想到如果列表有类似的东西会很有用。我知道生成器可用于此目的,但您无法索引生成器以获取第 n 个元素。
Python 中是否存在等效于列表、元组等的 MappingProxyType?
创建 EC2 Autoscaling 组时,可以指定不同可用区中的多个子网,以便 Autoscaling 组可以在有容量的情况下启动实例。
使用 EC2 启动模板从头开始启动实例时,是否有办法执行类似的操作?我希望能够指定子网列表,然后要求 EC2 根据我的启动模板在具有相关实例类型备用容量的子网中启动实例。
但是,由于启动模板似乎要求您指定网络接口,因此我看不到实现此目的的明显方法。
有没有很好的方法或受支持的库在python3中合并异步迭代器?
所需的行为与在reactx中合并可观察对象的行为基本相同。
也就是说,在正常情况下,如果要合并两个异步迭代器,则希望生成的异步迭代器按时间顺序产生结果。迭代器之一中的错误应使合并的迭代器出轨。
(来源:http : //reactivex.io/documentation/operators/merge.html)
这是我的最佳尝试,但似乎可能存在以下标准解决方案:
async def drain(stream, q, sentinal=None):
try:
async for item in stream:
await q.put(item)
if sentinal:
await q.put(sentinal)
except BaseException as e:
await q.put(e)
async def merge(*streams):
q = asyncio.Queue()
sentinal = namedtuple("QueueClosed", ["truthy"])(True)
futures = {
asyncio.ensure_future(drain(stream, q, sentinal)) for stream in streams
}
remaining = len(streams)
while remaining > 0:
result = await q.get()
if result is sentinal:
remaining -= 1
continue
if isinstance(result, BaseException):
raise result
yield result
if …Run Code Online (Sandbox Code Playgroud) python ×4
javascript ×3
typescript ×3
celery ×2
testing ×2
amazon-ec2 ×1
autoscaling ×1
django ×1
django-orm ×1
generics ×1
mutex ×1
python-3.x ×1