Background Tasks and Queues
Queues and background tasks are among the most important and challenging parts of the system. Let's look at what they are and how to use them.
For working with background tasks and queues, use the Festi Cron package. For queues, you can also use the Queues Plugin.
What are Background Tasks?
Background tasks are scripts that run in the background on a schedule, usually launched by crontab. They are used, for example, to aggregate data for various reports in the background, to perform calculations that must run on a schedule, and often to synchronize data between systems.
What are Queues and What are They for?
Some tasks are too labor-intensive and not critical enough to perform in real time, for example sending an email during registration or a notification about a new bid on a car. To avoid burdening request processing with work that can be done later, we use queues: we record what should be done and by whom, and the work is then performed asynchronously. Queues are stored in the database.
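To make the idea concrete, here is a minimal PHP sketch of deferring work through a queue: the code that runs during the request only records what needs to be done, and a worker performs it later. The in-memory class stands in for a database queue table, and all names (`InMemoryQueue`, `onRegister`, `runWorkerOnce`) are hypothetical, not part of Festi.

```php
<?php
// Sketch only: an in-memory stand-in for a database queue table.
// All names are hypothetical and not part of the Festi API.
final class InMemoryQueue
{
    private array $records = [];
    private int $nextId = 1;   // mimics an AUTO_INCREMENT id

    public function add(array $payload): int
    {
        $id = $this->nextId++;
        $this->records[] = ['id' => $id, 'data' => json_encode($payload)];
        return $id;
    }

    public function pop(): ?array
    {
        // array_shift() returns null once the queue is empty
        return array_shift($this->records);
    }
}

// Runs during the web request: fast, it only writes a record.
function onRegister(InMemoryQueue $queue, string $email): int
{
    return $queue->add(['action' => 'welcome_email', 'email' => $email]);
}

// Runs later in a background worker: performs the deferred work.
function runWorkerOnce(InMemoryQueue $queue, callable $sendEmail): int
{
    $processed = 0;
    while (($record = $queue->pop()) !== null) {
        $data = json_decode($record['data'], true);
        $sendEmail($data['email']);
        $processed++;
    }
    return $processed;
}

$queue = new InMemoryQueue();
onRegister($queue, 'alice@example.com');
onRegister($queue, 'bob@example.com');

$sent = [];
$count = runWorkerOnce($queue, function (string $email) use (&$sent): void {
    $sent[] = $email;
});
```

The request never waits for the email to be sent; it only pays for one insert.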
Workers are scripts that process queues. They can be written in PHP or Python. The system has two types of queues, or more precisely two types of workers that process them: multi-task and single-task. We generally recommend single-task workers.
Pitfalls with Queues
Because queue workers run in the background alongside the web application, there are some nuances to keep in mind:
- If you write a PHP worker yourself, remember to call `sleep` between queue-processing iterations; otherwise the script will consume 100% CPU.
- Be careful with transactions in queues: if you don't close the transaction at the end of each cycle, new queue entries may not become visible to the worker while queue record IDs keep growing, and sooner or later you will get a deadlock or a lock at the InnoDB storage level.
- Be careful when launching workers via cron: make sure each launch does not start an additional worker, otherwise after n hours so many workers will be running that the server stops responding.
- Don't reinvent the wheel: use the existing classes for working with queues, which solve almost all of the problems above for you.
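If you do write a worker loop by hand, its core might look like the sketch below: process one record at a time and pause when the queue is empty so the loop does not spin. The function name and callback contract are hypothetical; the callback is assumed to wrap each record in its own short transaction and commit it before returning. The loop is bounded by `$maxIterations` only so the example terminates.

```php
<?php
// Hypothetical hand-written polling loop (Festi's worker classes already
// do this for you). $processNext handles ONE queue record and returns
// false when the queue is empty. It is assumed to wrap that record in its
// own short transaction (begin, process, commit) so nothing stays open
// between iterations.
function runPollingLoop(callable $processNext, int $idleSleepSeconds, int $maxIterations): array
{
    $stats = ['processed' => 0, 'idle' => 0];

    for ($i = 0; $i < $maxIterations; $i++) {
        if ($processNext()) {
            $stats['processed']++;
            continue;                   // more records may be waiting
        }

        $stats['idle']++;
        // Pause when idle; without this the loop spins at 100% CPU.
        sleep($idleSleepSeconds);
    }

    return $stats;
}

// Demo: three queued records, then the queue is empty.
$pending = [101, 102, 103];
$stats = runPollingLoop(
    function () use (&$pending): bool {
        return array_shift($pending) !== null;
    },
    0,  // 0 keeps the demo instant; a real worker would wait a few seconds
    5
);
```

A real worker runs until it is stopped rather than for a fixed number of iterations.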
Writing a Background Script
When writing background scripts, developers often forget that a script must always run as a single instance (a singleton). For example, suppose a script aggregates data and is scheduled to run every hour, but there is so much data that one run takes three hours on average. After the first three hours, three copies of the script will be running at once, doing the same work and overwriting or misreading each other's data, because nothing coordinates them. To avoid this, background scripts must control how many instances are running and coordinate with each other.
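For illustration, one common way to enforce a single instance in plain PHP is an exclusive, non-blocking `flock()` on a lock file. This is a generic sketch, not necessarily how the framework's singleton workers do it; the lock path and the helper names `acquireSingletonLock` / `releaseSingletonLock` are hypothetical.

```php
<?php
// Generic single-instance guard (hypothetical helpers, not Festi's code):
// take an exclusive, non-blocking flock() on a lock file. If another
// process holds the lock, this launch should simply exit.

/** @return resource|null Open handle holding the lock, or null if busy. */
function acquireSingletonLock(string $lockPath)
{
    $handle = fopen($lockPath, 'c');    // create if missing, never truncate
    if ($handle === false) {
        throw new RuntimeException("Cannot open lock file: $lockPath");
    }
    if (!flock($handle, LOCK_EX | LOCK_NB)) {
        fclose($handle);
        return null;                     // another instance is running
    }
    return $handle;                      // keep open for the whole run
}

/** @param resource $handle */
function releaseSingletonLock($handle): void
{
    flock($handle, LOCK_UN);
    fclose($handle);
}

$lockPath = sys_get_temp_dir() . '/aggregate_reports.lock';

$lock = acquireSingletonLock($lockPath);
// On Linux, flock() locks belong to the open handle, so this second
// attempt (standing in for an overlapping cron launch) is refused.
$overlapping = acquireSingletonLock($lockPath);

// ... the long-running aggregation would run here ...

if ($lock !== null) {
    releaseSingletonLock($lock);
}
```

Because the lock is tied to the open handle, the operating system releases it when the process exits or crashes, so a leftover lock file never blocks future runs the way a stale PID file can.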
To write a regular background script, create a class that extends `CronSingletonWorker`:
```php
class YourScheduleWorker extends CronSingletonWorker
{
    public function start()
    {
        // Keep the business logic in the plugin; the worker only calls it
        $plugin = Core::getInstance()->getPluginInstance("YourPlugin");
        $plugin->onTask();
    }
}

$worker = new YourScheduleWorker();
$worker->start();
```
You must override the `start` method; it usually calls the plugin you need.
Writing a Single-task Worker
- Create a table for the queue:

```sql
CREATE TABLE IF NOT EXISTS `queue_[TYPE]` (
    `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
    `data` longtext NOT NULL,
    `error` longtext NULL DEFAULT NULL,
    `id_worker` int(10) unsigned DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```
- Create a worker class and describe the task processing in its `onRow` method:

```php
class YourWorker extends CronQueueSingletonWorker
{
    protected function getStorageName()
    {
        // Name of the queue table
        return 'queue_[TYPE]';
    }

    protected function onRow($record)
    {
        // Write processing logic
    }
}

// $core is expected to be initialized by the project bootstrap
$worker = new YourWorker($core->db);
$worker->start();
```
Pay attention to the `getErrorMessage` and `onRowError` methods: they are convenient to override when you need to add error handling logic to the queue.
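As a generic illustration of the kind of logic such overrides typically add: catch the exception for a failing record, store its message in the record's `error` column, and move on instead of stopping the worker. This is a standalone sketch with hypothetical names; it does not reproduce the actual signatures of `onRowError` or `getErrorMessage`.

```php
<?php
// Generic sketch (hypothetical names, not Festi's onRowError signature):
// run the handler for each record; if it throws, keep going and store
// the reason in the record's `error` field instead of killing the worker.
function processWithErrorCapture(array $records, callable $onRow): array
{
    $result = [];
    foreach ($records as $record) {
        try {
            $onRow($record);
            $record['error'] = null;
        } catch (Throwable $e) {
            // Keep the failed record for inspection or a later retry.
            $record['error'] = get_class($e) . ': ' . $e->getMessage();
        }
        $result[] = $record;
    }
    return $result;
}

$records = [
    ['id' => 1, 'data' => '{"url": "https://example.com"}'],
    ['id' => 2, 'data' => 'not json'],
];

// Example handler: fails on records whose data is not valid JSON.
$processed = processWithErrorCapture($records, function (array $record): void {
    json_decode($record['data'], true, 512, JSON_THROW_ON_ERROR);
});
```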
Writing a Single-task Worker Using the Queues Plugin
More details about the Queues plugin can be found in the Queues Plugin documentation.
Writing a Multi-task Worker
```php
class CloudCronManager extends CronWorkerManager
{
    private $_core;

    public function __construct()
    {
        $this->_core = Core::getInstance();
        parent::__construct($this->_core->db);
    } // end __construct

    protected function getSpool()
    {
        $path = FS_CRON.'/libs/';

        $object = DataAccessObject::getInstance(
            'Manager',
            $this->_core->db,
            $path
        );

        return $object->getApps();
    } // end getSpool

    protected function onRow($app)
    {
        $crons = array(
            FS_CRON.'/AppQueues/queues.php --name common --id '.$app['id'].
                ' --secret '.$app['secret'],
            FS_CRON.'/AppQueues/queues.php --name push --id '.$app['id'].
                ' --secret '.$app['secret'],
            FS_CRON.'/broadcast_push.php --id '.$app['id'].
                ' --secret '.$app['secret'],
        );

        foreach ($crons as $cronPath) {
            $this->startProcess($cronPath);
        }
    } // end onRow

    /**
     * @override
     */
    protected function onDelay()
    {
        sleep(60 * 5);
    } // end onDelay

    protected function getOptions()
    {
        return array();
    } // end getOptions

    /**
     * @override
     */
    protected function getBaseLockPath()
    {
        return FS_ROOT.'crons/locks/';
    } // end getBaseLockPath
}

try {
    $manager = new CloudCronManager();
    $manager->start();
} catch (AlreadyRunningCronWorkerException $exp) {
    // Another manager instance is already running
    exit();
}
```
coming soon...
Kafka Workers
To create and process queues and data from Kafka, create workers based on one of these classes: `KafkaCronWorker`, `KafkaCronQueueWorker`, or `KafkaCronQueueSingletonWorker`.
To work with Kafka, the php-rdkafka PHP module must be installed on the server.
`test_kafka.php`:

```php
class TestWorker extends KafkaCronWorker
{
    protected function onRow($record)
    {
        // Print each received message
        print_r($record);
    }
}

$worker = new TestWorker();
$worker->start();
```
Start the worker, then send a test message to the topic:

```shell
php test_kafka.php --brokers 78.46.XX.42 --topic test --id g1
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list 78.46.XX.42:9092 --topic test > /dev/null
```
Rules for Writing Workers and Background Scripts
- Move business logic to plugins.
- Plugin methods responsible for processing should have the `onTask` prefix.
Examples of Workers
Processing Multiple Sites to Check Data Using Multiple Single-task Workers
We have a list of links that need to be checked for certain information. There may be many such links, and checking them sequentially in a single script can take several days or even weeks. There are many possible solutions; here we'll write a single-task worker and launch N instances of it.
There is a `referrers` table:

| id | url | status | last_check_date |
|----|-----|--------|-----------------|
Task: We need to check for certain content on referral links every week.
- `referrers_worker.php`:
```php
require_once __DIR__.'/config.php';
class ReferrersWorker extends CronQueueSingletonWorker
{
protected function getStorageName()
{
return 'queue_referrers';
}
protected function onRow($record)
{
$data = json_decode($record['data'], true);
$plugin = Core::getInstance()->getPluginInstance("Referrers");
$plugin->onTaskVerifyUrl($data);
}
protected function onDelay()
{
sleep(60);
}
}
$worker = new ReferrersWorker($core->db);
$worker->start();
```
- Plugin:

```php
class ReferrersPlugin extends ObjectPlugin
{
    // Puts every referrer into the queue as a JSON record
    public function onTaskFillQueue()
    {
        $referrers = $this->object->search();

        $rows = array();
        foreach ($referrers as $data) {
            $rows[] = array(
                'data' => json_encode($data)
            );
        }

        if ($rows) {
            $this->object->addQueueItems($rows);
        }

        return true;
    }

    // Called by ReferrersWorker for each queue record
    public function onTaskVerifyUrl($data)
    {
        $content = file_get_contents($data['url']);
        // ...
    }
}

class ReferrersObject extends DataAccessObject
{
    const TABLE_NAME = "referrers";
    const QUEUE_TABLE_NAME = "queue_referrers";

    public function search($search = array())
    {
        $sql = "SELECT * FROM ".static::TABLE_NAME;

        return $this->select($sql, $search);
    }

    public function addQueueItems($values)
    {
        return $this->massInsert(static::QUEUE_TABLE_NAME, $values);
    }
}
```
- Configure the launch in crontab:

```
# Fill the queue once a week (Sundays at 00:00)
0 0 * * 0 /usr/bin/php /PATH/fill_queue_referrers.php
# Every 10 minutes, launch each worker with its own --id
*/10 * * * * /usr/bin/php /PATH/referrers_worker.php --id 1
*/10 * * * * /usr/bin/php /PATH/referrers_worker.php --id 2
*/10 * * * * /usr/bin/php /PATH/referrers_worker.php --id N
```
You can run as many workers as you need to achieve optimal performance.
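Running N copies of the same worker raises the question of how they avoid processing the same row twice. The sketch below shows one common pattern: a worker claims a row by writing its `--id` into `id_worker` with a single `UPDATE`, processes it, then deletes it. It is an assumption that the `id_worker` column serves this purpose in Festi; the code is not taken from the framework. In-memory SQLite (via PDO) stands in for MySQL, and the function names are hypothetical.

```php
<?php
// ASSUMPTION: a generic claim-by-worker-id pattern, not Festi's code.
// Several workers share one queue table; each claims a row by writing
// its own --id into id_worker, so no row is processed twice.

function claimNextRow(PDO $db, int $workerId): ?array
{
    // One UPDATE statement performs the claim, so the database (not PHP)
    // decides which worker gets the row. MySQL rejects this subquery form;
    // there you would write:
    //   UPDATE queue_referrers SET id_worker = ?
    //   WHERE id_worker IS NULL ORDER BY id LIMIT 1
    $db->prepare(
        'UPDATE queue_referrers SET id_worker = :worker
         WHERE id = (SELECT id FROM queue_referrers
                     WHERE id_worker IS NULL ORDER BY id LIMIT 1)'
    )->execute([':worker' => $workerId]);

    $stmt = $db->prepare(
        'SELECT id, data FROM queue_referrers
         WHERE id_worker = :worker ORDER BY id LIMIT 1'
    );
    $stmt->execute([':worker' => $workerId]);
    $row = $stmt->fetch(PDO::FETCH_ASSOC);

    return $row === false ? null : $row;
}

function finishRow(PDO $db, int $id): void
{
    // Remove the processed record from the queue.
    $db->prepare('DELETE FROM queue_referrers WHERE id = :id')
       ->execute([':id' => $id]);
}

$db = new PDO('sqlite::memory:');
$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$db->exec('CREATE TABLE queue_referrers (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    data TEXT NOT NULL,
    error TEXT NULL,
    id_worker INTEGER NULL
)');

$insert = $db->prepare('INSERT INTO queue_referrers (data) VALUES (:data)');
foreach (['https://a.example', 'https://b.example', 'https://c.example'] as $url) {
    $insert->execute([':data' => json_encode(['url' => $url])]);
}

// Workers 1 and 2 take turns, as if launched with --id 1 and --id 2.
$handledBy = [];
foreach ([1, 2, 1, 2] as $workerId) {
    $row = claimNextRow($db, $workerId);
    if ($row === null) {
        continue;   // nothing left to claim
    }
    $handledBy[(int) $row['id']] = $workerId;
    finishRow($db, (int) $row['id']);
}
```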
Implementing Asynchronous Work with Remote APIs through Queues
Imagine a form whose submission requires fetching data from a remote API (possibly several APIs), or making several API requests to build the response. For reliability and stability, such tasks should be handled through queues, for several reasons:
- it makes the system more reliable;
- the work does not depend on page refreshes or server/database restarts;
- it does not increase request execution time;
- it allows quick scaling by adding more workers.
What we need to solve this task:
1. A queue. We'll implement it with the Queues plugin, but it can also be done with standard framework tools.
2. An additional table to store the results.
3. A bit of JavaScript that checks the status of the asynchronous request at a set interval.
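The overall flow can be sketched in plain PHP: the form submission queues a request and returns its ID at once, a worker later stores the result keyed by that ID, and polling returns nothing until the result exists. Arrays stand in for the `queue_shipping` and `shipping_responses` tables, the function names are hypothetical, and the API call is faked with a fixed price of 100.

```php
<?php
// Plain-PHP sketch of the asynchronous request flow. Arrays stand in for
// the queue_shipping and shipping_responses tables; function names are
// hypothetical and the API call is faked.

// Form submission: queue the request and return its ID immediately.
function createRequest(array &$queue, int &$nextId, array $data): int
{
    $id = $nextId++;
    $queue[$id] = $data;
    return $id;
}

// Worker: take the oldest pending request and store its result.
function workOnce(array &$queue, array &$responses): bool
{
    if ($queue === []) {
        return false;
    }
    $idRequest = array_key_first($queue);
    $data = $queue[$idRequest];
    unset($queue[$idRequest]);

    // A real worker would call the remote API with $data here.
    $responses[$idRequest] = json_encode([
        'price'   => 100,
        'barcode' => $data['barcode'],
    ]);
    return true;
}

// Polling endpoint: null means "not ready yet, ask again later".
function getResult(array $responses, int $idRequest): ?array
{
    if (!isset($responses[$idRequest])) {
        return null;
    }
    return json_decode($responses[$idRequest], true);
}

$queue = [];
$responses = [];
$nextId = 1;

$id = createRequest($queue, $nextId, ['barcode' => 'ABC123']);
$firstPoll = getResult($responses, $id);    // worker hasn't run yet
workOnce($queue, $responses);               // background worker runs
$secondPoll = getResult($responses, $id);   // result is now available
```

The client never blocks on the API: it only repeats the cheap `getResult` check until the worker has written a response.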
Solution:
- Create the tables:

```sql
CREATE TABLE `queue_shipping` (
    `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
    `type` varchar(25) NOT NULL,
    `data` longtext NOT NULL,
    `id_worker` int(10) unsigned DEFAULT NULL,
    `error` LONGTEXT NULL DEFAULT NULL,
    PRIMARY KEY (`id`),
    KEY `type` (`type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE IF NOT EXISTS `shipping_responses` (
    `id` int(10) unsigned NOT NULL,
    `id_request` int(10) unsigned NOT NULL,
    `response` text NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

ALTER TABLE `shipping_responses` ADD PRIMARY KEY (`id`);
ALTER TABLE `shipping_responses` MODIFY `id` int(10) unsigned NOT NULL AUTO_INCREMENT;
```
- Create a plugin with four methods: displaying the form, creating a request, checking the request result, and handling queue items:

```php
class ShippingPlugin extends DisplayPlugin
{
    /**
     * @urlRule ~^/test/shipping/$~
     * @area backend
     */
    public function onDisplayDefault(Response &$response)
    {
        $response->content = $this->fetch('form.phtml');

        return true;
    } // end onDisplayDefault

    /**
     * @urlRule ~^/test/shipping/result/([0-9]+)/$~
     * @section none
     * @area backend
     * @userType user
     */
    public function onJsonResponse(Response &$response, $idRequest)
    {
        $data = $this->object->getResponseByID($idRequest);

        // status stays false until the worker has stored a result
        $response->status = !empty($data);
        if ($data) {
            $response->data = json_decode($data['response'], true);
        }

        return true;
    } // end onJsonResponse

    /**
     * @urlRule ~^/test/shipping/request/$~
     * @area backend
     */
    public function onJsonRequest(Response &$response)
    {
        $fields = array(
            'barcode' => array(
                'type'     => static::FIELD_TYPE_SECURITY_STRING,
                'required' => true,
                'error'    => __('Please type barcode.')
            )
        );

        $data = $this->getPreparedData($_REQUEST, $fields, $errors);
        if ($errors) {
            foreach ($errors as $name => $message) {
                throw new FieldException($message, '#'.$name);
            }
        }

        // Queue the request and return its ID so the client can poll for the result
        $idRequest = $this->plugin->queues->add($this, "api", $data);
        $response->id = $idRequest;

        return true;
    } // end onJsonRequest

    /**
     * @worker
     */
    public function onQueueItemWithTypeApi($data, $idRequest)
    {
        // Add your API logic (or any other processing) here
        $response = array(
            'price'     => 100,
            'idRequest' => $idRequest
        );

        $values = array(
            'id_request' => $idRequest,
            'response'   => json_encode($response)
        );

        $this->object->addResponse($values);

        return true;
    } // end onQueueItemWithTypeApi
}

class ShippingObject extends DataAccessObject
{
    public function getResponseByID($id)
    {
        $search = array(
            'id_request' => $id
        );

        return $this->select(
            "SELECT * FROM shipping_responses",
            $search,
            false,
            static::FETCH_ROW
        );
    }

    public function addResponse($values)
    {
        return $this->insert("shipping_responses", $values);
    }
}
```
- Write the JS to handle the request:

```html
<form id="form" action="<?php echo $this->core->getUrl('/test/shipping/request/'); ?>">
    <input type="text" name="barcode" id="barcode" placeholder="Barcode" />
    <button type="submit" id="submit" disabled>Get Price</button>
</form>

<script>
var ShippingManager = {
    timeout: 2000,

    onCreateRequest: function (data) {
        // No request ID: let Jimbo handle the response (e.g. validation errors)
        if (typeof(data.id) === "undefined") {
            Jimbo.response(data);
            return false;
        }

        Jimbo.showLoading();
        ShippingManager.setScheduleCheckRequest(data.id);
    }, // end onCreateRequest

    setScheduleCheckRequest: function (id) {
        setTimeout(function () {
            ShippingManager.onLoadResponse(id);
        }, ShippingManager.timeout);
    },

    onLoadResponse: function (id) {
        var url = '<?php echo $this->core->getUrl('/test/shipping/result/%s/', "' + id + '"); ?>';

        jQuery.ajax({
            type: "GET",
            url: url,
            success: function (response) {
                if (response.status) {
                    console.log(response);
                    Jimbo.hideLoading();
                    alert(response.data.price);
                } else {
                    // Result not ready yet: check again after the timeout
                    ShippingManager.setScheduleCheckRequest(id);
                }
            }
        });
    } // end onLoadResponse
};

jQuery(document).ready(function () {
    jQuery("#form").submit(function (event) {
        event.preventDefault();

        var form = jQuery(this);
        var url = form.attr('action');

        jQuery.ajax({
            type: "POST",
            url: url,
            data: form.serialize(),
            success: function (data) {
                ShippingManager.onCreateRequest(data);
            }
        });
    });

    jQuery('#submit').removeAttr("disabled");
});
</script>
```
- Add the workers to crontab (on Windows, use Task Scheduler):

```
*/5 * * * * /usr/bin/php -q PATH_PLUGINS/Queues/queues.php --id WORKER_ID_1 --name PLUGIN_NAME
*/5 * * * * /usr/bin/php -q PATH_PLUGINS/Queues/queues.php --id WORKER_ID_2 --name PLUGIN_NAME
...
*/5 * * * * /usr/bin/php -q PATH_PLUGINS/Queues/queues.php --id WORKER_ID_N --name PLUGIN_NAME
```
You can run as many workers as you need to achieve maximum request processing speed.