标签: concurrent-processing

FastAPI 以串行方式而不是并行方式运行 api 调用

我有以下代码:

import time
from fastapi import FastAPI, Request
    
app = FastAPI()
    
@app.get("/ping")
async def ping(request: Request):
        print("Hello")
        time.sleep(5)
        print("bye")
        return {"ping": "pong!"}
Run Code Online (Sandbox Code Playgroud)

如果我在本地主机上运行我的代码 - 例如http://localhost:8501/ping- 在同一浏览器窗口的不同选项卡中,我得到:

Hello
bye
Hello
bye
Run Code Online (Sandbox Code Playgroud)

代替:

Hello
Hello
bye
bye
Run Code Online (Sandbox Code Playgroud)

我已经阅读过有关使用的内容httpx,但仍然无法实现真正​​的并行化。有什么问题?

python asynchronous concurrent-processing python-asyncio fastapi

50
推荐指数
1
解决办法
4万
查看次数

如何均衡处理多个并发任务?

问题

我们的处理服务为 UI、API 和内部客户端提供服务,并监听来自Kafka 的命令。很少有 API 客户端可能会在短时间内创建大量生成任务(一个任务是 N 条消息)。使用 Kafka,我们无法控制命令分发,因为每个命令都来自一个处理实例(又名工作程序)消耗的分区。因此,在处理 API 请求时,UI 请求可能等待太长时间。

在理想的实现中,我们应该均匀地处理所有任务,无论其大小。处理服务的容量分布在所有活动任务中。并且即使集群负载很重,我们总是明白,已经到达的新任务几乎可以立即开始处理,至少在所有其他任务的处理结束之前。在此处输入图片说明


解决方案

相反,我们想要一个看起来更像下图的架构,其中每个客户和端点的组合都有单独的队列。这种架构为我们提供了更好的隔离,以及基于每个客户动态调整吞吐量的能力。 在此处输入图片说明 在生产者方面

  • 任务来自客户端
  • 立即为此任务创建一个队列
  • 将所有消息发送到此队列

在消费者方面

  • 在一个过程中,您不断更新队列列表
  • 在其他进程中,您遵循此列表并使用例如来自每个队列的 1 条消息
  • 规模消费者

这样的问题有什么通用的解决方案吗?使用 RabbitMQ 或任何其他工具。? 过去,我们在项目中使用 Kafka,所以如果有任何方法使用 - 这太棒了,但我们可以使用任何技术来解决问题。

parallel-processing asynchronous rabbitmq concurrent-processing apache-kafka

11
推荐指数
1
解决办法
193
查看次数

如何使用async和await进行大量并发Web请求?

我在如何:通过使用异步和等待(C#)并行发出多个Web请求中阅读了Microsoft的方法, 并发现:

private async Task CreateMultipleTasksAsync()  
{  
    // Declare an HttpClient object, and increase the buffer size. The  
    // default buffer size is 65,536.  
    HttpClient client =  
        new HttpClient() { MaxResponseContentBufferSize = 1000000 };  

    // Create and start the tasks. As each task finishes, DisplayResults   
    // displays its length.  
    Task<int> download1 =   
        ProcessURLAsync("http://msdn.microsoft.com", client);  
    Task<int> download2 =   
        ProcessURLAsync("http://msdn.microsoft.com/library/hh156528(VS.110).aspx", client);  
    Task<int> download3 =   
        ProcessURLAsync("http://msdn.microsoft.com/library/67w7t67f.aspx", client);  

    // Await each task.  
    int length1 = await download1;  
    int length2 = await download2; …
Run Code Online (Sandbox Code Playgroud)

.net c# asynchronous concurrent-processing

5
推荐指数
2
解决办法
9051
查看次数

如果另一个未来失败,如何取消未来行动?

我有 2 个期货(db 表上的 2 个操作),我希望在保存修改之前检查两个期货是否已成功完成。

现在,我在第一个(作为依赖)中开始第二个未来,但我知道这不是最好的选择。我知道我可以使用for-comprehension 并行执行两个期货,但即使一个失败,另一个也会被执行(尚未测试)

firstFuture.dropColumn(tableName) match {
  case Success(_) => secondFuture.deleteEntity(entity)
  case Failure(e) => throw new Exception(e.getMessage)
}

// the  first future alters a table, drops a column
// the second future deletes a row from another table
Run Code Online (Sandbox Code Playgroud)

在这种情况下,如果第一个 future 成功执行,第二个可能会失败。我想恢复第一个未来的更新。我听说过 SQL 事务,似乎是这样的,但是如何呢?

val futuresResult = for {
  first <- firstFuture.dropColumn(tableName)
  second <- secondFuture.deleteEntity(entity)
} yield (first, second)
Run Code Online (Sandbox Code Playgroud)

一个for-comprehension是在我的情况好多了,因为我没有这两个期货之间的依赖关系,可以并行执行,但这不解决我的问题,结果可能是(成功,成功)或(失败,成功)的例子。

scala future concurrent-processing for-comprehension playframework

3
推荐指数
1
解决办法
171
查看次数

Scala actor作为单线程队列

我想在一个程序中使用actor,我会对某些演员有一些限制,好像他们是队列一样.例如,假设我有一些应用了更改事件的外部系统,还有一些外部系统数据的缓存.所以我有两个演员:

  1. ChangeApplicationActor
  2. CacheActor

作为一部分ChangeApplicationActor,当我将更改应用于X外部系统中的某个实体时,我想发送一些事件来告诉CacheActor同步:

val changeApplicationActor = actor { 
  loop {
    react {
      case ChangeInstruction(x) => 
        externalSystem.applyChange(x)
        cacheActor ! Sync(x)
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

但我现在有两个要求:

  1. CacheActor具有内部状态和理想,我想它来处理其Sync指令顺序
  2. 如果我最终收到CacheActor包含两个Sync(x)相同值的指令的收件箱x,那么我想忽略第二个Sync指令(即我应该只有一个挂起的指令用于任何给定的值x)

有没有办法强迫演员单线程?有什么方法可以访问演员的邮箱并删除任何重复的事件?难道我不能避免实施CacheActoras,嗯,不是演员吗?

scala message-queue concurrent-processing actor

2
推荐指数
1
解决办法
2134
查看次数

防止发送php页面功能的垃圾邮件

背景:好的,我在ninjawars.net上运行了一个传统的BBG.玩家可以对通过表格帖子初始化的其他玩家进行"攻击".本质上,我们可以简化情况假装有一个页面,让我们称之为attack.php,用一个巨大的"ATTACK"表单发送到另一个php页面,让我们称之为accept_attack.php,第二页执行攻击功能,让我们说杀死其他玩家1,2或3.服务器运行PHP5,Postgresql,Apache

问题:

  • 如果我按下了那个大"ATTACK"按钮,然后它会把我带到accept_attack.php,我可以再次刷新三次,每次重新提交,再连续三次攻击.
  • 如果我打开第一页的三个标签,并在每个页面上点击攻击,我最终会发生三次瞬间攻击,一次性杀死玩家1,2和3,我可以不断刷新重复.
  • 尽管我尝试将"最近的攻击"计时器保存到数据库中,但是玩家似乎能够解决它,可能只是通过以足够同步的方式刷新三个复制的选项卡,以便他们可以全部检索相同的计时器(例如上午10:00:00:0000),然后进行最终处理.

需要的解决方案:

那么如何防止某个脚本的同一处理一次性重复执行?

Php,社会工程和/或javascript/jQuery解决方案首选(可能大约是那个顺序).

编辑:基于答案,这是我做的(可能在压力测试之前)解决它:会话答案似乎最简单/最易于理解,因此我使用了该数据存储.我测试了它似乎工作,但可能有一些方法,我不知道.

$recent_attack = null;
$start_of_attack = microtime(true);
$attack_spacing = 0.2; // fraction of a second
if(SESSION::is_set('recent_attack')){
    $recent_attack = SESSION::get('recent_attack');
}

if($recent_attack && $recent_attack>($start_of_attack-$attack_spacing)){
    echo "<p>Even the best of ninjas cannot attack that quickly.</p>";
    echo "<a href='attack_player.php'>Return to combat</a>";
    SESSION::set('recent_attack', $start_of_attack);
    die();
} else {
    SESSION::set('recent_attack', $start_of_attack);
}
Run Code Online (Sandbox Code Playgroud)

如果有方法可以改进那些或可利用的方式(除了对我来说显而易见的事情,回应的东西不是一个很好的逻辑分离,我很想知道.沿着这些方向,社区维基编辑.

javascript php spam-prevention concurrent-processing

2
推荐指数
1
解决办法
369
查看次数

同时下载多个页面?

我想在Python中编写一个脚本,可以从数据库中获取URL,并同时下载网页以加快速度,而不是等待每个页面一个接一个地下载.

根据这个线程,Python不允许这样做,因为有一个名为Global Interpreter Lock的东西会阻止多次激活相同的脚本.

在花时间学习Twisted框架之前,我想确保没有更简单的方法来完成我需要做的事情.

谢谢你的任何提示.

python concurrent-processing

2
推荐指数
2
解决办法
3245
查看次数

来自C#中BackgroundWorker2_RunWorkerCompleted的无效跨线程操作

我收到一个没有意义的错误.

Cross-thread operation not valid: Control 'buttonOpenFile' accessed from a thread other than the thread it was created on.

在我的应用程序中,UI线程会触发backgroundWorker1,当几乎完成时会触发backgroundWorker2并等待它完成. backgroundWorker1backgroundWorker2完成之前等待完成. AutoResetEvent变量用于标记每个工作人员何时完成.在backgroundWorker2_RunWorkerComplete调用函数中,重置表单控件.在此ResetFormControls()函数中抛出异常.我认为修改RunWorkerCompleted函数中的表单控件是安全的.两个后台工作程序都是从UI线程实例化的.这是我正在做的一个非常概括的版本:

  AutoResetEvent evtProgrammingComplete_c = new AutoResetEvent(false);
  AutoResetEvent evtResetComplete_c = new AutoResetEvent(false);

  private void ResetFormControls()
  {
     toolStripProgressBar1.Enabled = false;
     toolStripProgressBar1.RightToLeftLayout = false;
     toolStripProgressBar1.Value = 0;

     buttonInit.Enabled = true;
     buttonOpenFile.Enabled = true; // Error occurs here.
     buttonProgram.Enabled = true;
     buttonAbort.Enabled = false;
     buttonReset.Enabled = true;
     checkBoxPeripheryModule.Enabled = …
Run Code Online (Sandbox Code Playgroud)

c# multithreading backgroundworker concurrent-processing

2
推荐指数
1
解决办法
935
查看次数