diff --git a/core/Command/Background/Job.php b/core/Command/Background/Job.php index 823498cf8ca6c..87c06be48da4d 100644 --- a/core/Command/Background/Job.php +++ b/core/Command/Background/Job.php @@ -25,24 +25,17 @@ namespace OC\Core\Command\Background; -use OCP\BackgroundJob\IJob; use OCP\BackgroundJob\IJobList; -use OCP\ILogger; -use Symfony\Component\Console\Command\Command; +use Psr\Log\LoggerInterface; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -class Job extends Command { - protected IJobList $jobList; - protected ILogger $logger; - +class Job extends JobBase { public function __construct(IJobList $jobList, - ILogger $logger) { - parent::__construct(); - $this->jobList = $jobList; - $this->logger = $logger; + LoggerInterface $logger) { + parent::__construct($jobList, $logger); } protected function configure(): void { @@ -89,7 +82,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int $output->writeln('Something went wrong when trying to retrieve Job with ID ' . $jobId . ' from database'); return 1; } - $job->execute($this->jobList, $this->logger); + $job->execute($this->jobList, \OC::$server->getLogger()); $job = $this->jobList->getById($jobId); if (($job === null) || ($lastRun !== $job->getLastRun())) { @@ -106,53 +99,4 @@ protected function execute(InputInterface $input, OutputInterface $output): int return 0; } - - protected function printJobInfo(int $jobId, IJob $job, OutputInterface$output): void { - $row = $this->jobList->getDetailsById($jobId); - - $lastRun = new \DateTime(); - $lastRun->setTimestamp((int) $row['last_run']); - $lastChecked = new \DateTime(); - $lastChecked->setTimestamp((int) $row['last_checked']); - $reservedAt = new \DateTime(); - $reservedAt->setTimestamp((int) $row['reserved_at']); - - $output->writeln('Job class: ' . get_class($job)); - $output->writeln('Arguments: ' . json_encode($job->getArgument())); - - $isTimedJob = $job instanceof \OC\BackgroundJob\TimedJob || $job instanceof \OCP\BackgroundJob\TimedJob; - if ($isTimedJob) { - $output->writeln('Type: timed'); - } elseif ($job instanceof \OC\BackgroundJob\QueuedJob || $job instanceof \OCP\BackgroundJob\QueuedJob) { - $output->writeln('Type: queued'); - } else { - $output->writeln('Type: job'); - } - - $output->writeln(''); - $output->writeln('Last checked: ' . $lastChecked->format(\DateTimeInterface::ATOM)); - if ((int) $row['reserved_at'] === 0) { - $output->writeln('Reserved at: -'); - } else { - $output->writeln('Reserved at: ' . $reservedAt->format(\DateTimeInterface::ATOM) . ''); - } - $output->writeln('Last executed: ' . $lastRun->format(\DateTimeInterface::ATOM)); - $output->writeln('Last duration: ' . $row['execution_duration']); - - if ($isTimedJob) { - $reflection = new \ReflectionClass($job); - $intervalProperty = $reflection->getProperty('interval'); - $intervalProperty->setAccessible(true); - $interval = $intervalProperty->getValue($job); - - $nextRun = new \DateTime(); - $nextRun->setTimestamp($row['last_run'] + $interval); - - if ($nextRun > new \DateTime()) { - $output->writeln('Next execution: ' . $nextRun->format(\DateTimeInterface::ATOM) . ''); - } else { - $output->writeln('Next execution: ' . $nextRun->format(\DateTimeInterface::ATOM) . ''); - } - } - } } diff --git a/core/Command/Background/JobBase.php b/core/Command/Background/JobBase.php new file mode 100644 index 0000000000000..fe2880c0988a0 --- /dev/null +++ b/core/Command/Background/JobBase.php @@ -0,0 +1,93 @@ + + * + * @author Julius Härtl + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + + +namespace OC\Core\Command\Background; + +use OCP\BackgroundJob\IJob; +use OCP\BackgroundJob\IJobList; +use Psr\Log\LoggerInterface; +use Symfony\Component\Console\Output\OutputInterface; + +abstract class JobBase extends \OC\Core\Command\Base { + protected IJobList $jobList; + protected LoggerInterface $logger; + + public function __construct(IJobList $jobList, + LoggerInterface $logger) { + parent::__construct(); + $this->jobList = $jobList; + $this->logger = $logger; + } + + protected function printJobInfo(int $jobId, IJob $job, OutputInterface $output): void { + $row = $this->jobList->getDetailsById($jobId); + + $lastRun = new \DateTime(); + $lastRun->setTimestamp((int) $row['last_run']); + $lastChecked = new \DateTime(); + $lastChecked->setTimestamp((int) $row['last_checked']); + $reservedAt = new \DateTime(); + $reservedAt->setTimestamp((int) $row['reserved_at']); + + $output->writeln('Job class: ' . get_class($job)); + $output->writeln('Arguments: ' . json_encode($job->getArgument())); + + $isTimedJob = $job instanceof \OC\BackgroundJob\TimedJob || $job instanceof \OCP\BackgroundJob\TimedJob; + if ($isTimedJob) { + $output->writeln('Type: timed'); + } elseif ($job instanceof \OC\BackgroundJob\QueuedJob || $job instanceof \OCP\BackgroundJob\QueuedJob) { + $output->writeln('Type: queued'); + } else { + $output->writeln('Type: job'); + } + + $output->writeln(''); + $output->writeln('Last checked: ' . $lastChecked->format(\DateTimeInterface::ATOM)); + if ((int) $row['reserved_at'] === 0) { + $output->writeln('Reserved at: -'); + } else { + $output->writeln('Reserved at: ' . $reservedAt->format(\DateTimeInterface::ATOM) . ''); + } + $output->writeln('Last executed: ' . $lastRun->format(\DateTimeInterface::ATOM)); + $output->writeln('Last duration: ' . $row['execution_duration']); + + if ($isTimedJob) { + $reflection = new \ReflectionClass($job); + $intervalProperty = $reflection->getProperty('interval'); + $intervalProperty->setAccessible(true); + $interval = $intervalProperty->getValue($job); + + $nextRun = new \DateTime(); + $nextRun->setTimestamp($row['last_run'] + $interval); + + if ($nextRun > new \DateTime()) { + $output->writeln('Next execution: ' . $nextRun->format(\DateTimeInterface::ATOM) . ''); + } else { + $output->writeln('Next execution: ' . $nextRun->format(\DateTimeInterface::ATOM) . ''); + } + } + } +} diff --git a/core/Command/Background/JobWorker.php b/core/Command/Background/JobWorker.php new file mode 100644 index 0000000000000..2ca4af73474b1 --- /dev/null +++ b/core/Command/Background/JobWorker.php @@ -0,0 +1,173 @@ + + * + * @author Joas Schilling + * + * @license GNU AGPL version 3 or any later version + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +namespace OC\Core\Command\Background; + +use OC\Core\Command\InterruptedException; +use OCP\BackgroundJob\IJobList; +use Psr\Log\LoggerInterface; +use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; +use Symfony\Component\Console\Output\OutputInterface; + +class JobWorker extends JobBase { + private array $executedJobs = []; + + public function __construct(IJobList $jobList, + LoggerInterface $logger) { + parent::__construct($jobList, $logger); + } + + protected function configure(): void { + parent::configure(); + + $this + ->setName('background-job:worker') + ->setDescription('Run a background job worker') + ->addArgument( + 'job-class', + InputArgument::OPTIONAL, + 'The class of the job in the database' + ) + ->addOption( + 'once', + null, + InputOption::VALUE_NONE, + 'Only execute the worker once (as a regular cron execution would do it)' + ) + ->addOption( + 'interval', + 'i', + InputOption::VALUE_OPTIONAL, + 'Interval in seconds in which the worker should repeat already processed jobs (set to 0 for no repeat)', + 5 + ) + ; + } + + protected function execute(InputInterface $input, OutputInterface $output): int { + $jobClass = $input->getArgument('job-class'); + + if ($jobClass && !class_exists($jobClass)) { + $output->writeln('Invalid job class'); + return 1; + } + + while (true) { + // Handle canceling of the process + try { + $this->abortIfInterrupted(); + } catch (InterruptedException $e) { + $output->writeln('Cleaning up before quitting. Press Ctrl-C again to kill, but this may have unexpected side effects.'); + $this->unlockExecuted(); + $output->writeln('Background job worker stopped'); + break; + } + + $this->printSummary($input, $output); + + $interval = (int)($input->getOption('interval') ?? 5); + + // Unlock jobs that should be executed again after the interval + // Alternative could be to set last_checked to interval in the future to avoid the extra locks + foreach ($this->executedJobs as $id => $time) { + if ($time <= time() - $interval) { + unset($this->executedJobs[$id]); + $job = $this->jobList->getById($id); + if ($job !== null) { + $this->jobList->unlockJob($job); + } + } + } + + usleep(50000); + $job = $this->jobList->getNext(false, $jobClass); + if (!$job) { + if ($input->getOption('once') === true || $interval === 0) { + break; + } + + $output->writeln("Waiting for new jobs to be queued", OutputInterface::VERBOSITY_VERBOSE); + // Re-check interval for new jobs + sleep(1); + continue; + } + + + if (isset($this->executedJobs[$job->getId()]) && ($this->executedJobs[$job->getId()] + $interval > time())) { + $output->writeln("Job already executed within timeframe " . get_class($job) . " " . $job->getId() . '', OutputInterface::VERBOSITY_VERBOSE); + continue; + } + + $output->writeln("Running job " . get_class($job) . " with ID " . $job->getId()); + + if ($output->isVerbose()) { + $this->printJobInfo($job->getId(), $job, $output); + } + + $job->execute($this->jobList, \OC::$server->getLogger()); + + // clean up after unclean jobs + \OC_Util::tearDownFS(); + \OC::$server->getTempManager()->clean(); + + $this->jobList->setLastJob($job); + $this->jobList->unlockJob($job); + $this->executedJobs[$job->getId()] = time(); + + if ($input->getOption('once') === true) { + break; + } + } + + $this->unlockExecuted(); + + return 0; + } + + private function printSummary(InputInterface $input, OutputInterface $output): void { + if (!$output->isVeryVerbose()) { + return; + } + $output->writeln("Summary"); + + $counts = []; + foreach ($this->jobList->countByClass() as $row) { + $counts[] = $row; + } + $this->writeTableInOutputFormat($input, $output, $counts); + } + + private function unlockExecuted() { + foreach ($this->executedJobs as $id => $time) { + unset($this->executedJobs[$id]); + $job = $this->jobList->getById($id); + if ($job !== null) { + $this->jobList->unlockJob($job); + } + } + } +} diff --git a/core/register_command.php b/core/register_command.php index 64769c888fd51..1b25e6d3ed102 100644 --- a/core/register_command.php +++ b/core/register_command.php @@ -89,7 +89,8 @@ $application->add(new OC\Core\Command\Background\Cron(\OC::$server->getConfig())); $application->add(new OC\Core\Command\Background\WebCron(\OC::$server->getConfig())); $application->add(new OC\Core\Command\Background\Ajax(\OC::$server->getConfig())); - $application->add(new OC\Core\Command\Background\Job(\OC::$server->getJobList(), \OC::$server->getLogger())); + $application->add(new OC\Core\Command\Background\Job(\OC::$server->getJobList(), \OC::$server->get(LoggerInterface::class))); + $application->add(\OCP\Server::get(OC\Core\Command\Background\JobWorker::class)); $application->add(new OC\Core\Command\Background\ListCommand(\OC::$server->getJobList())); $application->add(\OC::$server->query(\OC\Core\Command\Broadcast\Test::class)); diff --git a/cron.php b/cron.php index 7d661621ed090..4268b2ef36657 100644 --- a/cron.php +++ b/cron.php @@ -142,7 +142,8 @@ $endTime = time() + 14 * 60; $executedJobs = []; - while ($job = $jobList->getNext($onlyTimeSensitive)) { + $jobClass = isset($argv[1]) ? $argv[1] : null; + while ($job = $jobList->getNext($onlyTimeSensitive, $jobClass)) { if (isset($executedJobs[$job->getId()])) { $jobList->unlockJob($job); break; diff --git a/lib/private/BackgroundJob/JobList.php b/lib/private/BackgroundJob/JobList.php index 20176e451251d..3b1e2e91c719c 100644 --- a/lib/private/BackgroundJob/JobList.php +++ b/lib/private/BackgroundJob/JobList.php @@ -205,7 +205,7 @@ public function getJobs($job, ?int $limit, int $offset): array { /** * get the next job in the list */ - public function getNext(bool $onlyTimeSensitive = false): ?IJob { + public function getNext(bool $onlyTimeSensitive = false, string $jobClass = null): ?IJob { $query = $this->connection->getQueryBuilder(); $query->select('*') ->from('jobs') @@ -218,6 +218,10 @@ public function getNext(bool $onlyTimeSensitive = false): ?IJob { $query->andWhere($query->expr()->eq('time_sensitive', $query->createNamedParameter(IJob::TIME_SENSITIVE, IQueryBuilder::PARAM_INT))); } + if ($jobClass) { + $query->andWhere($query->expr()->eq('class', $query->createNamedParameter($jobClass))); + } + $update = $this->connection->getQueryBuilder(); $update->update('jobs') ->set('reserved_at', $update->createNamedParameter($this->timeFactory->getTime())) @@ -238,7 +242,7 @@ public function getNext(bool $onlyTimeSensitive = false): ?IJob { if ($count === 0) { // Background job already executed elsewhere, try again. - return $this->getNext($onlyTimeSensitive); + return $this->getNext($onlyTimeSensitive, $jobClass); } $job = $this->buildJob($row); @@ -252,7 +256,7 @@ public function getNext(bool $onlyTimeSensitive = false): ?IJob { $reset->executeStatement(); // Background job from disabled app, try again. - return $this->getNext($onlyTimeSensitive); + return $this->getNext($onlyTimeSensitive, $jobClass); } return $job; @@ -382,4 +386,23 @@ public function resetBackgroundJob(IJob $job): void { ->where($query->expr()->eq('id', $query->createNamedParameter($job->getId()), IQueryBuilder::PARAM_INT)); $query->executeStatement(); } + + public function countByClass(): array { + $query = $this->connection->getQueryBuilder(); + $query->select('class') + ->selectAlias($query->func()->count('id'), 'count') + ->from('jobs') + ->orderBy('count') + ->groupBy('class'); + + $result = $query->executeQuery(); + + $jobs = []; + while ($row = $result->fetch()) { + $jobs[] = $row; + } + + return $jobs; + + } } diff --git a/tests/lib/BackgroundJob/DummyJobList.php b/tests/lib/BackgroundJob/DummyJobList.php index 4d14ed9e7db43..d0ba68b60e1b5 100644 --- a/tests/lib/BackgroundJob/DummyJobList.php +++ b/tests/lib/BackgroundJob/DummyJobList.php @@ -91,7 +91,7 @@ public function getJobs($job, ?int $limit, int $offset): array { /** * get the next job in the list */ - public function getNext(bool $onlyTimeSensitive = false): ?IJob { + public function getNext(bool $onlyTimeSensitive = false, string $jobClass = null): ?IJob { if (count($this->jobs) > 0) { if ($this->last < (count($this->jobs) - 1)) { $i = $this->last + 1;