feat(cloudron): add tirreno package artifacts
- Add CloudronStack/output/CloudronPackages-Artifacts/tirreno/ directory and its contents - Includes package manifest, Dockerfile, source code, documentation, and build artifacts - Add tirreno-1761840148.tar.gz as a build artifact - Add tirreno-cloudron-package-1761841304.tar.gz as the Cloudron package - Include all necessary files for the tirreno Cloudron package This adds the complete tirreno Cloudron package artifacts to the repository.
This commit is contained in:
@@ -0,0 +1,390 @@
|
||||
<?php
|
||||
|
||||
/**
|
||||
* Tirreno ~ Open source user analytics
|
||||
* Copyright (c) Tirreno Technologies Sàrl (https://www.tirreno.com)
|
||||
*
|
||||
* Licensed under GNU Affero General Public License version 3 of the or any later version.
|
||||
* For full copyright and license information, please see the LICENSE
|
||||
* Redistributions of files must retain the above copyright notice.
|
||||
*
|
||||
* @copyright Copyright (c) Tirreno Technologies Sàrl (https://www.tirreno.com)
|
||||
* @license https://opensource.org/licenses/AGPL-3.0 AGPL License
|
||||
* @link https://www.tirreno.com Tirreno(tm)
|
||||
*/
|
||||
|
||||
namespace Models\Queue;
|
||||
|
||||
class AccountOperationQueue extends \Models\BaseSql {
|
||||
public const DATETIME_FORMAT = 'Y-m-d H:i:s.u';
|
||||
|
||||
protected $DB_TABLE_NAME = 'queue_account_operation';
|
||||
|
||||
private \Type\QueueAccountOperationActionType $actionType;
|
||||
|
||||
public function __construct(\Type\QueueAccountOperationActionType $actionType) {
|
||||
$this->actionType = $actionType;
|
||||
|
||||
parent::__construct();
|
||||
}
|
||||
|
||||
public function add(int $accountId, int $key): void {
|
||||
$this->reset();
|
||||
$this->event_account = $accountId;
|
||||
$this->key = $key;
|
||||
$this->action = $this->actionType->value;
|
||||
$this->save();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array{accountId: int, key: int}[] $accounts
|
||||
*/
|
||||
public function addBatch(array $accounts): void {
|
||||
if (count($accounts) === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
$params = [':action' => strval($this->actionType->value)];
|
||||
$arrayPlaceholders = [];
|
||||
$prefix = '';
|
||||
foreach ($accounts as $idx => $record) {
|
||||
$prefix = ":{$idx}_";
|
||||
|
||||
$params[$prefix . 'idx'] = $idx;
|
||||
$params[$prefix . 'account_id'] = intval($record['accountId']);
|
||||
$params[$prefix . 'key'] = intval($record['key']);
|
||||
$arrayPlaceholders[] = "({$prefix}idx, {$prefix}account_id, {$prefix}key)";
|
||||
}
|
||||
|
||||
$strPlaceholders = \implode(', ', $arrayPlaceholders);
|
||||
|
||||
// update waiting records
|
||||
$query = (
|
||||
"UPDATE queue_account_operation
|
||||
SET
|
||||
updated = now()
|
||||
FROM (VALUES $strPlaceholders) AS v(idx, account_id, key)
|
||||
WHERE
|
||||
queue_account_operation.event_account = v.account_id::bigint AND
|
||||
queue_account_operation.key = v.key::bigint AND
|
||||
queue_account_operation.action = :action AND
|
||||
queue_account_operation.status = 'waiting'
|
||||
RETURNING v.idx"
|
||||
);
|
||||
|
||||
$results = $this->execQuery($query, $params);
|
||||
|
||||
$updatedIdxs = array_unique(array_column($results, 'idx'));
|
||||
$notUpdatedIdxs = array_keys(array_diff(array_keys($accounts), $updatedIdxs));
|
||||
|
||||
if (!count($notUpdatedIdxs)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$params = [':action' => strval($this->actionType->value)];
|
||||
$arrayPlaceholders = [];
|
||||
foreach ($notUpdatedIdxs as $idxToInsert) {
|
||||
$prefix = ":{$idxToInsert}_";
|
||||
$record = $accounts[$idxToInsert];
|
||||
|
||||
$params[$prefix . 'account_id'] = $record['accountId'];
|
||||
$params[$prefix . 'key'] = $record['key'];
|
||||
$arrayPlaceholders[] = "({$prefix}account_id, {$prefix}key, :action)";
|
||||
}
|
||||
|
||||
$strPlaceholders = \implode(', ', $arrayPlaceholders);
|
||||
|
||||
$query = "INSERT INTO queue_account_operation (event_account, key, action) VALUES {$strPlaceholders} RETURNING id";
|
||||
|
||||
$result = $this->execQuery($query, $params);
|
||||
|
||||
$msg = sprintf('Adding %s accounts to %s queue -- %s updated, %s inserted', count($accounts), strval($this->actionType->value), count($updatedIdxs), count($result));
|
||||
\Utils\Logger::log(null, $msg);
|
||||
}
|
||||
|
||||
public function addBatchIds(array $accountIds, int $key): void {
|
||||
$batchSize = \Utils\Variables::getAccountOperationQueueBatchSize();
|
||||
|
||||
$batch = [];
|
||||
$cnt = 0;
|
||||
|
||||
foreach ($accountIds as $id) {
|
||||
$batch[] = [
|
||||
'accountId' => $id,
|
||||
'key' => $key,
|
||||
];
|
||||
$cnt++;
|
||||
|
||||
if ($cnt >= $batchSize) {
|
||||
$this->addBatch($batch);
|
||||
$batch = [];
|
||||
$cnt = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if ($cnt) {
|
||||
$this->addBatch($batch);
|
||||
}
|
||||
}
|
||||
|
||||
public function isInQueueStatus(int $accountId, int $key): array {
|
||||
$this->reset();
|
||||
$this->load([
|
||||
'event_account = ? AND key = ? AND action = ? AND status != ?',
|
||||
$accountId, $key, $this->actionType->value, \Type\QueueAccountOperationStatusType::COMPLETED,
|
||||
]);
|
||||
|
||||
return $this->dry() ? [false, null] : [true, $this->status];
|
||||
}
|
||||
|
||||
public function isInQueue(int $accountId, int $key): bool {
|
||||
$this->reset();
|
||||
$this->load([
|
||||
'event_account = ? AND key = ? AND action = ? AND status != ?',
|
||||
$accountId, $key, $this->actionType->value, \Type\QueueAccountOperationStatusType::COMPLETED,
|
||||
]);
|
||||
|
||||
return $this->dry() ? false : true;
|
||||
}
|
||||
|
||||
public function isExecuting(): bool {
|
||||
$this->load([
|
||||
'action = ? AND status = ?',
|
||||
$this->actionType->value, \Type\QueueAccountOperationStatusType::EXECUTING,
|
||||
]);
|
||||
|
||||
return $this->dry() ? false : true;
|
||||
}
|
||||
|
||||
public function actionIsInQueueProcessing(int $key): bool {
|
||||
$this->reset();
|
||||
$this->load([
|
||||
'key = ? AND action = ? AND status != ? AND status != ?',
|
||||
$key,
|
||||
$this->actionType->value,
|
||||
\Type\QueueAccountOperationStatusType::COMPLETED,
|
||||
\Type\QueueAccountOperationStatusType::FAILED,
|
||||
]);
|
||||
|
||||
return $this->dry() ? false : true;
|
||||
}
|
||||
|
||||
public function getNextInQueue(): array|null {
|
||||
$this->reset();
|
||||
$this->creator = 'SELECT creator
|
||||
FROM dshb_api
|
||||
WHERE dshb_api.id = queue_account_operation.key';
|
||||
$this->load([
|
||||
'action = ? AND status = ?',
|
||||
$this->actionType->value, \Type\QueueAccountOperationStatusType::WAITING,
|
||||
], ['order' => 'created ASC']);
|
||||
|
||||
return $this->dry() ? null : $this->cast();
|
||||
}
|
||||
|
||||
public function getNextBatchInQueue(int $batchSize): array {
|
||||
$params = [
|
||||
':batchSize' => $batchSize,
|
||||
':action' => $this->actionType->value,
|
||||
':status' => \Type\QueueAccountOperationStatusType::WAITING,
|
||||
];
|
||||
|
||||
$query = ('
|
||||
SELECT
|
||||
queue_account_operation.*,
|
||||
dshb_api.creator
|
||||
FROM queue_account_operation
|
||||
JOIN dshb_api
|
||||
ON dshb_api.id = queue_account_operation.key
|
||||
WHERE
|
||||
action = :action
|
||||
AND status = :status
|
||||
ORDER BY id ASC
|
||||
LIMIT :batchSize
|
||||
');
|
||||
|
||||
return $this->execQuery($query, $params);
|
||||
}
|
||||
|
||||
public function getNextBatchKeysInQueue(int $batchSize): array {
|
||||
$params = [
|
||||
':batchSize' => $batchSize,
|
||||
':action' => $this->actionType->value,
|
||||
':status' => \Type\QueueAccountOperationStatusType::WAITING,
|
||||
];
|
||||
|
||||
$query = ('
|
||||
SELECT
|
||||
DISTINCT key
|
||||
FROM (
|
||||
SELECT
|
||||
queue_account_operation.id,
|
||||
queue_account_operation.key
|
||||
FROM queue_account_operation
|
||||
WHERE
|
||||
action = :action
|
||||
AND status = :status
|
||||
ORDER BY id ASC
|
||||
LIMIT :batchSize
|
||||
) AS t
|
||||
');
|
||||
|
||||
$results = $this->execQuery($query, $params);
|
||||
|
||||
return array_column($results, 'key');
|
||||
}
|
||||
|
||||
public function setWaiting(): void {
|
||||
if ($this->loaded()) {
|
||||
$now = new \DateTime();
|
||||
$this->updated = $now->format(self::DATETIME_FORMAT);
|
||||
$this->status = \Type\QueueAccountOperationStatusType::WAITING;
|
||||
$this->save();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int[] $ids
|
||||
*/
|
||||
public function setWaitingForBatch(array $ids): void {
|
||||
$this->setStatusForBatch(
|
||||
$ids,
|
||||
new \Type\QueueAccountOperationStatusType(\Type\QueueAccountOperationStatusType::WAITING),
|
||||
);
|
||||
}
|
||||
|
||||
public function setExecuting(): void {
|
||||
if ($this->loaded()) {
|
||||
$now = new \DateTime();
|
||||
$this->updated = $now->format(self::DATETIME_FORMAT);
|
||||
$this->status = \Type\QueueAccountOperationStatusType::EXECUTING;
|
||||
$this->save();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int[] $ids
|
||||
*/
|
||||
public function setExecutingForBatch(array $ids): void {
|
||||
$this->setStatusForBatch(
|
||||
$ids,
|
||||
new \Type\QueueAccountOperationStatusType(\Type\QueueAccountOperationStatusType::EXECUTING),
|
||||
);
|
||||
}
|
||||
|
||||
public function setCompleted(): void {
|
||||
if ($this->loaded()) {
|
||||
$now = new \DateTime();
|
||||
$this->updated = $now->format(self::DATETIME_FORMAT);
|
||||
$this->status = \Type\QueueAccountOperationStatusType::COMPLETED;
|
||||
$this->save();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int[] $ids
|
||||
*/
|
||||
public function setCompletedForBatch(array $ids): void {
|
||||
$this->setStatusForBatch(
|
||||
$ids,
|
||||
new \Type\QueueAccountOperationStatusType(\Type\QueueAccountOperationStatusType::COMPLETED),
|
||||
);
|
||||
}
|
||||
|
||||
public function setFailed(): void {
|
||||
if ($this->loaded()) {
|
||||
$now = new \DateTime();
|
||||
$this->updated = $now->format(self::DATETIME_FORMAT);
|
||||
$this->status = \Type\QueueAccountOperationStatusType::FAILED;
|
||||
$this->save();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int[] $ids
|
||||
*/
|
||||
public function setFailedForBatch(array $ids): void {
|
||||
$this->setStatusForBatch(
|
||||
$ids,
|
||||
new \Type\QueueAccountOperationStatusType(\Type\QueueAccountOperationStatusType::FAILED),
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int[] $ids
|
||||
*/
|
||||
private function setStatusForBatch(array $ids, \Type\QueueAccountOperationStatusType $status): void {
|
||||
if (!count($ids)) {
|
||||
return;
|
||||
}
|
||||
|
||||
[$params, $placeHolders] = $this->getArrayPlaceholders($ids);
|
||||
|
||||
$params[':status'] = $status->value;
|
||||
$params[':updated'] = (new \DateTime())->format(self::DATETIME_FORMAT);
|
||||
|
||||
$query = ("
|
||||
UPDATE queue_account_operation
|
||||
SET
|
||||
status = :status,
|
||||
updated = :updated
|
||||
WHERE
|
||||
id IN ({$placeHolders})
|
||||
");
|
||||
|
||||
$this->execQuery($query, $params);
|
||||
}
|
||||
|
||||
public function removeFromQueue(): void {
|
||||
if ($this->loaded()) {
|
||||
$this->erase();
|
||||
}
|
||||
}
|
||||
|
||||
public function unclog(): bool {
|
||||
if ($this->loaded() && $this->status === \Type\QueueAccountOperationStatusType::EXECUTING) {
|
||||
$updatedDateTime = new \DateTime($this->updated);
|
||||
$currentDateTime = new \DateTime();
|
||||
|
||||
$differenceInSeconds = $currentDateTime->getTimestamp() - $updatedDateTime->getTimestamp();
|
||||
$totalMinutes = (int) floor($differenceInSeconds / 60);
|
||||
|
||||
if ($totalMinutes < \Utils\Constants::get('ACCOUNT_OPERATION_QUEUE_AUTO_UNCLOG_AFTER_MINUTES')) {
|
||||
return false; // Time not elapsed, no need to unclog (yet).
|
||||
}
|
||||
|
||||
$this->setFailed();
|
||||
|
||||
$msg = sprintf('Queue failed on unclog (now - updated > 2 hours) on account %d minutes diff %d.', $this->event_account, $totalMinutes);
|
||||
\Utils\Logger::log(null, $msg);
|
||||
|
||||
return true; // Unclogged queue.
|
||||
}
|
||||
|
||||
return false; // No need to unclog.
|
||||
}
|
||||
|
||||
public function clearCompleted(\DateTime $clearBefore): int {
|
||||
$this->reset();
|
||||
|
||||
$params = [
|
||||
':daysAgo' => $clearBefore->format(self::DATETIME_FORMAT),
|
||||
':status' => \Type\QueueAccountOperationStatusType::COMPLETED,
|
||||
];
|
||||
|
||||
$query = ('
|
||||
WITH deleted AS
|
||||
(
|
||||
DELETE FROM queue_account_operation
|
||||
WHERE
|
||||
status = :status
|
||||
AND updated < :daysAgo
|
||||
RETURNING *
|
||||
) SELECT count(*) FROM deleted
|
||||
');
|
||||
|
||||
$results = $this->execQuery($query, $params);
|
||||
|
||||
return $results[0]['count'] ?? 0;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user