📊 Change the order Laravel picks up queued jobs (AKA Priority Queues)

Photo by Levi Jones on Unsplash

Have you ever wished some queued jobs would be processed ahead of others? For example, if you want your VIP clients' transactions to be completed before everyone else's, this article might help you.

Simple approach ✨

If you have only a few tiers of clients, you might want to consider assigning them to a different queue so that VIP clients won't be blocked by other customers:

<?php

namespace App\Jobs;

use App\Models\Customer; // Assumed location of the Customer model
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(private Customer $customer)
    {
        $this->onQueue($this->determineQueue());
    }

    /**
     * Determine which queue to use based on client type.
     */
    private function determineQueue(): string
    {
        return $this->customer->is_vip ? 'vip' : 'default';
    }

    public function handle(): void
    {
        // Job logic
    }
}

βš™οΈ Then running separate workers for both queues:

php artisan queue:work --queue=default
php artisan queue:work --queue=vip

✅ This is really easy and works with any queue driver, such as Redis, SQS, or Database.

❌ Unfortunately, it does not scale well if you want finer control over the order of queued jobs or have many different tiers of users.

Queue prioritization using a custom queue driver 👨🏻‍💻

If you wish to control the order of transactions in a more sophisticated way, for example based on the transaction amount or any other factor, consider creating a custom queue driver.

1. Add priority column to jobs table

To start, add a priority column to your jobs table. You can do this by creating a migration:

php artisan make:migration add_priority_to_jobs_table

Update the migration file:

public function up()
{
    Schema::table('jobs', function (Blueprint $table) {
        $table->unsignedInteger('priority')->default(0); // Default priority is 0
    });
}

public function down()
{
    Schema::table('jobs', function (Blueprint $table) {
        $table->dropColumn('priority');
    });
}

Run the migration:

php artisan migrate

2. Create a Custom Queue Class

Create a custom queue class that extends Laravel's DatabaseQueue. In this class, you'll override the push and pop methods to handle job priority:

<?php

namespace App\Queue;

use Illuminate\Queue\DatabaseQueue;
use Illuminate\Contracts\Queue\Job;
use Illuminate\Queue\Jobs\DatabaseJob;
use Illuminate\Queue\Jobs\DatabaseJobRecord;

class PriorityDatabaseQueue extends DatabaseQueue
{
    /**
     * Push a new job onto the queue.
     *
     * @param  object|string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     * @return mixed
     */
    public function push($job, $data = '', $queue = null)
    {
        $queue = $this->getQueue($queue);
        $payload = $this->createPayload($job, $queue, $data);

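        // Recent Laravel versions track reservation with reserved_at only,
        // so the jobs table has no separate "reserved" column.
        return $this->database->table($this->table)->insertGetId([
            'queue' => $queue,
            'payload' => $payload,
            'attempts' => 0,
            'reserved_at' => null,
            'available_at' => $this->availableAt(),
            'created_at' => $this->currentTime(),
            // $payload is a JSON string, so read the priority from the job object itself
            'priority' => is_object($job) ? ($job->priority ?? 0) : 0,
        ]);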
    }

    /**
     * Pop the next job off of the queue with prioritization.
     *
     * @param  string|null  $queue
     * @return \Illuminate\Contracts\Queue\Job|null
     */
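    public function pop($queue = null)
    {
        $queue = $this->getQueue($queue);

        // Select and reserve inside one transaction so two workers cannot take the same job.
        return $this->database->transaction(function () use ($queue) {
            $job = $this->database->table($this->table)
                ->lockForUpdate()
                ->where('queue', $queue)
                ->where(function ($query) {
                    // Helpers inherited from DatabaseQueue (assumes a recent Laravel version):
                    // unreserved and due, or reserved but past retry_after.
                    $this->isAvailable($query);
                    $this->isReservedButExpired($query);
                })
                ->orderBy('priority', 'desc') // Prioritize jobs based on their priority level
                ->orderBy('id', 'asc') // Oldest first among equal priorities
                ->first();

            if (is_null($job)) {
                return null;
            }

            // Sets reserved_at and increments attempts on the row.
            $record = $this->markJobAsReserved(new DatabaseJobRecord((object) $job));

            return new DatabaseJob(
                $this->container, $this, $record, $this->connectionName, $queue
            );
        });
    }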
}
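To make the ordering rule in pop concrete, here is a minimal plain-PHP sketch that does not touch Laravel or a database. The nextJobRow function and the sample rows are hypothetical; they only mimic ORDER BY priority DESC, id ASC on an in-memory array.

```php
<?php

/**
 * Hypothetical helper: return the row a worker would pick next.
 * Highest priority wins; among equal priorities the lowest id (oldest) wins.
 *
 * @param  array<int, array{id: int, priority: int}>  $rows
 */
function nextJobRow(array $rows): ?array
{
    if ($rows === []) {
        return null;
    }

    // Swapping $a and $b in the first element gives descending priority,
    // while the second element keeps ids ascending.
    usort($rows, fn (array $a, array $b): int =>
        [$b['priority'], $a['id']] <=> [$a['priority'], $b['id']]
    );

    return $rows[0];
}

$rows = [
    ['id' => 1, 'priority' => 0],
    ['id' => 2, 'priority' => 10],
    ['id' => 3, 'priority' => 10],
];

echo nextJobRow($rows)['id'], PHP_EOL; // prints 2
```

Job 2 is picked before job 1 even though it was queued later, and before job 3 because it is older at the same priority.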

3. Create a Custom Connector Class

Create a connector class that builds the custom queue from the connection's configuration:

<?php

namespace App\Queue\Connectors;

use Illuminate\Queue\Connectors\DatabaseConnector;
use App\Queue\PriorityDatabaseQueue;

class PriorityDatabaseConnector extends DatabaseConnector
{
    public function connect(array $config)
    {
        return new PriorityDatabaseQueue(
            $this->connections->connection($config['connection'] ?? null),
            $config['table'],
            $config['queue'],
            $config['retry_after'] ?? 60
        );
    }
}

4. Register the Custom Connector

Register your custom connector in a service provider.

use Illuminate\Support\ServiceProvider;
use App\Queue\Connectors\PriorityDatabaseConnector;

class AppServiceProvider extends ServiceProvider
{
    public function register(): void
    {
        $this->app->resolving('queue', function ($queue) {
            $queue->addConnector('prioritydatabase', function () {
                return new PriorityDatabaseConnector($this->app['db']);
            });
        });
    }
}

From now on, any connection that uses the prioritydatabase driver will push and fetch jobs with our custom, priority-aware logic.

5. Configure Queue Connection

Update your config/queue.php to use the new driver:

'connections' => [
    'prioritydatabase' => [
        'driver' => 'prioritydatabase',
        'table' => 'jobs',
        'queue' => 'priority',
        'retry_after' => 60,
    ],
],

6. Set Job Priority When Dispatching Jobs

When dispatching jobs, set the priority level. You can do this by adding a public $priority property to the job class:

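class SomeJob implements ShouldQueue
{
    // Same traits (and imports) as ProcessJob above; Queueable provides onQueue() and onConnection().
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
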
    public function __construct(public int $priority = 0)
    {
    }

    public function handle(): void
    {
        // Job logic
    }
}

// Dispatching the job with priority on the custom connection
$job = new SomeJob(10); // Priority 10
dispatch($job)->onConnection('prioritydatabase')->onQueue('priority');

With these steps, you will have a priority queue setup in Laravel where jobs are stored with a priority column in the jobs table. The custom queue connector ensures that jobs are popped from the queue based on their priority, allowing higher priority jobs to be processed first.
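As an illustration of prioritizing by transaction amount, the priority passed to the job could be derived from a small helper. This is only a sketch: priorityForAmount and its thresholds are hypothetical and should be replaced with whatever rule fits your business.

```php
<?php

/**
 * Hypothetical helper: map a transaction amount (in cents) to a priority.
 * The thresholds are made up for illustration.
 */
function priorityForAmount(int $amountInCents): int
{
    return match (true) {
        $amountInCents >= 1_000_000 => 100, // 10,000.00 and above
        $amountInCents >= 100_000 => 50,    // 1,000.00 and above
        default => 0,                       // everything else
    };
}

echo priorityForAmount(2_500_000), PHP_EOL; // prints 100
echo priorityForAmount(4_200), PHP_EOL;     // prints 0
```

The result can then be passed to the job constructor, for example new SomeJob(priorityForAmount($amountInCents)), where $amountInCents is whatever amount your application holds for the transaction.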

Feel free to customize the way you pop the jobs based on your needs.

✅ More control over the job processing order.

❌ Changing the queue driver requires a major refactor to adapt the popping logic, and not all drivers might be supported.

Summary

πŸ•΅πŸ»β€β™‚οΈ Digging deep in Laravel queue connectors could be fun and it's interesting to learn how it works internally so that we are able to customize it to our needs.

While the DB driver is easy to understand and customize, I strongly suggest not to use database driver in production. If you wish to maintain scalability and avoid lock issues, I recommend creating a similar connector based on Redis with ZADD and ZRANGE/ZREM to handle similar logic instead.
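To give a feel for the sorted-set idea, here is a minimal in-memory sketch. It is not a Redis client: SortedSetSketch and scoreFor are hypothetical stand-ins, where add plays the role of ZADD and popLowest plays the role of ZRANGE key 0 0 followed by ZREM. The score packs priority and arrival order into one number, so the lowest score is always the next job.

```php
<?php

/** In-memory stand-in for a Redis sorted set; not a Redis client. */
final class SortedSetSketch
{
    /** @var array<string, float> member => score */
    private array $scores = [];

    /** Plays the role of ZADD: add a member or update its score. */
    public function add(string $member, float $score): void
    {
        $this->scores[$member] = $score;
    }

    /** Plays the role of ZRANGE key 0 0 plus ZREM: take the lowest score. */
    public function popLowest(): ?string
    {
        if ($this->scores === []) {
            return null;
        }

        asort($this->scores);
        $member = (string) array_key_first($this->scores);
        unset($this->scores[$member]);

        return $member;
    }
}

/**
 * Hypothetical score: higher priority gives a lower score, and the
 * sequence number keeps FIFO order within one priority. Assumes
 * sequences stay below 10^10 so the float stays exact.
 */
function scoreFor(int $priority, int $sequence): float
{
    return (float) ($sequence - $priority * 10_000_000_000);
}

$set = new SortedSetSketch();
$set->add('job-a', scoreFor(0, 1));
$set->add('job-b', scoreFor(10, 2));
$set->add('job-c', scoreFor(10, 3));

echo $set->popLowest(), PHP_EOL; // prints job-b
```

With a real Redis server, the read and the removal would have to happen atomically (for example inside a Lua script) so that two workers cannot pop the same job.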
