Queues Plugin

The Queues Plugin allows efficient management of background tasks in the Festi ecosystem. It supports the creation, scheduling, and processing of queued tasks using customizable workers. With this plugin, developers can offload tasks that may take time to process (such as sending emails, processing data, etc.) into a queue for later execution, improving the performance and responsiveness of the main application. This plugin is flexible, supporting multiple task types, and integrates easily with MySQL, PostgreSQL and SQL Server databases. Workers can be scheduled via cron jobs, and the tasks can be easily managed, skipped, or customized based on your application’s needs.

Installation

  1. Add the plugin to the project.
  2. Add the following table creation query:

MySQL:

CREATE TABLE `queue_[PLUGIN_NAME_LOWERCASE]`
(
   `id`        INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,
   `type`      VARCHAR(25)      NOT NULL,
   `data`      LONGTEXT         NOT NULL,
   `id_worker` INT(10) UNSIGNED DEFAULT NULL,
   `error`     LONGTEXT         NULL DEFAULT NULL,
   PRIMARY KEY (`id`),
   KEY `type` (`type`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8;

PostgreSQL:

CREATE TABLE queue_[PLUGIN_NAME_LOWERCASE]
(
   id        SERIAL,
   type      VARCHAR(25)      NOT NULL,
   data      LONGTEXT         NOT NULL,
   id_worker INT CHECK (id_worker > 0)      DEFAULT NULL,
   error     LONGTEXT         NULL DEFAULT NULL,
   PRIMARY KEY (id)
);

CREATE INDEX type ON queue_[PLUGIN_NAME_LOWERCASE] (type);
  1. Ensure that the environment has Core with the correct parameters (Example: init.dist.php).
  2. Add a cron job to the crontab:
    */5 * * * * /usr/bin/php -q PATH_PLUGINS/Queues/queues.php --id WORKER_ID --name PLUGIN_NAME

To create init.queue.php outside the plugin folder, use the -i parameter to specify the path to init.queue.php:

*/5 * * * * /usr/bin/php -q PATH_PLUGINS/Queues/queues.php -i ../../init.queue.php --id WORKER_ID --name PLUGIN_NAME

Usage

Adding a Task to the Queue

  • Through the plugin instance:
$this->plugin->queues->add($this, '[TYPE]', $values);
$queuesPlugin = Core::getInstance()->getPluginInstance('Queues');
$queuesPlugin->add($this, '[TYPE]', $values);
  • By plugin name:
$this->plugin->queues->addByName('Customers', '[TYPE]', $values);

Executing Tasks

class [PLUGIN_NAME]Plugin extends DisplayPlugin
{
    /**
     * @worker
     * @param mixed $data
     */
    public function onQueueItemWithType[TYPE_IN_CAMEL_CASE](./$data, $idQueueItem)
    {
        //...
    }
}

$idQueueItem - optional parameter.

If you want to skip a queue item, return QueueWorker::RESULT_SKIP_ITEM. For example:

public function onQueueItemWithTypeProcessingData($data)
{
    if ($data['status'] == "pending") {
        return QueueWorker::RESULT_SKIP_ITEM;
    }

    // Further processing...

    return true;
}

If you need to limit the number of tasks a worker can process at once, you can use the 'limit' parameter. This is useful for better task distribution among multiple workers.

*/5 * * * * /usr/bin/php -q PATH_PLUGINS/Queues/queues.php --id 1 --name ServicesParser --limit 1

You can also restrict task execution by type:

*/5 * * * * /usr/bin/php -q PATH_PLUGINS/Queues/queues.php --id 1 --name Customer --type ForgotPassword

Write Unit Test

public function testPluginQueues()
{
    $plugin = $this->core->getPluginInstance('Test');

    assert($plugin instanceof TestPlugin);
    $this->assertTrue($plugin instanceof TestPlugin);

    $data = array(
        'id' => 'test'
    );

    $this->plugin->queues->addByName('test', 'test_type', $data);

    $plugin->addQueueItem('second_test_type', $data);

    $worker = new TestQueueWorker($this->core->db);

    $worker->iteration();

    $result = $plugin->getData();

    $this->assertTrue(count($result) == 2);
}