diff --git a/lib/metrics.ts b/lib/metrics.ts
new file mode 100644
index 00000000..380d78d7
--- /dev/null
+++ b/lib/metrics.ts
@@ -0,0 +1,39 @@
+import * as client from 'prom-client';
+import os from 'os';
+import { nanoid } from 'nanoid';
+
+const register = new client.Registry();
+
+client.collectDefaultMetrics({ register });
+
+export { register, client };
+
+/**
+ * Start periodic push to pushgateway
+ *
+ * @param workerName - name of the worker for grouping
+ */
+export function startMetricsPushing(workerName: string): void {
+  const url = process.env.PROMETHEUS_PUSHGATEWAY_URL;
+  const interval = parseInt(process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL || '10000', 10) || 10000;
+
+  if (!url) {
+    return;
+  }
+
+  const hostname = os.hostname();
+  const ID_SIZE = 5;
+  const id = nanoid(ID_SIZE);
+
+  const gateway = new client.Pushgateway(url, {}, register);
+
+  console.log(`Start pushing metrics to ${url} every ${interval}ms (host: ${hostname}, id: ${id})`);
+
+  setInterval(() => {
+    gateway.pushAdd({ jobName: 'workers', groupings: { worker: workerName, host: hostname, id } }, (err) => {
+      if (err) {
+        console.error('Metrics push error:', err);
+      }
+    });
+  }, interval);
+}
diff --git a/runner.ts b/runner.ts
index 443beea8..9147b54b 100644
--- a/runner.ts
+++ b/runner.ts
@@ -9,6 +9,7 @@ import * as utils from './lib/utils';
 import { Worker } from './lib/worker';
 import HawkCatcher from '@hawk.so/nodejs';
 import * as dotenv from 'dotenv';
+import { startMetricsPushing } from './lib/metrics';
 
 dotenv.config();
 
@@ -57,19 +58,17 @@ class WorkerRunner {
       .then((workerConstructors) => {
         this.constructWorkers(workerConstructors);
       })
-      // .then(() => {
-      //   try {
-      //     this.startMetrics();
-      //   } catch (e) {
-      //     HawkCatcher.send(e);
-      //     console.error(`Metrics not started: ${e}`);
-      //   }
-      //
-      //   return Promise.resolve();
-      // })
       .then(() => {
         return this.startWorkers();
       })
+      .then(() => {
+        try {
+          this.startMetrics();
+        } catch (e) {
+          HawkCatcher.send(e);
+          console.error(`Metrics not started: ${e}`);
+        }
+      })
       .then(() => {
         this.observeProcess();
       })
@@ -82,67 +81,15 @@ class WorkerRunner {
   /**
    * Run metrics exporter
    */
-  // private startMetrics(): void {
-  //   if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) {
-  //     return;
-  //   }
-  //
-  //   const PUSH_INTERVAL = parseInt(process.env.PROMETHEUS_PUSHGATEWAY_INTERVAL);
-  //
-  //   if (isNaN(PUSH_INTERVAL)) {
-  //     throw new Error('PROMETHEUS_PUSHGATEWAY_INTERVAL is invalid or not set');
-  //   }
-  //
-  //   const collectDefaultMetrics = promClient.collectDefaultMetrics;
-  //   const Registry = promClient.Registry;
-  //
-  //   const register = new Registry();
-  //   const startGcStats = gcStats(register);
-  //
-  //   const hostname = os.hostname();
-  //
-  //   const ID_SIZE = 5;
-  //   const id = nanoid(ID_SIZE);
-  //
-  //   // eslint-disable-next-line node/no-deprecated-api
-  //   const instance = url.parse(process.env.PROMETHEUS_PUSHGATEWAY_URL).host;
-  //
-  //   // Initialize metrics for workers
-  //   this.workers.forEach((worker) => {
-  //     // worker.initMetrics();
-  //     worker.getMetrics().forEach((metric: promClient.Counter) => register.registerMetric(metric));
-  //   });
-  //
-  //   collectDefaultMetrics({ register });
-  //   startGcStats();
-  //
-  //   this.gateway = new promClient.Pushgateway(process.env.PROMETHEUS_PUSHGATEWAY_URL, null, register);
-  //
-  //   console.log(`Start pushing metrics to ${process.env.PROMETHEUS_PUSHGATEWAY_URL}`);
-  //
-  //   // Pushing metrics to the pushgateway every PUSH_INTERVAL
-  //   this.pushIntervalNumber = setInterval(() => {
-  //     this.workers.forEach((worker) => {
-  //       if (!this.gateway || !instance) {
-  //         return;
-  //       }
-  //       // Use pushAdd not to overwrite previous metrics
-  //       this.gateway.pushAdd({
-  //         jobName: 'workers',
-  //         groupings: {
-  //           worker: worker.type.replace('/', '_'),
-  //           host: hostname,
-  //           id,
-  //         },
-  //       }, (err?: Error) => {
-  //         if (err) {
-  //           HawkCatcher.send(err);
-  //           console.log(`Error of pushing metrics to gateway: ${err}`);
-  //         }
-  //       });
-  //     });
-  //   }, PUSH_INTERVAL);
-  // }
+  private startMetrics(): void {
+    if (!process.env.PROMETHEUS_PUSHGATEWAY_URL) {
+      return;
+    }
+
+    this.workers.forEach((worker) => {
+      startMetricsPushing(worker.type.replace('/', '_'));
+    });
+  }
 
   /**
    * Dynamically loads workers through the yarn workspaces
diff --git a/workers/grouper/src/index.ts b/workers/grouper/src/index.ts
index 73f16fc7..0eb756db 100644
--- a/workers/grouper/src/index.ts
+++ b/workers/grouper/src/index.ts
@@ -26,6 +26,7 @@ import { rightTrim } from '../../../lib/utils/string';
 import { hasValue } from '../../../lib/utils/hasValue';
 /* eslint-disable-next-line no-unused-vars */
 import { memoize } from '../../../lib/memoize';
+import { register, client } from '../../../lib/metrics';
 
 /**
  * eslint does not count decorators as a variable usage
@@ -72,6 +73,55 @@ export default class GrouperWorker extends Worker {
    */
   private redis = new RedisHelper();
 
+  /**
+   * Prometheus metrics
+   */
+  private metricsEventsTotal = new client.Counter({
+    name: 'hawk_grouper_events_total',
+    help: 'Total number of events processed by grouper',
+    labelNames: ['type'],
+    registers: [register],
+  });
+
+  private metricsHandleDuration = new client.Histogram({
+    name: 'hawk_grouper_handle_duration_seconds',
+    help: 'Duration of handle() call in seconds',
+    registers: [register],
+  });
+
+  private metricsErrorsTotal = new client.Counter({
+    name: 'hawk_grouper_errors_total',
+    help: 'Total number of errors during event processing',
+    registers: [register],
+  });
+
+  private metricsMongoDuration = new client.Histogram({
+    name: 'hawk_grouper_mongo_duration_seconds',
+    help: 'Duration of MongoDB operations in seconds',
+    labelNames: ['operation'],
+    registers: [register],
+  });
+
+  private metricsDeltaSize = new client.Histogram({
+    name: 'hawk_grouper_delta_size_bytes',
+    help: 'Size of computed repetition delta in bytes',
+    buckets: [100, 500, 1000, 5000, 10000, 50000, 100000, 500000],
+    registers: [register],
+  });
+
+  private metricsPayloadSize = new client.Histogram({
+    name: 'hawk_grouper_payload_size_bytes',
+    help: 'Size of incoming event payload in bytes',
+    buckets: [100, 500, 1000, 5000, 10000, 50000, 100000, 500000],
+    registers: [register],
+  });
+
+  private metricsDuplicateRetries = new client.Counter({
+    name: 'hawk_grouper_duplicate_retries_total',
+    help: 'Number of retries due to duplicate key errors',
+    registers: [register],
+  });
+
   /**
    * Start consuming messages
    */
@@ -105,6 +155,30 @@ export default class GrouperWorker extends Worker {
    * @param task - event to handle
    */
   public async handle(task: GroupWorkerTask): Promise<void> {
+    const endTimer = this.metricsHandleDuration.startTimer();
+
+    try {
+      await this.handleInternal(task);
+      endTimer();
+    } catch (error) {
+      endTimer();
+      this.metricsErrorsTotal.inc();
+      throw error;
+    }
+  }
+
+  /**
+   * Internal task handling function
+   *
+   * @param task - event to handle
+   */
+  private async handleInternal(task: GroupWorkerTask): Promise<void> {
+    const taskPayloadSize = Buffer.byteLength(JSON.stringify(task.payload));
+
+    this.metricsPayloadSize.observe(taskPayloadSize);
+
+    this.logger.info(`[handle] project=${task.projectId} catcher=${task.catcherType} title="${task.payload.title}" payloadSize=${taskPayloadSize}b backtraceFrames=${task.payload.backtrace?.length ?? 0}`);
+
     let uniqueEventHash = await this.getUniqueEventHash(task);
 
     // FIX RELEASE TYPE
@@ -147,6 +221,11 @@
      */
     const isFirstOccurrence = !existedEvent && !similarEvent;
 
+    /**
+     * Increment metrics counter
+     */
+    this.metricsEventsTotal.inc({ type: isFirstOccurrence ? 'new' : 'repeated' });
+
     let repetitionId = null;
     let incrementDailyAffectedUsers = false;
 
@@ -165,6 +244,8 @@
     try {
       const incrementAffectedUsers = !!task.payload.user;
 
+      this.logger.info(`[saveEvent] new event, payloadSize=${taskPayloadSize}b`);
+
       /**
        * Insert new event
        */
@@ -194,6 +275,8 @@
          * and we need to process this event as repetition
          */
         if (e.code?.toString() === DB_DUPLICATE_KEY_ERROR) {
+          this.metricsDuplicateRetries.inc();
+          this.logger.info(`[saveEvent] duplicate key, retrying as repetition`);
           await this.handle(task);
 
           return;
@@ -220,6 +303,10 @@
 
     let delta: RepetitionDelta;
 
+    const existedPayloadSize = Buffer.byteLength(JSON.stringify(existedEvent.payload));
+
+    this.logger.info(`[computeDelta] existedPayloadSize=${existedPayloadSize}b taskPayloadSize=${taskPayloadSize}b`);
+
     try {
       /**
        * Calculate delta between original event and repetition
@@ -230,9 +317,16 @@
       throw new DiffCalculationError(e, existedEvent.payload, task.payload);
     }
 
+    const deltaStr = JSON.stringify(delta);
+    const deltaSize = deltaStr != null ? Buffer.byteLength(deltaStr) : 0;
+
+    this.metricsDeltaSize.observe(deltaSize);
+
+    this.logger.info(`[computeDelta] deltaSize=${deltaSize}b`);
+
     const newRepetition = {
       groupHash: uniqueEventHash,
-      delta: JSON.stringify(delta),
+      delta: deltaStr,
       timestamp: task.timestamp,
     } as RepetitionDBScheme;
 
@@ -250,6 +344,10 @@
       incrementDailyAffectedUsers
     );
 
+    const mem = process.memoryUsage();
+
+    this.logger.info(`[handle] done, heapUsed=${Math.round(mem.heapUsed / 1024 / 1024)}MB heapTotal=${Math.round(mem.heapTotal / 1024 / 1024)}MB rss=${Math.round(mem.rss / 1024 / 1024)}MB`);
+
     /**
      * Add task for NotifierWorker only if event is not ignored
      */
@@ -348,7 +446,9 @@
     try {
       const originalEvent = await this.findFirstEventByPattern(matchingPattern.pattern, projectId);
 
-      this.logger.info(`original event for pattern: ${JSON.stringify(originalEvent)}`);
+      const originalEventSize = Buffer.byteLength(JSON.stringify(originalEvent));
+
+      this.logger.info(`[findSimilarEvent] found by pattern, originalEventSize=${originalEventSize}b`);
 
       if (originalEvent) {
         return originalEvent;
@@ -518,7 +618,9 @@
     const eventCacheKey = await this.getEventCacheKey(projectId, groupHash);
 
     return this.cache.get(eventCacheKey, async () => {
-      return this.eventsDb.getConnection()
+      const endTimer = this.metricsMongoDuration.startTimer({ operation: 'getEvent' });
+
+      const result = await this.eventsDb.getConnection()
         .collection(`events:${projectId}`)
         .findOne({
           groupHash,
@@ -526,6 +628,10 @@
         .catch((err) => {
           throw new DatabaseReadWriteError(err);
         });
+
+      endTimer();
+
+      return result;
     });
   }
 
@@ -553,12 +659,18 @@
       throw new ValidationError('Controller.saveEvent: Project ID is invalid or missed');
     }
 
+    const endTimer = this.metricsMongoDuration.startTimer({ operation: 'saveEvent' });
+
     const collection = this.eventsDb.getConnection().collection(`events:${projectId}`);
 
     encodeUnsafeFields(groupedEventData);
 
-    return (await collection
+    const result = (await collection
       .insertOne(groupedEventData)).insertedId as mongodb.ObjectID;
+
+    endTimer();
+
+    return result;
   }
 
   /**
@@ -572,13 +684,20 @@
       throw new ValidationError('Controller.saveRepetition: Project ID is invalid or missing');
     }
 
+    const endTimer = this.metricsMongoDuration.startTimer({ operation: 'saveRepetition' });
+
     try {
       const collection = this.eventsDb.getConnection().collection(`repetitions:${projectId}`);
 
       encodeUnsafeFields(repetition);
 
-      return (await collection.insertOne(repetition)).insertedId as mongodb.ObjectID;
+      const result = (await collection.insertOne(repetition)).insertedId as mongodb.ObjectID;
+
+      endTimer();
+
+      return result;
     } catch (err) {
+      endTimer();
       throw new DatabaseReadWriteError(err, {
         repetition: repetition as unknown as Record<string, unknown>,
         projectId,
@@ -598,6 +717,8 @@
       throw new ValidationError('Controller.saveEvent: Project ID is invalid or missed');
     }
 
+    const endTimer = this.metricsMongoDuration.startTimer({ operation: 'incrementCounter' });
+
     try {
       const updateQuery = incrementAffected
         ? {
@@ -612,10 +733,15 @@
           },
         };
 
-      return (await this.eventsDb.getConnection()
+      const result = (await this.eventsDb.getConnection()
         .collection(`events:${projectId}`)
         .updateOne(query, updateQuery)).modifiedCount;
+
+      endTimer();
+
+      return result;
     } catch (err) {
+      endTimer();
       throw new DatabaseReadWriteError(err);
     }
   }
@@ -641,6 +767,8 @@
       throw new ValidationError('GrouperWorker.saveDailyEvents: Project ID is invalid or missed');
     }
 
+    const endTimer = this.metricsMongoDuration.startTimer({ operation: 'saveDailyEvents' });
+
     try {
       const midnight = this.getMidnightByEventTimestamp(eventTimestamp);
 
@@ -664,7 +792,10 @@
           },
         },
       }, { upsert: true });
+
+      endTimer();
     } catch (err) {
+      endTimer();
       throw new DatabaseReadWriteError(err);
     }
   }