Festi Cron

Festi Cron is a powerful task scheduling and job queue management library for the Festi Framework. It provides support for scheduled cron jobs, queue-based execution, and Kafka integration for distributed task processing.

Installation

To install Festi Cron using Composer, add the following to your composer.json file:

{
    "require": {
        "festi-team/festi-framework-cron": "dev-develop"
    }
}

Then, run:

composer install

Features

  • Scheduler Worker – Manage and execute scheduled tasks efficiently.
  • Queue-based Execution – Process tasks in the background using job queues.
  • Singleton Workers – Ensure that a cron job does not run multiple times simultaneously.
  • Kafka Integration – Use Kafka-based queue processing for distributed workloads.
  • Exception Handling – Prevent execution conflicts and errors with built-in exception management.

Usage

1. Defining a Cron Job

Create a worker by extending the CronWorker class:

use core\cron\CronWorker;

/**
 * Minimal cron worker example: the task body lives in onStart().
 */
class MyCronJob extends CronWorker
{
    /**
     * Hook that holds the task logic; this example only prints a progress message.
     */
    public function onStart(): void
    {
        // Replace this output with the real cron task logic.
        print 'Executing MyCronJob...';
    }
}

2. Running a Cron Job

To execute a cron job manually:

// Instantiate the worker and trigger a single manual run.
$worker = new MyCronJob();
$worker->start();

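3. Using Queue Workers

Queue workers keep their jobs in a database table named queue_[TYPE] (for example, queue_cabinets_order_sync). Create the table before running the worker: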

-- Storage table for one queue; replace [TYPE] with the queue name.
CREATE TABLE IF NOT EXISTS `queue_[TYPE]` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  -- Job payload; the queue worker example in this document decodes it with json_decode().
  `data` longtext NOT NULL,
  -- NOTE(review): presumably the error text of a failed attempt; confirm.
  `error` longtext NULL DEFAULT NULL,
  -- NOTE(review): presumably identifies the worker handling the row; confirm.
  `id_worker` int(10) unsigned DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

To handle queued execution:

use cron\CronQueueWorker;

/**
 * Queue-backed worker example built on CronQueueWorker.
 */
class MyQueueJob extends CronQueueWorker
{
    /**
     * Hook that holds the job logic; this example only prints a progress message.
     */
    public function onStart(): void
    {
        print 'Processing queued job...';
    }
}

Register and run:

// Create the queue worker and call enqueue() on it.
$worker = new MyQueueJob();
$worker->enqueue();

4. Singleton Execution

Ensure a job runs only once at a time:

use cron\CronSingletonWorker;

/**
 * Singleton worker example that asks the Payments plugin to sync with its services.
 *
 * NOTE(review): this example overrides start() itself, whereas the other
 * workers in this document override onStart(). Confirm that the
 * single-instance guard of CronSingletonWorker still applies when start()
 * is replaced.
 */
class BankTransferAccountsSyncCronWorker extends CronSingletonWorker
{
    /**
     * Resolves the Payments plugin and triggers its synchronisation.
     *
     * @override
     */
    public function start()
    {
        $core = Core::getInstance();

        $paymentsPlugin = $core->getPluginInstance('Payments');
        assert($paymentsPlugin instanceof PaymentsPlugin);

        $paymentsPlugin->syncWithServices();
    }
}

try {
    $worker = new BankTransferAccountsSyncCronWorker();
    $worker->start();
} catch (AlreadyRunningCronWorkerException $alreadyRunning) {
    // The worker reported that it is already running: leave with a success code.
    exit(0);
} catch (Exception $error) {
    // Any other failure is passed to the framework's core exception handler.
    FestiUtils::doHandleCoreException($error);
}
use cron\CronSingletonWorker;

/**
 * Singleton worker example: getSpool() selects staff records and onRow()
 * triggers the monthly payment calculation for one record.
 */
class MoneyReportCronWorker extends CronSingletonWorker
{
    /**
     * Plugin used by getSpool() to search for users.
     *
     * @var StaffPlugin
     */
    protected $userPlugin;

    /**
     * Plugin that calculates payments and updates money transfers.
     *
     * @var FinancePlugin
     */
    protected $financePlugin;

    /**
     * Resolves the plugins this worker depends on.
     *
     * @override
     */
    protected function onStart()
    {
        $app = Core::getInstance();

        $this->userPlugin    = $app->getPluginInstance('Staff');
        $this->financePlugin = $app->getPluginInstance('Finance');
    } // end onStart

    /**
     * Wraps the raw row in a UserValuesObject and asks the finance plugin to
     * calculate the monthly payment for the current date.
     *
     * @override
     */
    protected function onRow($data)
    {
        $user  = new UserValuesObject($data);
        $today = date(static::DATA_FORMAT);

        $this->financePlugin->onCalculateUserMonthlyPayment($user, $today);
    }

    /**
     * Stores the error status and message on the money transfer identified by
     * the record id, then re-throws the original exception.
     *
     * @override
     */
    protected function onRowError($record, $exp)
    {
        $changes = [
            MoneyTransferValuesObject::FIELD_STATUS => MoneyTransferValuesObject::STATUS_ERROR,
            MoneyTransferValuesObject::FIELD_ERROR  => $exp->getMessage(),
        ];

        $this->financePlugin->updateMoneyTransfer($changes, $record['id']);

        throw $exp;
    }

    /**
     * Searches for users that have a salary, were hired no later than the last
     * day of the current month, and are either not fired or fired on/after the
     * current date.
     *
     * @override
     */
    protected function getSpool()
    {
        // 'Y-m-t' formats the last day of the month of $this->_currentTime.
        $monthEnd    = date('Y-m-t', $this->_currentTime);
        $currentDate = date(static::DATA_FORMAT, $this->_currentTime);

        $criteria = [
            'salary&IS NOT' => 'NULL',
            'hire_date&<='  => $monthEnd,
            'sql_or'        => [
                ['fired&IS' => 'NULL'],
                ['fired&>=' => $currentDate],
            ],
        ];

        return $this->userPlugin->search($criteria);
    }
}

try {
    $worker = new MoneyReportCronWorker();
    $worker->start();
} catch (Exception $error) {
    // Route any failure to the framework's core exception handler.
    FestiUtils::doHandleCoreException($error);
}
/**
 * Queue worker example that hands a queued record to the CabinetOrders plugin.
 *
 * NOTE(review): the first letter of the class name looks like a Cyrillic "С"
 * (U+0421) rather than a Latin "C"; confirm, and keep every usage consistent.
 */
class СabinetsOrderSyncQueue extends CronQueueSingletonWorker
{
    /**
     * Returns the name of the storage that holds this queue's records.
     */
    protected function getStorageName()
    {
        return 'queue_cabinets_order_sync';
    }

    /**
     * Decodes the record's JSON payload into an array and passes it to the
     * plugin's order-sync task.
     */
    protected function onRow($record)
    {
        $payload = json_decode($record['data'], true);

        $ordersPlugin = Controller::getInstance()->getPluginInstance('CabinetOrders');
        $ordersPlugin->onTaskOrderSync($payload);
    }

    /**
     * Pauses execution for 60 seconds.
     */
    protected function onDelay()
    {
        sleep(60);
    }
}

// The worker is constructed with $controller->db (presumably the database handle; confirm).
$worker = new СabinetsOrderSyncQueue($controller->db);
$worker->start();

5. Kafka Worker

Install PHP Kafka Driver

  1. Install C/C++ Driver
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
sudo make install
  2. Install php-rdkafka
pecl install rdkafka

Usage

/**
 * Kafka worker example that dumps the record passed to onRow().
 */
class TestWorker extends KafkaCronWorker
{
    /**
     * Prints a human-readable dump of the record.
     */
    protected function onRow($record)
    {
        echo print_r($record, true);
    }
}

// Instantiate the Kafka test worker and start it.
$worker = new TestWorker();
$worker->start();
php test_kafka.php --brokers 78.46.49.42 --topic test --id g1
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list 78.46.49.42:9092 --topic test > /dev/null
/**
 * Kafka queue worker example that forwards records to a statistics plugin and
 * keeps the consumed offset in a file.
 *
 * NOTE(review): the helpers called below (_getOffsetFilePath(),
 * _onPrepareFileOffset(), _getStatisticsPlugin(), _isDuplicateKeyException(),
 * _setOffsetToFile()) and the AUTOCOMMIT_TIMEOUT constant are not defined in
 * this excerpt; presumably they live elsewhere in the class or its parent.
 * Confirm before copying.
 */
class StatisticsWorker extends KafkaCronQueueSingletonWorker
{
    const TIMEOUT = 100;
    const FILE_NAME_OFFSET_POSTFIX   = 'offset';
    const DEFAULT_OFFSET_VALUE       = '0';
    const FILE_NAME_OFFSET_DELIMITER = '-';
    const REGEXP_DUPLICATE_EXCEPTION = '#duplicate key value violates#Umis';

    protected $core;

    /**
     * Statistics plugin, assigned in onSpoolStart().
     */
    protected $plugin;

    /**
     * Makes sure the offset file exists and is not empty before processing.
     *
     * @return bool always true
     */
    protected function onStart()
    {
        $file = $this->_getOffsetFilePath();

        if (!file_exists($file)) {
            $this->_onPrepareFileOffset($file);
        }

        $result = file_get_contents($file);

        // Compare explicitly: a stored offset of '0' (DEFAULT_OFFSET_VALUE) is
        // falsy in PHP and must not be mistaken for a missing or empty file.
        if ($result === false || $result === '') {
            $this->_onPrepareFileOffset($file);
        }

        return true;
    } // end onStart

    /**
     * Passes one record to the statistics plugin.
     *
     * @return bool always true
     */
    protected function onRow($record)
    {
        $this->plugin->onTaskAddStats($record);

        return true;
    } // end onRow

    /**
     * Builds the consumer configuration: SSL transport with certificate files
     * under KAFKA_SSL_CERT_PATH and the consumer group taken from the "id"
     * option.
     *
     * @return RdKafka\Conf
     */
    protected function getConsumerConfig()
    {
        $conf = new RdKafka\Conf();

        $conf->set('max.partition.fetch.bytes', 100);
        $conf->set('security.protocol', 'ssl');
        $conf->set(
            'ssl.ca.location',
            KAFKA_SSL_CERT_PATH.'kafka_trust_cert.pem'
        );
        $conf->set(
            'ssl.certificate.location',
            KAFKA_SSL_CERT_PATH.'kafka_client_cert.pem'
        );
        // NOTE(review): "cet" in the file name looks like a typo for "cert";
        // confirm against the actual key file before changing it.
        $conf->set(
            'ssl.key.location',
            KAFKA_SSL_CERT_PATH.'kafka_client_cet_key.pem'
        );

        $conf->set('group.id', $this->getOption('id'));

        return $conf;
    } // end getConsumerConfig


    /**
     * Builds the topic configuration with file-based offset storage in the
     * system temporary directory.
     *
     * @return RdKafka\TopicConf
     */
    protected function getTopicConfig()
    {
        $topicConf = new RdKafka\TopicConf();
        $topicConf->set('auto.commit.interval.ms', static::AUTOCOMMIT_TIMEOUT);

        // Set the offset store method to 'file'
        $topicConf->set('offset.store.method', 'file');
        $topicConf->set('offset.store.path', sys_get_temp_dir());

        return $topicConf;
    } // end getTopicConfig

    /**
     * Resolves the statistics plugin used by onRow().
     *
     * @return bool always true
     */
    protected function onSpoolStart()
    {
        $this->plugin = $this->_getStatisticsPlugin();

        return true;
    } // end onSpoolStart

    /**
     * Handles a failed record: a duplicate-key failure returns false without
     * storing the offset or re-throwing; any other failure stores the
     * record's offset and re-throws the exception.
     *
     * @return bool false for a duplicate-key failure
     */
    protected function onRowError($record, $exp)
    {
        $msg = $exp->getMessage();

        if ($this->_isDuplicateKeyException($msg)) {
            return false;
        }

        $this->_setOffsetToFile($record);

        throw $exp;
    } // end onRowError

    /**
     * Stores the offset of a successfully processed record.
     *
     * @return bool always true
     */
    protected function onRowCompleted($record)
    {
        $this->_setOffsetToFile($record);

        return true;
    } // end onRowCompleted
}

Best Practices

  • Use Singleton Workers to prevent duplicate executions.
  • Monitor Cron Logs to troubleshoot job failures.
  • Use Kafka for Distributed Jobs when scaling across multiple servers.
  • Define Execution Time Limits to avoid long-running tasks blocking the queue.