| <?php␊ |
| /* -*- tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */␊ |
| /*␊ |
| # ***** BEGIN LICENSE BLOCK *****␊ |
| # This file is part of InDefero, an open source project management application.␊ |
| # Copyright (C) 2008 Céondo Ltd and contributors.␊ |
| #␊ |
| # InDefero is free software; you can redistribute it and/or modify␊ |
| # it under the terms of the GNU General Public License as published by␊ |
| # the Free Software Foundation; either version 2 of the License, or␊ |
| # (at your option) any later version.␊ |
| #␊ |
| # InDefero is distributed in the hope that it will be useful,␊ |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of␊ |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the␊ |
| n# GNU General Public License for more details.␊ |
| #␊ |
| # You should have received a copy of the GNU General Public License␊ |
| # along with this program; if not, write to the Free Software␊ |
| # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA␊ |
| #␊ |
| # ***** END LICENSE BLOCK ***** */␊ |
| ␊ |
| ␊ |
| /**␊ |
| * Queue system for the management of asynchronous operations.␊ |
| *␊ |
| * Anybody can add an item to the queue and any application can␊ |
| * register itself to process an item from the queue.␊ |
| *␊ |
| * An item in the queue is considered as fully processed when all the␊ |
| * handlers have processed it successfully.␊ |
| *␊ |
| * To push a new item in the queue:␊ |
| *␊ |
| * <code>␊ |
| * $item = new IDF_Queue();␊ |
| * $item->type = 'new_commit';␊ |
| * $item->payload = array('what', 'ever', array('data'));␊ |
| * $item->create();␊ |
| * </code>␊ |
| *␊ |
| * To process one item from the queue, you first need to register an␊ |
| * handler, by adding the following in your relations.php file before␊ |
| * the return statement or in your config file.␊ |
| *␊ |
| * <code>␊ |
| * Pluf_Signal::connect('IDF_Queue::processItem', ␊ |
| * array('YourApp_Class', 'processItem'));␊ |
| * </code>␊ |
| *␊ |
| * The processItem method will be called with two arguments, the first␊ |
| * is the name of the signal ('IDF_Queue::processItem') and the second␊ |
| * is an array with:␊ |
| *␊ |
| * <code>␊ |
| * array('item' => $item,␊ |
| * 'res' => array('OtherApp_Class::handler' => false,␊ |
| * 'FooApp_Class::processItem' => true));␊ |
| * </code>␊ |
| *␊ |
| * When you process an item, you need first to check if the type is␊ |
| * corresponding to what you want to work with, then you need to check␊ |
| * in 'res' if you have not already processed successfully the item,␊ |
| * that is the key 'YourApp_Class::processItem' must be set to true,␊ |
| * and then you can process the item. At the end of your processing,␊ |
| * you need to modify by reference the 'res' key to add your status.␊ |
| *␊ |
| * All the data except for the type is in the payload, this makes the␊ |
| * queue flexible to manage many different kind of tasks.␊ |
| *␊ |
| */␊ |
| class IDF_Queue extends Pluf_Model␊ |
| {␊ |
| public $_model = __CLASS__;␊ |
| ␊ |
| function init()␊ |
| {␊ |
| $this->_a['table'] = 'idf_queue';␊ |
| $this->_a['model'] = __CLASS__;␊ |
| $this->_a['cols'] = array(␊ |
| // It is mandatory to have an "id" column.␊ |
| 'id' =>␊ |
| array(␊ |
| 'type' => 'Pluf_DB_Field_Sequence',␊ |
| 'blank' => true, ␊ |
| ),␊ |
| 'status' => ␊ |
| array(␊ |
| 'type' => 'Pluf_DB_Field_Integer',␊ |
| 'blank' => false,␊ |
| 'choices' => array(␊ |
| 'pending' => 0,␊ |
| 'in_progress' => 1,␊ |
| 'need_retry' => 2,␊ |
| 'done' => 3,␊ |
| 'error' => 4,␊ |
| ),␊ |
| 'default' => 0,␊ |
| ),␊ |
| 'trials' =>␊ |
| array(␊ |
| 'type' => 'Pluf_DB_Field_Integer',␊ |
| 'default' => 0,␊ |
| ),␊ |
| 'type' =>␊ |
| array(␊ |
| 'type' => 'Pluf_DB_Field_Varchar',␊ |
| 'blank' => false,␊ |
| 'size' => 50,␊ |
| ),␊ |
| 'payload' =>␊ |
| array(␊ |
| 'type' => 'Pluf_DB_Field_Serialized',␊ |
| 'blank' => false,␊ |
| ),␊ |
| 'results' =>␊ |
| array(␊ |
| 'type' => 'Pluf_DB_Field_Serialized',␊ |
| 'blank' => false,␊ |
| ),␊ |
| 'lasttry_dtime' =>␊ |
| array(␊ |
| 'type' => 'Pluf_DB_Field_Datetime',␊ |
| 'blank' => true,␊ |
| ),␊ |
| 'creation_dtime' =>␊ |
| array(␊ |
| 'type' => 'Pluf_DB_Field_Datetime',␊ |
| 'blank' => true,␊ |
| ),␊ |
| );␊ |
| }␊ |
| ␊ |
| function preSave($create=false)␊ |
| {␊ |
| if ($create) {␊ |
| $this->creation_dtime = gmdate('Y-m-d H:i:s');␊ |
| $this->lasttry_dtime = gmdate('Y-m-d H:i:s');␊ |
| $this->results = array();␊ |
| }␊ |
| }␊ |
| ␊ |
| /**␊ |
| * The current item is going to be processed.␊ |
| */␊ |
| function processItem()␊ |
| {␊ |
| /**␊ |
| * [signal]␊ |
| *␊ |
| * IDF_Queue::processItem␊ |
| *␊ |
| * [sender]␊ |
| *␊ |
| * IDF_Queue␊ |
| *␊ |
| * [description]␊ |
| *␊ |
| * This signal allows an application to run an asynchronous␊ |
| * job. The handler gets the queue item and the results from␊ |
| * the previous run. If the handler key is not set, then the␊ |
| * job was not run. If set it can be either true (already done)␊ |
| * or false (error at last run).␊ |
| *␊ |
| * [parameters]␊ |
| *␊ |
| * array('item' => $item, 'res' => $res)␊ |
| *␊ |
| */␊ |
| $params = array('item' => $this, 'res' => $this->results);␊ |
| Pluf_Signal::send('IDF_Queue::processItem',␊ |
| 'IDF_Queue', $params);␊ |
| $this->status = 3; // Success␊ |
| foreach ($params['res'] as $handler=>$ok) {␊ |
| if (!$ok) {␊ |
| $this->status = 2; // Set to need retry␊ |
| $this->trials += 1;␊ |
| break;␊ |
| }␊ |
| }␊ |
| $this->results = $params['res'];␊ |
| $this->lasttry_dtime = gmdate('Y-m-d H:i:s');␊ |
| $this->update();␊ |
| }␊ |
| }␊ |