diff --git a/lib/metrics.ts b/lib/metrics.ts new file mode 100644 index 00000000..380d78d7 --- /dev/null +++ b/lib/metrics.ts @@ -0,0 +1,39 @@ +import * as client from 'prom-client'; +import os from 'os'; +import { nanoid } from 'nanoid'; + +const register = new client.Registry(); + +client.collectDefaultMetrics({ register }); + +export { register, client }; + +/** + * Start periodic push to pushgateway + * + * @param workerName - name of the worker for grouping + */ +export function startMetricsPushing(workerName: string): void { + const url = process.env.PROMETHEUS_PUSHGATEWAY_URL; + const interval = parseInt(process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL || '10000'); + + if (!url) { + return; + } + + const hostname = os.hostname(); + const ID_SIZE = 5; + const id = nanoid(ID_SIZE); + + const gateway = new client.Pushgateway(url, [], register); + + console.log(`Start pushing metrics to ${url} every ${interval}ms (host: ${hostname}, id: ${id})`); + + setInterval(() => { + gateway.pushAdd({ jobName: 'workers', groupings: { worker: workerName, host: hostname, id } }, (err) => { + if (err) { + console.error('Metrics push error:', err); + } + }); + }, interval); +} diff --git a/runner.ts b/runner.ts index 443beea8..9147b54b 100644 --- a/runner.ts +++ b/runner.ts @@ -9,6 +9,7 @@ import * as utils from './lib/utils'; import { Worker } from './lib/worker'; import HawkCatcher from '@hawk.so/nodejs'; import * as dotenv from 'dotenv'; +import { startMetricsPushing } from './lib/metrics'; dotenv.config(); @@ -57,19 +58,17 @@ class WorkerRunner { .then((workerConstructors) => { this.constructWorkers(workerConstructors); }) - // .then(() => { - // try { - // this.startMetrics(); - // } catch (e) { - // HawkCatcher.send(e); - // console.error(`Metrics not started: ${e}`); - // } - // - // return Promise.resolve(); - // }) .then(() => { return this.startWorkers(); }) + .then(() => { + try { + this.startMetrics(); + } catch (e) { + HawkCatcher.send(e); + console.error(`Metrics not started: ${e}`); + } + }) .then(() => { this.observeProcess(); }) @@ -82,67 +81,15 @@ class WorkerRunner { /** * Run metrics exporter */ - // private startMetrics(): void { - // if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) { - // return; - // } - // - // const PUSH_INTERVAL = parseInt(process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL); - // - // if (isNaN(PUSH_INTERVAL)) { - // throw new Error('PROMETHEUS_PUSHGATEWAY_INTERVAL is invalid or not set'); - // } - // - // const collectDefaultMetrics = promClient.collectDefaultMetrics; - // const Registry = promClient.Registry; - // - // const register = new Registry(); - // const startGcStats = gcStats(register); - // - // const hostname = os.hostname(); - // - // const ID_SIZE = 5; - // const id = nanoid(ID_SIZE); - // - // // eslint-disable-next-line node/no-deprecated-api - // const instance = url.parse(process.env.PROMETHEUS_PUSHGATEWAY_URL).host; - // - // // Initialize metrics for workers - // this.workers.forEach((worker) => { - // // worker.initMetrics(); - // worker.getMetrics().forEach((metric: promClient.Counter) => register.registerMetric(metric)); - // }); - // - // collectDefaultMetrics({ register }); - // startGcStats(); - // - // this.gateway = new promClient.Pushgateway(process.env.PROMETHEUS_PUSHGATEWAY_URL, null, register); - // - // console.log(`Start pushing metrics to ${process.env.PROMETHEUS_PUSHGATEWAY_URL}`); - // - // // Pushing metrics to the pushgateway every PUSH_INTERVAL - // this.pushIntervalNumber = setInterval(() => { - // this.workers.forEach((worker) => { - // if (!this.gateway || !instance) { - // return; - // } - // // Use pushAdd not to overwrite previous metrics - // this.gateway.pushAdd({ - // jobName: 'workers', - // groupings: { - // worker: worker.type.replace('/', '_'), - // host: hostname, - // id, - // }, - // }, (err?: Error) => { - // if (err) { - // HawkCatcher.send(err); - // console.log(`Error of pushing metrics to gateway: ${err}`); - // } - // }); - // }); - // }, PUSH_INTERVAL); - // } + private startMetrics(): void { + if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) { + return; + } + + this.workers.forEach((worker) => { + startMetricsPushing(worker.type.replace('/', '_')); + }); + } /** * Dynamically loads workers through the yarn workspaces diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts index 73f16fc7..03fdbd27 100644 --- a/workers/grouper/src/index.ts +++ b/workers/grouper/src/index.ts @@ -26,6 +26,7 @@ import { rightTrim } from '../../../lib/utils/string'; import { hasValue } from '../../../lib/utils/hasValue'; /* eslint-disable-next-line no-unused-vars */ import { memoize } from '../../../lib/memoize'; +import { register, client } from '../../../lib/metrics'; /** * eslint does not count decorators as a variable usage @@ -72,6 +73,28 @@ export default class GrouperWorker extends Worker { */ private redis = new RedisHelper(); + /** + * Prometheus metrics + */ + private metricsEventsTotal = new client.Counter({ + name: 'hawk_grouper_events_total', + help: 'Total number of events processed by grouper', + labelNames: ['type'], + registers: [register], + }); + + private metricsEventDuration = new client.Histogram({ + name: 'hawk_grouper_event_duration_seconds', + help: 'Duration of event processing in seconds', + registers: [register], + }); + + private metricsErrorsTotal = new client.Counter({ + name: 'hawk_grouper_errors_total', + help: 'Total number of errors during event processing', + registers: [register], + }); + /** * Start consuming messages */ @@ -105,6 +128,24 @@ export default class GrouperWorker extends Worker { * @param task - event to handle */ public async handle(task: GroupWorkerTask): Promise { + const endTimer = this.metricsEventDuration.startTimer(); + + try { + await this.handleInternal(task); + endTimer(); + } catch (error) { + endTimer(); + this.metricsErrorsTotal.inc(); + throw error; + } + } + + /** + * Internal task handling function + * + * @param task - event to handle + */ + private async handleInternal(task: GroupWorkerTask): Promise { let uniqueEventHash = await this.getUniqueEventHash(task); // FIX RELEASE TYPE @@ -147,6 +188,11 @@ export default class GrouperWorker extends Worker { */ const isFirstOccurrence = !existedEvent && !similarEvent; + /** + * Increment metrics counter + */ + this.metricsEventsTotal.inc({ type: isFirstOccurrence ? 'new' : 'repeated' }); + let repetitionId = null; let incrementDailyAffectedUsers = false;