Laravel Queue:restart 是如何实现队列的平滑启动的

Posted by Cann on April 11, 2018
当我们使用superviosrctl reload强制重启队列进程时,如果队列中有正在执行的任务,就会造成任务丢失,而laravel的队列提供了queue:restart命令用于解决这个问题。接下来我们来分析一下它是如何实现这一功能的。

queue:restart命令定义于Illuminate\Queue\Console\RestartCommand中:

    /**
     * Execute the console command.
     *
     * @return void
     */
    public function handle()
    {
        $this->laravel['cache']->forever('illuminate:queue:restart', $this->currentTime());

        $this->info('Broadcasting queue restart signal.');
    }

当执行restart命令后,它仅做了一件事,就是将当前时间戳存储在redis中。

再来看Illuminate\Queue\Worker文件,该文件中的daemon方法便是Laravel队列的核心代码。

    public function daemon($connectionName, $queue, WorkerOptions $options)
    {
        // do something

        $lastRestart = $this->getTimestampOfLastQueueRestart();

        while (true) {

            // do something(在这里它会去find job => exec job balabalabalabala)

            $this->stopIfNecessary($options, $lastRestart);
        }
    }

它首先会调用getTimestampOfLastQueueRestart()方法,获取上次重启时间:

    protected function getTimestampOfLastQueueRestart()
    {
        if ($this->cache) {
            return $this->cache->get('illuminate:queue:restart');
        }
    }

然后进入while()死循环,不断去redis里pop任务、执行任务。

在while语句的底部,也就是当前任务执行完后,调用stopIfNecessary()方法:

    protected function stopIfNecessary(WorkerOptions $options, $lastRestart)
    {
        if ($this->shouldQuit) {
            $this->kill();
        }

        if ($this->memoryExceeded($options->memory)) {
            $this->stop(12);
        } elseif ($this->queueShouldRestart($lastRestart)) {
            $this->stop();
        }
    }

queueShouldRestart()中,它会再去redis取出重启时间,然后和守护进程启动之初获取到的重启时间比对,如果两者不相等,表示守护经常运行后,开发者执行过queue:restart命令,队列需要重启。

    protected function queueShouldRestart($lastRestart)
    {
        return $this->getTimestampOfLastQueueRestart() != $lastRestart;
    }

判断队列需要重启后,它会运行stop()方法,在这里,它直接使用exit结束while循环,退出守护进程。

    public function stop($status = 0)
    {
        $this->events->dispatch(new Events\WorkerStopping);

        exit($status);
    }

最后,再依赖supervisor等进程管理工具重新启动进程。

简而言之,当我们执行queue:restart时,会生成一个标识,Laravel Queue每执行完一个任务,都会去验证当前标识和进程启动之前的标识是否一致,若不一致,强制结束进程,再依赖supervisor重启进程,以此保证队列重启时,没有正在运行中的任务