Laravel Queues: Simplifying Queue Management with SingleStore

One of the reasons we decided to switch over to SingleStore Cloud was that we could not only move our MySql (AWS Aurora), but we could also get rid of our Redis cluster that we are using for queues. Utilizing SingleStore's rowstore table type, we can keep the jobs table in memory for faster reads.

We had a few issues using the out-of-the-box MySQL driver for the queues. Even though it's an in-memory table, we were still experiencing table locks. This is due to the nodes locking down different rows but only returning one. To combat this, we first tried loading the jobs in random order within a created time of 5 minutes. This worked okay. Then Jack Ellis tweeted this:

Below is what we are currently using in our production environment to handle our queue (most of it is the DB version, we pulled it all out in case we needed more modifications):



namespace App\Queue;

use App\Queue\Jobs\SingleStoreJob;
use App\Queue\Jobs\SingleStoreJobRecord;
use Illuminate\Contracts\Queue\ClearableQueue;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Queue\Queue;
use Illuminate\Support\Carbon;
use Illuminate\Support\Facades\Cache;

class SingleStoreQueue extends Queue implements QueueContract, ClearableQueue
     * The database connection instance.
     * @var \Illuminate\Database\Connection
    protected $database;

     * The database table that holds the jobs.
     * @var string
    protected $table;

     * The name of the default queue.
     * @var string
    protected $default;

     * The expiration time of a job.
     * @var int|null
    protected $retryAfter = 60;

     * Store the cache lock key.
     * @var mixed
    protected $lock;

     * Create a new database queue instance.
     * @param  \Illuminate\Database\ConnectionInterface|\Illuminate\Database\Connection  $database
     * @param  string  $table
     * @param  string  $default
     * @param  int  $retryAfter
     * @param  bool  $dispatchAfterCommit
     * @return void
    public function __construct(
        $default = 'default',
        $retryAfter = 60,
        $dispatchAfterCommit = false
    ) {
        $this->table = $table;
        $this->default = $default;
        $this->database = $database;
        $this->retryAfter = $retryAfter;
        $this->dispatchAfterCommit = $dispatchAfterCommit;

     * Get the size of the queue.
     * @param  string|null  $queue
     * @return int
    public function size($queue = null)
        return $this->database->table($this->table)
            ->where('queue', $this->getQueue($queue))

     * Push a new job onto the queue.
     * @param  string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     * @return mixed
    public function push($job, $data = '', $queue = null)
        return $this->enqueueUsing(
            $this->createPayload($job, $this->getQueue($queue), $data),
            function ($payload, $queue) {
                return $this->pushToDatabase($queue, $payload);

     * Push a raw payload onto the queue.
     * @param  string  $payload
     * @param  string|null  $queue
     * @param  array  $options
     * @return mixed
    public function pushRaw($payload, $queue = null, array $options = [])
        return $this->pushToDatabase($queue, $payload);

     * Push a new job onto the queue after (n) seconds.
     * @param  \DateTimeInterface|\DateInterval|int  $delay
     * @param  string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     * @return void
    public function later($delay, $job, $data = '', $queue = null)
        return $this->enqueueUsing(
            $this->createPayload($job, $this->getQueue($queue), $data),
            function ($payload, $queue, $delay) {
                return $this->pushToDatabase($queue, $payload, $delay);

     * Push an array of jobs onto the queue.
     * @param  array  $jobs
     * @param  mixed  $data
     * @param  string|null  $queue
     * @return mixed
    public function bulk($jobs, $data = '', $queue = null)
        $queue = $this->getQueue($queue);

        $now = $this->availableAt();

        return $this->database->table($this->table)->insert(collect((array) $jobs)->map(
            function ($job) use ($queue, $data, $now) {
                return $this->buildDatabaseRecord(
                    $this->createPayload($job, $this->getQueue($queue), $data),
                    isset($job->delay) ? $this->availableAt($job->delay) : $now,

     * Release a reserved job back onto the queue after (n) seconds.
     * @param  string  $queue
     * @param  \App\Queue\Jobs\SingleStoreJobRecord  $job
     * @param  int  $delay
     * @return mixed
    public function release($queue, $job, $delay)
        return $this->pushToDatabase($queue, $job->payload, $delay, $job->attempts);

     * Pop the next job off of the queue.
     * @param  string|null  $queue
     * @return \Illuminate\Contracts\Queue\Job|null
     * @throws \Throwable
    public function pop($queue = null)
        $queue = $this->getQueue($queue);

        return $this->database->transaction(function () use ($queue) {
            if ($job = $this->getNextAvailableJob($queue)) {
                return $this->marshalJob($queue, $job);

     * Delete a reserved job from the queue.
     * @param  string  $queue
     * @param  string  $id
     * @return void
     * @throws \Throwable
    public function deleteReserved($queue, $id)
        $this->database->transaction(function () use ($id) {
            $this->database->table($this->table)->where('id', $id)->delete();

     * Delete a reserved job from the reserved queue and release it.
     * @param  string  $queue
     * @param  \Illuminate\Queue\Jobs\DatabaseJob  $job
     * @param  int  $delay
     * @return void
    public function deleteAndRelease($queue, $job, $delay)
        $this->database->transaction(function () use ($queue, $job, $delay) {
            if ($this->database->table($this->table)->find($job->getJobId())) {
                $this->database->table($this->table)->where('id', $job->getJobId())->delete();

            $this->release($queue, $job->getJobRecord(), $delay);

     * Delete all of the jobs from the queue.
     * @param  string  $queue
     * @return int
    public function clear($queue)
        return $this->database->table($this->table)
            ->where('queue', $this->getQueue($queue))

     * Get the queue or return the default.
     * @param  string|null  $queue
     * @return string
    public function getQueue($queue)
        return $queue ? $queue : $this->default;

     * Get the underlying database instance.
     * @return \Illuminate\Database\Connection
    public function getDatabase()
        return $this->database;

     * Push a raw payload to the database with a given delay of (n) seconds.
     * @param  string|null  $queue
     * @param  string  $payload
     * @param  \DateTimeInterface|\DateInterval|int  $delay
     * @param  int  $attempts
     * @return mixed
    protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0)
        return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord(

     * Create an array to insert for the given job.
     * @param  string|null  $queue
     * @param  string  $payload
     * @param  int  $availableAt
     * @param  int  $attempts
     * @return array
    protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
        return [
            'queue' => $queue,
            'attempts' => $attempts,
            'reserved_at' => null,
            'available_at' => $availableAt,
            'created_at' => $this->currentTime(),
            'payload' => $payload,

     * Get the next available job for the queue.
     * @param  string|null  $queue
     * @return  \App\Queue\Jobs\SingleStoreJobRecord|null
    protected function getNextAvailableJob($queue)
        $job = $this->database->table($this->table)
            ->where('queue', $this->getQueue($queue))
            ->where(function ($query) {
        if (is_null($job)) {
            return null;
        try {
            $this->lock = Cache::lock('queue:worker-'.$job->id, 10 * 60);
        } catch (\Throwable $th) {
            return null;
        if ($this->lock->get()) {
            return new SingleStoreJobRecord((object) $job);

        return null;

     * Modify the query to check for available jobs.
     * @param  \Illuminate\Database\Query\Builder  $query
     * @return void
    protected function isAvailable($query)
        $query->where(function ($query) {
            $query->where('reserved', 0)
                ->where('available_at', '<=', $this->currentTime());

     * Modify the query to check for jobs that are reserved but have expired.
     * @param  \Illuminate\Database\Query\Builder  $query
     * @return void
    protected function isReservedButExpired($query)
        $expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();

        $query->orWhere(function ($query) use ($expiration) {
            $query->where('reserved_at', '<=', $expiration);

     * Marshal the reserved job into a DatabaseJob instance.
     * @param  string  $queue
     * @param  \App\Queue\Jobs\SingleStoreJobRecord  $job
     * @return \Illuminate\Queue\Jobs\DatabaseJob
    protected function marshalJob($queue, $job)
        $job = $this->markJobAsReserved($job);

        return new SingleStoreJob(

     * Mark the given job ID as reserved.
     * @param  \App\Queue\Jobs\SingleStoreJobRecord  $job
     * @return \App\Queue\Jobs\SingleStoreJobRecord
    protected function markJobAsReserved($job)
        $this->database->table($this->table)->where('id', $job->id)->update([
            'reserved' => 1,
            'reserved_at' => now()->getTimestamp(),
            'attempts' => $job->increment(),
        return $job;



namespace App\Queue\Jobs;

use Illuminate\Support\InteractsWithTime;

class SingleStoreJobRecord
    use InteractsWithTime;

     * The underlying job record.
     * @var \stdClass
    protected $record;

     * Create a new job record instance.
     * @param  \stdClass  $record
     * @return void
    public function __construct($record)
        $this->record = $record;

     * Dynamically access the underlying job information.
     * @param  string  $key
     * @return mixed
    public function __get($key)
        return $this->record->{$key};

     * Increment the number of times the job has been attempted.
     * @return int
    public function increment()

        return $this->record->attempts;

     * Update the "reserved at" timestamp of the job.
     * @return int
    public function touch()
        $this->record->reserved_at = $this->currentTime();

        return $this->record->reserved_at;



namespace App\Queue\Jobs;

use App\Queue\SingleStoreQueue;
use Illuminate\Container\Container;
use Illuminate\Contracts\Queue\Job as JobContract;
use Illuminate\Queue\Jobs\Job;

class SingleStoreJob extends Job implements JobContract
     * The database queue instance.
     * @var \App\Queue\SingleStoreQueue
    protected $database;

     * The database job payload.
     * @var \stdClass
    protected $job;

     * Create a new job instance.
     * @param  \Illuminate\Container\Container  $container
     * @param  SingleStoreQueue $database
     * @param  \stdClass  $job
     * @param  string  $connectionName
     * @param  string  $queue
     * @return void
    public function __construct(Container $container, SingleStoreQueue $database, $job, $connectionName, $queue)
        $this->job = $job;
        $this->queue = $queue;
        $this->database = $database;
        $this->container = $container;
        $this->connectionName = $connectionName;

     * Release the job back into the queue after (n) seconds.
     * @param  int  $delay
     * @return void
    public function release($delay = 0)

        $this->database->deleteAndRelease($this->queue, $this, $delay);

     * Delete the job from the queue.
     * @return void
    public function delete()

        $this->database->deleteReserved($this->queue, $this->job->id);

     * Get the number of times the job has been attempted.
     * @return int
    public function attempts()
        return (int) $this->job->attempts;

     * Get the job identifier.
     * @return string
    public function getJobId()
        return $this->job->id;

     * Get the raw body string for the job.
     * @return string
    public function getRawBody()
        return $this->job->payload;

     * Get the database job record.
     * @return \Illuminate\Queue\Jobs\DatabaseJobRecord
    public function getJobRecord()
        return $this->job;
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `queue` varchar(255) CHARACTER SET utf8 COLLATE utf8_unicode_ci NOT NULL DEFAULT '',
  `payload` longtext CHARACTER SET utf8 COLLATE utf8_unicode_ci NOT NULL DEFAULT '',
  `attempts` tinyint(3) unsigned NOT NULL,
  `reserved` tinyint(3) unsigned NOT NULL DEFAULT 0,
  `reserved_at` int(10) unsigned DEFAULT NULL,
  `available_at` int(10) unsigned NOT NULL,
  `created_at` int(10) unsigned NOT NULL,
  KEY `jobs_queue_reserved_reserved_at_index` (`queue`,`reserved`,`reserved_at`),
  SHARD KEY `id` (`id`)
CREATE TABLE `cache_locks` (
  `key` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL,
  `owner` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL,
  `expiration` int(11) NOT NULL,
  SHARD KEY `__SHARDKEY` (`key`),
  KEY `expiration` (`expiration`) USING CLUSTERED COLUMNSTORE

Did you find this article valuable?

Support Guy Warner by becoming a sponsor. Any amount is appreciated!