Laravel Bus Batch 待处理作业为负数

net*_*nnn 3 php queue amazon-web-services shopify laravel

我正在尝试通过缓存加载 Shopify 产品,并使用 IMRedis::throttle来防止 Shopify 中的速率限制。我的问题是我把所有的获取过程都放在 a 中Batch,它返回一个负数。待处理的作业

这是批次信息。 在此输入图像描述

这是我的方法

公共函数加载(请求$请求,样本$样本){

    return DB::transaction(function () use ($sample) {
        $batch = Bus::batch([])->dispatch();

            $since_id = 0;

            while ($since_id >= 0) {
                $fetchedProducts = (new ShopifyProduct(
                    $sample->shopify_domain,
                    $sample->shopify_info['access_token'])
                )
                ->getById($since_id)
                ->then(function ($data) {
                    return $data['products'];
                }, function () {
                    return [];
                })
                ->wait();

                if (collect($fetchedProducts)->count() == 0) break;

                $lastProduct = Arr::last($fetchedProducts);
                $since_id = $lastProduct['id'];

                collect($fetchedProducts)
                    ->each(function ($shopifyProduct) use($merchant, &$batch) {
                        $batch->add(new CacheProducts($shopifyProduct, $sample));
                    });
            }
        

        return $this->okResponse(['batch_id' => $batch->id])
            ->header('Content-Type', 'application/vnd.api+json');
    });
}
Run Code Online (Sandbox Code Playgroud)

这就是工作

/**
 * Execute the job.
 *
 * @return void
 */
public function handle()
{
    Redis::throttle("shopify-cache")->allow(30)->every(60)->block(70)->then(function () {
        (new Metafield(
                $this->merchant->shopify_domain,
                $this->merchant->shopify_info['access_token']
            )
        )
        ->get($this->shopifyProduct['id'], 'products')
        ->then(function ($data) {
            $this->shopifyProduct['metafields'] = $data['metafields'];

            $cacheProducts = Cache::tags($this->merchant->name)->get('products') ?? [];

            array_push($cacheProducts, $this->shopifyProduct);

            Cache::tags($this->merchant->name)->put('products', $cacheProducts, now()->addHour());

        }, function ($e) {
        })
        ->wait();
    }, function () {
        return $this->release(10);
    });
}
Run Code Online (Sandbox Code Playgroud)

Den*_*nis 5

您需要重构分配作业的代码,以便更好地理解您到底在做什么并简化您的代码。

除此之外,您正在分派一个空批次,稍后在循环中您将作业添加到该批次中,这$batch->add()完全没问题,但是您在闭包中传递了变量&$batch,我认为这没有按您的预期工作。

什么正在$fetchedProducts回归?你是通过ID来获取它的,但是lastId是该数组中最后一项的id,那么这不是同一个id吗?

我认为如果您和平地拆分代码并调试每个部分以检查作业批次是否正常工作,这将对您有所帮助:

  1. 首先确保您的作业句柄不执行任何操作,而是直接返回 ( return;),以便您可以首先测试您的load类。

    public function handle()
    {
      return;
    
      // All of the original code in this method you can comment out, for quicker testing.
    }
    
    Run Code Online (Sandbox Code Playgroud)
  2. 只需将您的load()方法分成几部分,这样您就可以逐步检查每行代码是否按您的预期工作。

  3. 重构您的代码以更好地理解正在发生的情况。我做了一些更改,希望这段代码更具可读性;

     return DB::transaction(function () use ($sample) {
         $jobs = [];
    
         // Fetch all products (if there are too many products, you may use paginated results)
         $fetchedProducts = (new ShopifyProduct(
             $sample->shopify_domain,
             $sample->shopify_info['access_token'])
         )
         // Can you explain how $since_id can be different for each product if you are fetching products by that same id?
         //->getById($since_id)
         ->then(function ($data) {
             return $data['products'];
         }, function () {
             return [];
         })->wait();
    
         if (collect($fetchedProducts)->empty()) {
             return $this->okResponse(['message' => 'No products to process.'])
                 ->header('Content-Type', 'application/vnd.api+json');
         }
    
         collect($fetchedProducts)
             ->each(function ($shopifyProduct) use($merchant, &$jobs) {
                 $jobs[] = new CacheProducts($shopifyProduct, $sample)
             });
    
         // After all jobs are added, we can dispatch them.
         $batch = Bus::batch($jobs)->dispatch();
    
         return $this->okResponse(['batch_id' => $batch->id])
             ->header('Content-Type', 'application/vnd.api+json');
     });
    
    Run Code Online (Sandbox Code Playgroud)