- Add CloudronStack/output/CloudronPackages-Artifacts/tirreno/ directory and its contents - Includes package manifest, Dockerfile, source code, documentation, and build artifacts - Add tirreno-1761840148.tar.gz as a build artifact - Add tirreno-cloudron-package-1761841304.tar.gz as the Cloudron package - Include all necessary files for the tirreno Cloudron package This adds the complete tirreno Cloudron package artifacts to the repository.
		
			
				
	
	
		
			391 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
			
		
		
	
	
			391 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			PHP
		
	
	
	
	
	
<?php
 | 
						|
 | 
						|
/**
 | 
						|
 * Tirreno ~ Open source user analytics
 | 
						|
 * Copyright (c) Tirreno Technologies Sàrl (https://www.tirreno.com)
 | 
						|
 *
 | 
						|
 * Licensed under GNU Affero General Public License version 3 of the or any later version.
 | 
						|
 * For full copyright and license information, please see the LICENSE
 | 
						|
 * Redistributions of files must retain the above copyright notice.
 | 
						|
 *
 | 
						|
 * @copyright     Copyright (c) Tirreno Technologies Sàrl (https://www.tirreno.com)
 | 
						|
 * @license       https://opensource.org/licenses/AGPL-3.0 AGPL License
 | 
						|
 * @link          https://www.tirreno.com Tirreno(tm)
 | 
						|
 */
 | 
						|
 | 
						|
namespace Models\Queue;
 | 
						|
 | 
						|
class AccountOperationQueue extends \Models\BaseSql {
 | 
						|
    public const DATETIME_FORMAT = 'Y-m-d H:i:s.u';
 | 
						|
 | 
						|
    protected $DB_TABLE_NAME = 'queue_account_operation';
 | 
						|
 | 
						|
    private \Type\QueueAccountOperationActionType $actionType;
 | 
						|
 | 
						|
    public function __construct(\Type\QueueAccountOperationActionType $actionType) {
 | 
						|
        $this->actionType = $actionType;
 | 
						|
 | 
						|
        parent::__construct();
 | 
						|
    }
 | 
						|
 | 
						|
    public function add(int $accountId, int $key): void {
 | 
						|
        $this->reset();
 | 
						|
        $this->event_account = $accountId;
 | 
						|
        $this->key = $key;
 | 
						|
        $this->action = $this->actionType->value;
 | 
						|
        $this->save();
 | 
						|
    }
 | 
						|
 | 
						|
    /**
 | 
						|
     * @param array{accountId: int, key: int}[] $accounts
 | 
						|
     */
 | 
						|
    public function addBatch(array $accounts): void {
 | 
						|
        if (count($accounts) === 0) {
 | 
						|
            return;
 | 
						|
        }
 | 
						|
 | 
						|
        $params = [':action' => strval($this->actionType->value)];
 | 
						|
        $arrayPlaceholders = [];
 | 
						|
        $prefix = '';
 | 
						|
        foreach ($accounts as $idx => $record) {
 | 
						|
            $prefix = ":{$idx}_";
 | 
						|
 | 
						|
            $params[$prefix . 'idx'] = $idx;
 | 
						|
            $params[$prefix . 'account_id'] = intval($record['accountId']);
 | 
						|
            $params[$prefix . 'key'] = intval($record['key']);
 | 
						|
            $arrayPlaceholders[] = "({$prefix}idx, {$prefix}account_id, {$prefix}key)";
 | 
						|
        }
 | 
						|
 | 
						|
        $strPlaceholders = \implode(', ', $arrayPlaceholders);
 | 
						|
 | 
						|
        // update waiting records
 | 
						|
        $query = (
 | 
						|
            "UPDATE queue_account_operation
 | 
						|
            SET
 | 
						|
                updated = now()
 | 
						|
            FROM (VALUES $strPlaceholders) AS v(idx, account_id, key)
 | 
						|
            WHERE
 | 
						|
                queue_account_operation.event_account   = v.account_id::bigint AND
 | 
						|
                queue_account_operation.key             = v.key::bigint AND
 | 
						|
                queue_account_operation.action          = :action AND
 | 
						|
                queue_account_operation.status          = 'waiting'
 | 
						|
            RETURNING v.idx"
 | 
						|
        );
 | 
						|
 | 
						|
        $results = $this->execQuery($query, $params);
 | 
						|
 | 
						|
        $updatedIdxs = array_unique(array_column($results, 'idx'));
 | 
						|
        $notUpdatedIdxs = array_keys(array_diff(array_keys($accounts), $updatedIdxs));
 | 
						|
 | 
						|
        if (!count($notUpdatedIdxs)) {
 | 
						|
            return;
 | 
						|
        }
 | 
						|
 | 
						|
        $params = [':action' => strval($this->actionType->value)];
 | 
						|
        $arrayPlaceholders = [];
 | 
						|
        foreach ($notUpdatedIdxs as $idxToInsert) {
 | 
						|
            $prefix = ":{$idxToInsert}_";
 | 
						|
            $record = $accounts[$idxToInsert];
 | 
						|
 | 
						|
            $params[$prefix . 'account_id'] = $record['accountId'];
 | 
						|
            $params[$prefix . 'key'] = $record['key'];
 | 
						|
            $arrayPlaceholders[] = "({$prefix}account_id, {$prefix}key, :action)";
 | 
						|
        }
 | 
						|
 | 
						|
        $strPlaceholders = \implode(', ', $arrayPlaceholders);
 | 
						|
 | 
						|
        $query = "INSERT INTO queue_account_operation (event_account, key, action) VALUES {$strPlaceholders} RETURNING id";
 | 
						|
 | 
						|
        $result = $this->execQuery($query, $params);
 | 
						|
 | 
						|
        $msg = sprintf('Adding %s accounts to %s queue -- %s updated, %s inserted', count($accounts), strval($this->actionType->value), count($updatedIdxs), count($result));
 | 
						|
        \Utils\Logger::log(null, $msg);
 | 
						|
    }
 | 
						|
 | 
						|
    public function addBatchIds(array $accountIds, int $key): void {
 | 
						|
        $batchSize = \Utils\Variables::getAccountOperationQueueBatchSize();
 | 
						|
 | 
						|
        $batch = [];
 | 
						|
        $cnt = 0;
 | 
						|
 | 
						|
        foreach ($accountIds as $id) {
 | 
						|
            $batch[] = [
 | 
						|
                'accountId' => $id,
 | 
						|
                'key' => $key,
 | 
						|
            ];
 | 
						|
            $cnt++;
 | 
						|
 | 
						|
            if ($cnt >= $batchSize) {
 | 
						|
                $this->addBatch($batch);
 | 
						|
                $batch = [];
 | 
						|
                $cnt = 0;
 | 
						|
            }
 | 
						|
        }
 | 
						|
 | 
						|
        if ($cnt) {
 | 
						|
            $this->addBatch($batch);
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    public function isInQueueStatus(int $accountId, int $key): array {
 | 
						|
        $this->reset();
 | 
						|
        $this->load([
 | 
						|
            'event_account = ? AND key = ? AND action = ? AND status != ?',
 | 
						|
            $accountId, $key, $this->actionType->value, \Type\QueueAccountOperationStatusType::COMPLETED,
 | 
						|
        ]);
 | 
						|
 | 
						|
        return $this->dry() ? [false, null] : [true, $this->status];
 | 
						|
    }
 | 
						|
 | 
						|
    public function isInQueue(int $accountId, int $key): bool {
 | 
						|
        $this->reset();
 | 
						|
        $this->load([
 | 
						|
            'event_account = ? AND key = ? AND action = ? AND status != ?',
 | 
						|
            $accountId, $key, $this->actionType->value, \Type\QueueAccountOperationStatusType::COMPLETED,
 | 
						|
        ]);
 | 
						|
 | 
						|
        return $this->dry() ? false : true;
 | 
						|
    }
 | 
						|
 | 
						|
    public function isExecuting(): bool {
 | 
						|
        $this->load([
 | 
						|
            'action = ? AND status = ?',
 | 
						|
            $this->actionType->value, \Type\QueueAccountOperationStatusType::EXECUTING,
 | 
						|
        ]);
 | 
						|
 | 
						|
        return $this->dry() ? false : true;
 | 
						|
    }
 | 
						|
 | 
						|
    public function actionIsInQueueProcessing(int $key): bool {
 | 
						|
        $this->reset();
 | 
						|
        $this->load([
 | 
						|
            'key = ? AND action = ? AND status != ? AND status != ?',
 | 
						|
            $key,
 | 
						|
            $this->actionType->value,
 | 
						|
            \Type\QueueAccountOperationStatusType::COMPLETED,
 | 
						|
            \Type\QueueAccountOperationStatusType::FAILED,
 | 
						|
        ]);
 | 
						|
 | 
						|
        return $this->dry() ? false : true;
 | 
						|
    }
 | 
						|
 | 
						|
    public function getNextInQueue(): array|null {
 | 
						|
        $this->reset();
 | 
						|
        $this->creator = 'SELECT creator
 | 
						|
            FROM dshb_api
 | 
						|
            WHERE dshb_api.id = queue_account_operation.key';
 | 
						|
        $this->load([
 | 
						|
            'action = ? AND status = ?',
 | 
						|
            $this->actionType->value, \Type\QueueAccountOperationStatusType::WAITING,
 | 
						|
        ], ['order' => 'created ASC']);
 | 
						|
 | 
						|
        return $this->dry() ? null : $this->cast();
 | 
						|
    }
 | 
						|
 | 
						|
    public function getNextBatchInQueue(int $batchSize): array {
 | 
						|
        $params = [
 | 
						|
            ':batchSize' => $batchSize,
 | 
						|
            ':action' => $this->actionType->value,
 | 
						|
            ':status' => \Type\QueueAccountOperationStatusType::WAITING,
 | 
						|
        ];
 | 
						|
 | 
						|
        $query = ('
 | 
						|
            SELECT
 | 
						|
                queue_account_operation.*,
 | 
						|
                dshb_api.creator
 | 
						|
            FROM queue_account_operation
 | 
						|
            JOIN dshb_api
 | 
						|
            ON dshb_api.id = queue_account_operation.key
 | 
						|
            WHERE
 | 
						|
                action = :action
 | 
						|
                AND status = :status
 | 
						|
            ORDER BY id ASC
 | 
						|
            LIMIT :batchSize
 | 
						|
        ');
 | 
						|
 | 
						|
        return $this->execQuery($query, $params);
 | 
						|
    }
 | 
						|
 | 
						|
    public function getNextBatchKeysInQueue(int $batchSize): array {
 | 
						|
        $params = [
 | 
						|
            ':batchSize' => $batchSize,
 | 
						|
            ':action' => $this->actionType->value,
 | 
						|
            ':status' => \Type\QueueAccountOperationStatusType::WAITING,
 | 
						|
        ];
 | 
						|
 | 
						|
        $query = ('
 | 
						|
            SELECT
 | 
						|
                DISTINCT key
 | 
						|
            FROM (
 | 
						|
                SELECT
 | 
						|
                    queue_account_operation.id,
 | 
						|
                    queue_account_operation.key
 | 
						|
                FROM queue_account_operation
 | 
						|
                WHERE
 | 
						|
                    action = :action
 | 
						|
                    AND status = :status
 | 
						|
                ORDER BY id ASC
 | 
						|
                LIMIT :batchSize
 | 
						|
            ) AS t
 | 
						|
        ');
 | 
						|
 | 
						|
        $results = $this->execQuery($query, $params);
 | 
						|
 | 
						|
        return array_column($results, 'key');
 | 
						|
    }
 | 
						|
 | 
						|
    public function setWaiting(): void {
 | 
						|
        if ($this->loaded()) {
 | 
						|
            $now = new \DateTime();
 | 
						|
            $this->updated = $now->format(self::DATETIME_FORMAT);
 | 
						|
            $this->status = \Type\QueueAccountOperationStatusType::WAITING;
 | 
						|
            $this->save();
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    /**
 | 
						|
     * @param int[] $ids
 | 
						|
     */
 | 
						|
    public function setWaitingForBatch(array $ids): void {
 | 
						|
        $this->setStatusForBatch(
 | 
						|
            $ids,
 | 
						|
            new \Type\QueueAccountOperationStatusType(\Type\QueueAccountOperationStatusType::WAITING),
 | 
						|
        );
 | 
						|
    }
 | 
						|
 | 
						|
    public function setExecuting(): void {
 | 
						|
        if ($this->loaded()) {
 | 
						|
            $now = new \DateTime();
 | 
						|
            $this->updated = $now->format(self::DATETIME_FORMAT);
 | 
						|
            $this->status = \Type\QueueAccountOperationStatusType::EXECUTING;
 | 
						|
            $this->save();
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    /**
 | 
						|
     * @param int[] $ids
 | 
						|
     */
 | 
						|
    public function setExecutingForBatch(array $ids): void {
 | 
						|
        $this->setStatusForBatch(
 | 
						|
            $ids,
 | 
						|
            new \Type\QueueAccountOperationStatusType(\Type\QueueAccountOperationStatusType::EXECUTING),
 | 
						|
        );
 | 
						|
    }
 | 
						|
 | 
						|
    public function setCompleted(): void {
 | 
						|
        if ($this->loaded()) {
 | 
						|
            $now = new \DateTime();
 | 
						|
            $this->updated = $now->format(self::DATETIME_FORMAT);
 | 
						|
            $this->status = \Type\QueueAccountOperationStatusType::COMPLETED;
 | 
						|
            $this->save();
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    /**
 | 
						|
     * @param int[] $ids
 | 
						|
     */
 | 
						|
    public function setCompletedForBatch(array $ids): void {
 | 
						|
        $this->setStatusForBatch(
 | 
						|
            $ids,
 | 
						|
            new \Type\QueueAccountOperationStatusType(\Type\QueueAccountOperationStatusType::COMPLETED),
 | 
						|
        );
 | 
						|
    }
 | 
						|
 | 
						|
    public function setFailed(): void {
 | 
						|
        if ($this->loaded()) {
 | 
						|
            $now = new \DateTime();
 | 
						|
            $this->updated = $now->format(self::DATETIME_FORMAT);
 | 
						|
            $this->status = \Type\QueueAccountOperationStatusType::FAILED;
 | 
						|
            $this->save();
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    /**
 | 
						|
     * @param int[] $ids
 | 
						|
     */
 | 
						|
    public function setFailedForBatch(array $ids): void {
 | 
						|
        $this->setStatusForBatch(
 | 
						|
            $ids,
 | 
						|
            new \Type\QueueAccountOperationStatusType(\Type\QueueAccountOperationStatusType::FAILED),
 | 
						|
        );
 | 
						|
    }
 | 
						|
 | 
						|
    /**
 | 
						|
     * @param int[] $ids
 | 
						|
     */
 | 
						|
    private function setStatusForBatch(array $ids, \Type\QueueAccountOperationStatusType $status): void {
 | 
						|
        if (!count($ids)) {
 | 
						|
            return;
 | 
						|
        }
 | 
						|
 | 
						|
        [$params, $placeHolders] = $this->getArrayPlaceholders($ids);
 | 
						|
 | 
						|
        $params[':status'] = $status->value;
 | 
						|
        $params[':updated'] = (new \DateTime())->format(self::DATETIME_FORMAT);
 | 
						|
 | 
						|
        $query = ("
 | 
						|
            UPDATE queue_account_operation
 | 
						|
            SET
 | 
						|
                status = :status,
 | 
						|
                updated = :updated
 | 
						|
            WHERE
 | 
						|
                id IN ({$placeHolders})
 | 
						|
        ");
 | 
						|
 | 
						|
        $this->execQuery($query, $params);
 | 
						|
    }
 | 
						|
 | 
						|
    public function removeFromQueue(): void {
 | 
						|
        if ($this->loaded()) {
 | 
						|
            $this->erase();
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    public function unclog(): bool {
 | 
						|
        if ($this->loaded() && $this->status === \Type\QueueAccountOperationStatusType::EXECUTING) {
 | 
						|
            $updatedDateTime = new \DateTime($this->updated);
 | 
						|
            $currentDateTime = new \DateTime();
 | 
						|
 | 
						|
            $differenceInSeconds = $currentDateTime->getTimestamp() - $updatedDateTime->getTimestamp();
 | 
						|
            $totalMinutes = (int) floor($differenceInSeconds / 60);
 | 
						|
 | 
						|
            if ($totalMinutes < \Utils\Constants::get('ACCOUNT_OPERATION_QUEUE_AUTO_UNCLOG_AFTER_MINUTES')) {
 | 
						|
                return false; // Time not elapsed, no need to unclog (yet).
 | 
						|
            }
 | 
						|
 | 
						|
            $this->setFailed();
 | 
						|
 | 
						|
            $msg = sprintf('Queue failed on unclog (now - updated > 2 hours) on account %d minutes diff %d.', $this->event_account, $totalMinutes);
 | 
						|
            \Utils\Logger::log(null, $msg);
 | 
						|
 | 
						|
            return true; // Unclogged queue.
 | 
						|
        }
 | 
						|
 | 
						|
        return false; // No need to unclog.
 | 
						|
    }
 | 
						|
 | 
						|
    public function clearCompleted(\DateTime $clearBefore): int {
 | 
						|
        $this->reset();
 | 
						|
 | 
						|
        $params = [
 | 
						|
            ':daysAgo' => $clearBefore->format(self::DATETIME_FORMAT),
 | 
						|
            ':status' => \Type\QueueAccountOperationStatusType::COMPLETED,
 | 
						|
        ];
 | 
						|
 | 
						|
        $query = ('
 | 
						|
            WITH deleted AS
 | 
						|
            (
 | 
						|
                DELETE FROM queue_account_operation
 | 
						|
                WHERE
 | 
						|
                    status = :status
 | 
						|
                    AND updated < :daysAgo
 | 
						|
                    RETURNING *
 | 
						|
            ) SELECT count(*) FROM deleted
 | 
						|
        ');
 | 
						|
 | 
						|
        $results = $this->execQuery($query, $params);
 | 
						|
 | 
						|
        return $results[0]['count'] ?? 0;
 | 
						|
    }
 | 
						|
}
 |