Festi Cron
Festi Cron is a powerful task scheduling and job queue management library for the Festi Framework. It provides support for scheduled cron jobs, queue-based execution, and Kafka integration for distributed task processing.
Installation
To install Festi Cron using Composer, add the following to your composer.json
file:
{
"require": {
"festi-team/festi-framework-cron": "dev-develop"
}
}
Then, run:
composer install
Features
- Scheduler Worker – Manage and execute scheduled tasks efficiently.
- Queue-based Execution – Process tasks in the background using job queues.
- Singleton Workers – Ensure that a cron job does not run multiple times simultaneously.
- Kafka Integration – Use Kafka-based queue processing for distributed workloads.
- Exception Handling – Prevent execution conflicts and errors with built-in exception management.
Usage
1. Defining a Cron Job
Create a worker by extending the CronWorker
class:
use core\cron\CronWorker;
class MyCronJob extends CronWorker
{
public function onStart(): void
{
// Your cron task logic here
echo "Executing MyCronJob...";
}
}
2. Running a Cron Job
To execute a cron job manually:
$worker = new MyCronJob();
$worker->start();
3. Using Queue Workers
CREATE TABLE IF NOT EXISTS `queue_[TYPE]` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`data` longtext NOT NULL,
`error` longtext NULL DEFAULT NULL,
`id_worker` int(10) unsigned DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
To handle queued execution:
use cron\CronQueueWorker;
class MyQueueJob extends CronQueueWorker
{
public function onStart(): void
{
echo "Processing queued job...";
}
}
Register and run:
$worker = new MyQueueJob();
$worker->enqueue();
4. Singleton Execution
Ensure a job runs only once at a time:
use cron\CronSingletonWorker;
class BankTransferAccountsSyncCronWorker extends CronSingletonWorker
{
/**
* @override
*/
public function start()
{
$plugin = Core::getInstance()->getPluginInstance('Payments');
assert($plugin instanceof PaymentsPlugin);
$plugin->syncWithServices();
}
}
try {
$worker = new BankTransferAccountsSyncCronWorker();
$worker->start();
} catch (AlreadyRunningCronWorkerException $alreadyExp) {
exit(0);
} catch (Exception $exp) {
FestiUtils::doHandleCoreException($exp);
}
use cron\CronSingletonWorker;
class MoneyReportCronWorker extends CronSingletonWorker
{
/**
* @var StaffPlugin
*/
protected $userPlugin;
/**
* @var FinancePlugin
*/
protected $financePlugin;
/**
* @override
*/
protected function onStart()
{
$core = Core::getInstance();
$this->userPlugin = $core->getPluginInstance('Staff');
$this->financePlugin = $core->getPluginInstance('Finance');
} // end onStart
/**
* @override
*/
protected function onRow($data)
{
$userValuesObject = new UserValuesObject($data);
$this->financePlugin->onCalculateUserMonthlyPayment(
$userValuesObject,
date(static::DATA_FORMAT)
);
}
/**
* @override
*/
protected function onRowError($record, $exp)
{
$values = array(
MoneyTransferValuesObject::FIELD_STATUS => MoneyTransferValuesObject::STATUS_ERROR,
MoneyTransferValuesObject::FIELD_ERROR => $exp->getMessage()
);
$this->financePlugin->updateMoneyTransfer($values, $record['id']);
throw $exp;
}
/**
* @override
*/
protected function getSpool()
{
$search = array(
'salary&IS NOT' => 'NULL',
'hire_date&<=' => date('Y-m-t', $this->_currentTime),
'sql_or' => array(
array('fired&IS' => 'NULL'),
array('fired&>=' => date(static::DATA_FORMAT, $this->_currentTime))
)
);
return $this->userPlugin->search($search);
}
}
try {
$worker = new MoneyReportCronWorker();
$worker->start();
} catch (Exception $exp) {
FestiUtils::doHandleCoreException($exp);
}
class СabinetsOrderSyncQueue extends CronQueueSingletonWorker
{
protected function getStorageName()
{
return 'queue_cabinets_order_sync';
}
protected function onRow($record)
{
$data = json_decode($record['data'], true);
$plugin = Controller::getInstance()->getPluginInstance("CabinetOrders");
$plugin->onTaskOrderSync($data);
}
protected function onDelay()
{
sleep(60);
}
}
$worker = new СabinetsOrderSyncQueue($controller->db);
$worker->start();
5. Kafka Worker
Install PHP Kafka Driver
git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
./configure
make
sudo make install
pecl install rdkafka
Used
class TestWorker extends KafkaCronWorker
{
protected function onRow($record)
{
print_r($record);
}
}
$worker = new TestWorker();
$worker->start();
php test_kafka.php --brokers 78.46.49.42 --topic test --id g1
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list 78.46.49.42:9092 --topic test > /dev/null
class StatisticsWorker extends KafkaCronQueueSingletonWorker
{
const TIMEOUT = 100;
const FILE_NAME_OFFSET_POSTFIX = 'offset';
const DEFAULT_OFFSET_VALUE = '0';
const FILE_NAME_OFFSET_DELIMITER = '-';
const REGEXP_DUPLICATE_EXCEPTION = '#duplicate key value violates#Umis';
protected $core;
protected $plugin;
protected function onStart()
{
$file = $this->_getOffsetFilePath();
if (!file_exists($file)) {
$this->_onPrepareFileOffset($file);
}
$result = file_get_contents($file);
if (!$result) {
$this->_onPrepareFileOffset($file);
}
return true;
}
protected function onRow($record)
{
$this->plugin->onTaskAddStats($record);
return true;
}
protected function getConsumerConfig()
{
$conf = new RdKafka\Conf();
$conf->set('max.partition.fetch.bytes', 100);
$conf->set('security.protocol', 'ssl');
$conf->set(
'ssl.ca.location',
KAFKA_SSL_CERT_PATH.'kafka_trust_cert.pem'
);
$conf->set(
'ssl.certificate.location',
KAFKA_SSL_CERT_PATH.'kafka_client_cert.pem'
);
$conf->set(
'ssl.key.location',
KAFKA_SSL_CERT_PATH.'kafka_client_cet_key.pem'
);
$conf->set('group.id', $this->getOption('id'));
return $conf;
} // end getConsumerConfig
protected function getTopicConfig()
{
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', static::AUTOCOMMIT_TIMEOUT);
// Set the offset store method to 'file'
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', sys_get_temp_dir());
return $topicConf;
} // end getTopicConfig
protected function onSpoolStart()
{
$this->plugin = $this->_getStatisticsPlugin();
return true;
} // end getTopicConfig
protected function onRowError($record, $exp)
{
$msg = $exp->getMessage();
if ($this->_isDuplicateKeyException($msg)) {
return false;
}
$this->_setOffsetToFile($record);
throw $exp;
} // end onRowError
protected function onRowCompleted($record)
{
$this->_setOffsetToFile($record);
return true;
} // end onRowCompleted
}
Best Practices
- Use Singleton Workers to prevent duplicate executions.
- Monitor Cron Logs to troubleshoot job failures.
- Use Kafka for Distributed Jobs when scaling across multiple servers.
- Define Execution Time Limits to avoid long-running tasks blocking the queue.