job()
    ->inject('message')
    ->action(function (Message $message) use (&$stats) {
        // Queue job handler: folds the metrics carried by one message into the shared
        // in-memory $stats buffer. The timer registered in workerStart() below is what
        // persists the buffer and clears it.
        $payload = $message->getPayload() ?? [];
        $project = new Document($payload['project'] ?? []);
        $projectInternalId = $project->getInternalId();

        foreach ($payload['metrics'] ?? [] as $metric) {
            // Skip malformed entries instead of raising undefined-index warnings and
            // buffering a null value.
            if (!isset($metric['key'], $metric['value'])) {
                continue;
            }

            // The bucket id must include the project: keying on the metric key alone
            // would merge the same metric from different projects into whichever
            // project happened to create the bucket first.
            $uniq = md5("{$projectInternalId}_{$metric['key']}");

            if (!isset($stats[$uniq])) {
                $stats[$uniq] = [
                    'projectInternalId' => $projectInternalId,
                    'database' => $project->getAttribute('database'),
                    'key' => $metric['key'],
                    'value' => $metric['value'],
                ];
                continue;
            }

            $stats[$uniq]['value'] += $metric['value'];
        }
    });

$server
    ->workerStart()
    ->inject('register')
    ->inject('cache')
    ->inject('pools')
    ->action(function ($register, $cache, $pools) use ($periods, &$stats) {
        // Every 30 seconds, flush the buffered metrics into the per-project 'stats'
        // collection, writing one document per (period, time bucket, metric key).
        Timer::tick(30000, function () use ($register, $cache, $pools, $periods, &$stats) {
            // Take a snapshot and reset the shared buffer (captured by reference) so
            // that metrics received from now on land in a fresh buffer.
            // NOTE: the buffer is cleared before writing, so an increment whose write
            // fails below is logged and dropped, not retried.
            $pending = $stats;
            $stats = [];

            foreach ($pending as $metric) {
                foreach ($periods as $period => $format) {
                    // The 'inf' period has no time bucket; every other period is
                    // bucketed by formatting the current time with its format string.
                    $time = 'inf' === $period ? null : date($format);
                    $id = \md5("{$time}_{$period}_{$metric['key']}");

                    try {
                        // Acquire the connection inside the try block so that a pool
                        // failure is logged and the finally clause still reclaims.
                        $adapter = new Database(
                            $pools
                                ->get($metric['database'])
                                ->pop()
                                ->getResource(),
                            $cache
                        );
                        $adapter->setNamespace('_' . $metric['projectInternalId']);

                        $document = $adapter->getDocument('stats', $id);

                        if ($document->isEmpty()) {
                            $adapter->createDocument('stats', new Document([
                                '$id' => $id,
                                'period' => $period,
                                'time' => $time,
                                'metric' => $metric['key'],
                                'value' => $metric['value'],
                                'type' => 0,
                                'region' => App::getEnv('_APP_REGION', 'default'),
                            ]));
                        } else {
                            // NOTE(review): read-modify-write; presumably not atomic if
                            // several workers update the same document - confirm.
                            $value = $document->getAttribute('value') + $metric['value'];

                            $adapter->updateDocument(
                                'stats',
                                $document->getId(),
                                $document->setAttribute('value', $value)
                            );
                        }
                    } catch (\Exception $e) {
                        Console::error($e->getMessage());
                    } finally {
                        $pools->reclaim();
                    }
                }
            }
        });
    });

$server->start();