diff --git a/lambdas/api-handler/src/handlers/amendment-event-transformer.ts b/lambdas/api-handler/src/handlers/amendment-event-transformer.ts
index 2ae23333..1a390232 100644
--- a/lambdas/api-handler/src/handlers/amendment-event-transformer.ts
+++ b/lambdas/api-handler/src/handlers/amendment-event-transformer.ts
@@ -2,11 +2,14 @@
 import { SQSBatchItemFailure, SQSEvent, SQSHandler } from "aws-lambda";
 import { PublishCommand } from "@aws-sdk/client-sns";
 import { LetterEvent } from "@nhsdigital/nhs-notify-event-schemas-supplier-api/src/events/letter-events";
 import { mapLetterToCloudEvent } from "@nhsdigital/nhs-notify-event-schemas-supplier-api/src/events/letter-mapper";
+import { Unit } from "aws-embedded-metrics";
+import pino from "pino";
 import {
   UpdateLetterCommand,
   UpdateLetterCommandSchema,
 } from "../contracts/letters";
 import { Deps } from "../config/deps";
+import { MetricEntry, MetricStatus, buildEMFObject } from "../utils/metrics";
 
 export default function createTransformAmendmentEventHandler(
   deps: Deps,
@@ -39,6 +42,11 @@ export default function createTransformAmendmentEventHandler(
           messageId: message.messageId,
           correlationId: message.messageAttributes.CorrelationId.stringValue,
         });
+        emitSuccessMetrics(
+          updateLetterCommand.supplierId,
+          updateLetterCommand.status,
+          deps.logger,
+        );
       } catch (error) {
         deps.logger.error({
           description: "Error processing letter status update",
@@ -52,7 +60,7 @@
     });
 
     await Promise.all(tasks);
-
+    emitFailedItems(batchItemFailures, deps.logger);
     return { batchItemFailures };
   };
 }
@@ -66,3 +74,43 @@ function buildSnsCommand(
     Message: JSON.stringify(letterEvent),
   });
 }
+
+function emitSuccessMetrics(
+  supplierId: string,
+  status: string,
+  logger: pino.Logger,
+) {
+  const dimensions: Record<string, string> = {
+    supplier: supplierId,
+    status,
+  };
+  const metric: MetricEntry = {
+    key: MetricStatus.Success,
+    value: 1,
+    unit: Unit.Count,
+  };
+  const emf =
buildEMFObject("amendment-event-transformer", dimensions, metric);
+  logger.info(emf);
+}
+
+function emitFailedItems(
+  batchFailures: SQSBatchItemFailure[],
+  logger: pino.Logger,
+) {
+  for (const item of batchFailures) {
+    const dimensions: Record<string, string> = {
+      identifier: item.itemIdentifier,
+    };
+    const metric: MetricEntry = {
+      key: MetricStatus.Failure,
+      value: 1,
+      unit: Unit.Count,
+    };
+    const emf = buildEMFObject(
+      "amendment-event-transformer",
+      dimensions,
+      metric,
+    );
+    logger.info(emf);
+  }
+}
diff --git a/lambdas/api-handler/src/handlers/get-letters.ts b/lambdas/api-handler/src/handlers/get-letters.ts
index 6d9a3e67..8bce6dee 100644
--- a/lambdas/api-handler/src/handlers/get-letters.ts
+++ b/lambdas/api-handler/src/handlers/get-letters.ts
@@ -14,7 +14,6 @@
 import { mapToGetLettersResponse } from "../mappers/letter-mapper";
 import type { Deps } from "../config/deps";
 import { MetricStatus, emitForSingleSupplier } from "../utils/metrics";
 
-// List letters Handlers
 // The endpoint should only return pending letters for now
 const status = "PENDING";
diff --git a/lambdas/api-handler/src/handlers/patch-letter.ts b/lambdas/api-handler/src/handlers/patch-letter.ts
index fe906d14..fc71b67d 100644
--- a/lambdas/api-handler/src/handlers/patch-letter.ts
+++ b/lambdas/api-handler/src/handlers/patch-letter.ts
@@ -56,6 +56,7 @@ export default function createPatchLetterHandler(
     try {
       patchLetterRequest = PatchLetterRequestSchema.parse(JSON.parse(body));
     } catch (error) {
+      emitErrorMetric(metrics, supplierId);
       const typedError =
         error instanceof Error
           ?
new ValidationError(ApiErrorDetail.InvalidRequestBody, { @@ -79,6 +80,7 @@ export default function createPatchLetterHandler( ); if (updateLetterCommand.id !== letterId) { + emitErrorMetric(metrics, supplierId); throw new ValidationError( ApiErrorDetail.InvalidRequestLetterIdsMismatch, ); @@ -100,12 +102,16 @@ export default function createPatchLetterHandler( body: "", }; } catch (error) { - metrics.putDimensions({ - supplier: supplierId, - }); - metrics.putMetric(MetricStatus.Success, 1, Unit.Count); + emitErrorMetric(metrics, supplierId); return processError(error, commonIds.value.correlationId, deps.logger); } }; }); } + +function emitErrorMetric(metrics: MetricsLogger, supplierId: string) { + metrics.putDimensions({ + supplier: supplierId, + }); + metrics.putMetric(MetricStatus.Failure, 1, Unit.Count); +} diff --git a/lambdas/api-handler/src/handlers/post-letters.ts b/lambdas/api-handler/src/handlers/post-letters.ts index a84c200d..665a9d45 100644 --- a/lambdas/api-handler/src/handlers/post-letters.ts +++ b/lambdas/api-handler/src/handlers/post-letters.ts @@ -1,5 +1,6 @@ import { APIGatewayProxyHandler } from "aws-lambda"; -import { MetricsLogger, Unit, metricScope } from "aws-embedded-metrics"; +import { Unit } from "aws-embedded-metrics"; +import pino from "pino"; import type { Deps } from "../config/deps"; import { ApiErrorDetail } from "../contracts/errors"; import { @@ -13,7 +14,7 @@ import { mapToUpdateCommands } from "../mappers/letter-mapper"; import { enqueueLetterUpdateRequests } from "../services/letter-operations"; import { extractCommonIds } from "../utils/common-ids"; import { assertNotEmpty, requireEnvVar } from "../utils/validation"; -import { MetricStatus } from "../utils/metrics"; +import { MetricEntry, MetricStatus, buildEMFObject } from "../utils/metrics"; function duplicateIdsExist(postLettersRequest: PostLettersRequest) { const ids = postLettersRequest.data.map((item) => item.id); @@ -23,17 +24,23 @@ function 
duplicateIdsExist(postLettersRequest: PostLettersRequest) {
 
 /**
  * emits metrics of successful letter updates, including the supplier and grouped by status
  */
-function emitMetics(
-  metrics: MetricsLogger,
+function emitSuccessMetrics(
   supplierId: string,
   statusesMapping: Map<string, number>,
+  logger: pino.Logger,
 ) {
   for (const [status, count] of statusesMapping) {
-    metrics.putDimensions({
+    const dimensions: Record<string, string> = {
       supplier: supplierId,
-      eventType: status,
-    });
-    metrics.putMetric(MetricStatus.Success, count, Unit.Count);
+      status,
+    };
+    const metric: MetricEntry = {
+      key: MetricStatus.Success,
+      value: count,
+      unit: Unit.Count,
+    };
+    const emf = buildEMFObject("postLetters", dimensions, metric);
+    logger.info(emf);
   }
 }
 
@@ -48,92 +55,97 @@ function populateStatusesMap(updateLetterCommands: UpdateLetterCommand[]) {
 export default function createPostLettersHandler(
   deps: Deps,
 ): APIGatewayProxyHandler {
-  return metricScope((metrics: MetricsLogger) => {
-    return async (event) => {
-      const commonIds = extractCommonIds(
-        event.headers,
-        event.requestContext,
-        deps,
-      );
+  return async (event) => {
+    const commonIds = extractCommonIds(
+      event.headers,
+      event.requestContext,
+      deps,
+    );
 
-      if (!commonIds.ok) {
-        return processError(
-          commonIds.error,
-          commonIds.correlationId,
-          deps.logger,
-        );
-      }
+    if (!commonIds.ok) {
+      return processError(
+        commonIds.error,
+        commonIds.correlationId,
+        deps.logger,
+      );
+    }
 
-      const maxUpdateItems = requireEnvVar(deps.env, "MAX_LIMIT");
-      requireEnvVar(deps.env, "QUEUE_URL");
+    const maxUpdateItems = requireEnvVar(deps.env, "MAX_LIMIT");
+    requireEnvVar(deps.env, "QUEUE_URL");
 
-      const { supplierId } = commonIds.value;
-      metrics.setNamespace(
-        process.env.AWS_LAMBDA_FUNCTION_NAME || "postLetters",
+    const { supplierId } = commonIds.value;
+    try {
+      const body = assertNotEmpty(
+        event.body,
+        new ValidationError(ApiErrorDetail.InvalidRequestMissingBody),
       );
+
+      let postLettersRequest: PostLettersRequest;
+
       try {
-        const body =
assertNotEmpty( - event.body, - new ValidationError(ApiErrorDetail.InvalidRequestMissingBody), - ); + postLettersRequest = PostLettersRequestSchema.parse(JSON.parse(body)); + } catch (error) { + const typedError = + error instanceof Error + ? new ValidationError(ApiErrorDetail.InvalidRequestBody, { + cause: error, + }) + : error; + throw typedError; + } - let postLettersRequest: PostLettersRequest; - - try { - postLettersRequest = PostLettersRequestSchema.parse(JSON.parse(body)); - } catch (error) { - const typedError = - error instanceof Error - ? new ValidationError(ApiErrorDetail.InvalidRequestBody, { - cause: error, - }) - : error; - throw typedError; - } - - deps.logger.info({ - description: "Received post letters request", - supplierId: commonIds.value.supplierId, - letterIds: postLettersRequest.data.map((letter) => letter.id), - correlationId: commonIds.value.correlationId, - }); - - if (postLettersRequest.data.length > maxUpdateItems) { - throw new ValidationError( - ApiErrorDetail.InvalidRequestLettersToUpdate, - { args: [maxUpdateItems] }, - ); - } - - if (duplicateIdsExist(postLettersRequest)) { - throw new ValidationError( - ApiErrorDetail.InvalidRequestDuplicateLetterId, - ); - } - - const updateLetterCommands: UpdateLetterCommand[] = mapToUpdateCommands( - postLettersRequest, - supplierId, - ); - const statusesMapping = populateStatusesMap(updateLetterCommands); - await enqueueLetterUpdateRequests( - updateLetterCommands, - commonIds.value.correlationId, - deps, + deps.logger.info({ + description: "Received post letters request", + supplierId: commonIds.value.supplierId, + letterIds: postLettersRequest.data.map((letter) => letter.id), + correlationId: commonIds.value.correlationId, + }); + + if (postLettersRequest.data.length > maxUpdateItems) { + throw new ValidationError( + ApiErrorDetail.InvalidRequestLettersToUpdate, + { args: [maxUpdateItems] }, ); + } - emitMetics(metrics, supplierId, statusesMapping); - return { - statusCode: 202, - body: "", - 
};
-      } catch (error) {
-        metrics.putDimensions({
-          supplier: supplierId,
-        });
-        metrics.putMetric(MetricStatus.Success, 1, Unit.Count);
+      emitErrorMetric(metrics, supplierId);
        return processError(error, commonIds.value.correlationId, deps.logger);
      }
    };
  });
 }
+
+function emitErrorMetric(metrics: MetricsLogger, supplierId: string) {
+  metrics.putDimensions({
+    supplier: supplierId,
+  });
+  metrics.putMetric(MetricStatus.Failure, 1, Unit.Count);
+}
diff --git a/lambdas/api-handler/src/handlers/post-letters.ts b/lambdas/api-handler/src/handlers/post-letters.ts
index a84c200d..665a9d45 100644
--- a/lambdas/api-handler/src/handlers/post-letters.ts
+++ b/lambdas/api-handler/src/handlers/post-letters.ts
@@ -1,5 +1,6 @@
 import { APIGatewayProxyHandler } from "aws-lambda";
-import { MetricsLogger, Unit, metricScope } from "aws-embedded-metrics";
+import { Unit } from "aws-embedded-metrics";
+import pino from "pino";
 import type { Deps } from "../config/deps";
 import { ApiErrorDetail } from "../contracts/errors";
 import {
@@ -13,7 +14,7 @@
 import { mapToUpdateCommands } from "../mappers/letter-mapper";
 import { enqueueLetterUpdateRequests } from "../services/letter-operations";
 import { extractCommonIds } from "../utils/common-ids";
 import { assertNotEmpty, requireEnvVar } from "../utils/validation";
-import { MetricStatus } from "../utils/metrics";
+import { MetricEntry, MetricStatus, buildEMFObject } from "../utils/metrics";
 
 function duplicateIdsExist(postLettersRequest: PostLettersRequest) {
   const ids = postLettersRequest.data.map((item) => item.id);
@@ -9,92 +10,95 @@
 import { extractCommonIds } from "../utils/common-ids";
 import { PostMIRequest, PostMIRequestSchema } from
"../contracts/mi"; import { mapToMI } from "../mappers/mi-mapper"; import { Deps } from "../config/deps"; -import { MetricStatus, emitForSingleSupplier } from "../utils/metrics"; +import { MetricEntry, MetricStatus, buildEMFObject } from "../utils/metrics"; export default function createPostMIHandler( deps: Deps, ): APIGatewayProxyHandler { - return metricScope((metrics: MetricsLogger) => { - return async (event) => { - const commonIds = extractCommonIds( - event.headers, - event.requestContext, - deps, + return async (event) => { + const commonIds = extractCommonIds( + event.headers, + event.requestContext, + deps, + ); + + if (!commonIds.ok) { + return processError( + commonIds.error, + commonIds.correlationId, + deps.logger, ); + } - if (!commonIds.ok) { - return processError( - commonIds.error, - commonIds.correlationId, - deps.logger, - ); - } + const { supplierId } = commonIds.value; + try { + const body = assertNotEmpty( + event.body, + new ValidationError(ApiErrorDetail.InvalidRequestMissingBody), + ); + + let postMIRequest: PostMIRequest; - const { supplierId } = commonIds.value; try { - const body = assertNotEmpty( - event.body, - new ValidationError(ApiErrorDetail.InvalidRequestMissingBody), - ); + postMIRequest = PostMIRequestSchema.parse(JSON.parse(body)); + } catch (error) { + emitErrorMetric(supplierId, deps.logger); + const typedError = + error instanceof Error + ? new ValidationError(ApiErrorDetail.InvalidRequestBody, { + cause: error, + }) + : error; + throw typedError; + } + validateIso8601Timestamp(postMIRequest.data.attributes.timestamp); - let postMIRequest: PostMIRequest; + const result = await postMIOperation( + mapToMI(postMIRequest, supplierId), + deps.miRepo, + ); - try { - postMIRequest = PostMIRequestSchema.parse(JSON.parse(body)); - } catch (error) { - const typedError = - error instanceof Error - ? 
new ValidationError(ApiErrorDetail.InvalidRequestBody, {
-                  cause: error,
-                })
-              : error;
-        throw typedError;
-      }
-      validateIso8601Timestamp(postMIRequest.data.attributes.timestamp);
+      deps.logger.info({
+        description: "Posted management information",
+        supplierId: commonIds.value.supplierId,
+        correlationId: commonIds.value.correlationId,
+      });
 
-      const result = await postMIOperation(
-        mapToMI(postMIRequest, supplierId),
-        deps.miRepo,
-      );
+      // metric with count 1 specifying the supplier
+      const dimensions: Record<string, string> = { supplier: supplierId };
+      const metric: MetricEntry = {
+        key: MetricStatus.Success,
+        value: 1,
+        unit: Unit.Count,
+      };
+      let emf = buildEMFObject("postMi", dimensions, metric);
+      deps.logger.info(emf);
 
-      deps.logger.info({
-        description: "Posted management information",
-        supplierId: commonIds.value.supplierId,
-        correlationId: commonIds.value.correlationId,
-      });
+      // metric displaying the type/number of lineItems posted per supplier
+      dimensions.lineItem = postMIRequest.data.attributes.lineItem;
+      metric.key = "LineItem per supplier";
+      metric.value = postMIRequest.data.attributes.quantity;
+      emf = buildEMFObject("postMi", dimensions, metric);
+      deps.logger.info(emf);
 
-      // metric with count 1 specifying the supplier
-      emitForSingleSupplier(
-        metrics,
-        "postMi",
-        supplierId,
-        1,
-        MetricStatus.Success,
-      );
-      // metric displaying the supplier and the type/number of lineItems posted
-      emitForSingleSupplier(
-        metrics,
-        "postMi",
-        supplierId,
-        postMIRequest.data.attributes.quantity,
-        MetricStatus.Success,
-        { lineItem: postMIRequest.data.attributes.lineItem },
-      );
+      return {
+        statusCode: 201,
+        body: JSON.stringify(result, null, 2),
+      };
+    } catch (error) {
+      emitErrorMetric(supplierId, deps.logger);
+      return processError(error, commonIds.value.correlationId, deps.logger);
+    }
+  };
+}
 
-      return {
-        statusCode: 201,
-        body: JSON.stringify(result, null, 2),
-      };
-    } catch (error) {
-      emitForSingleSupplier(
-        metrics,
-        "postMi",
supplierId,
-        1,
-        MetricStatus.Failure,
-      );
-      return processError(error, commonIds.value.correlationId, deps.logger);
-    }
-  };
-  });
+function emitErrorMetric(supplierId: string, logger: pino.Logger) {
+  const dimensions: Record<string, string> = { supplier: supplierId };
+  const metric: MetricEntry = {
+    key: MetricStatus.Failure,
+    value: 1,
+    unit: Unit.Count,
+  };
+  const emf = buildEMFObject("postMi", dimensions, metric);
+  logger.info(emf);
 }
diff --git a/lambdas/api-handler/src/utils/metrics.ts b/lambdas/api-handler/src/utils/metrics.ts
index 83a32c3a..1b3929b4 100644
--- a/lambdas/api-handler/src/utils/metrics.ts
+++ b/lambdas/api-handler/src/utils/metrics.ts
@@ -20,3 +20,36 @@ export enum MetricStatus {
   Success = "success",
   Failure = "failure",
 }
+
+export interface MetricEntry {
+  key: string;
+  value: number;
+  unit: Unit;
+}
+
+// build EMF object
+export function buildEMFObject(
+  functionName: string,
+  dimensions: Record<string, string>,
+  metric: MetricEntry,
+) {
+  const namespace = process.env.AWS_LAMBDA_FUNCTION_NAME || functionName;
+  return {
+    LogGroup: namespace,
+    ServiceName: namespace,
+    ...dimensions,
+    _aws: {
+      Timestamp: Date.now(),
+      CloudWatchMetrics: [
+        {
+          Namespace: namespace,
+          Dimensions: [[...Object.keys(dimensions), "ServiceName", "LogGroup"]],
+          Metrics: [
+            { Name: metric.key, Value: metric.value, Unit: metric.unit },
+          ],
+        },
+      ],
+    },
+    [metric.key]: metric.value,
+  };
+}
diff --git a/lambdas/letter-updates-transformer/src/letter-updates-transformer.ts b/lambdas/letter-updates-transformer/src/letter-updates-transformer.ts
index 8ba2e1dd..fe85affd 100644
--- a/lambdas/letter-updates-transformer/src/letter-updates-transformer.ts
+++ b/lambdas/letter-updates-transformer/src/letter-updates-transformer.ts
@@ -12,53 +12,50 @@
 import { LetterEvent } from "@nhsdigital/nhs-notify-event-schemas-supplier-api/src";
 import { mapLetterToCloudEvent } from "@nhsdigital/nhs-notify-event-schemas-supplier-api/src/events/letter-mapper";
 import { Letter,
LetterSchema } from "@internal/datastore";
-import { MetricsLogger, Unit, metricScope } from "aws-embedded-metrics";
+import { Unit } from "aws-embedded-metrics";
+import pino from "pino";
 import { Deps } from "./deps";
 
 // SNS PublishBatchCommand supports up to 10 messages per batch
 const BATCH_SIZE = 10;
 
 export default function createHandler(deps: Deps): Handler {
-  return metricScope((metrics: MetricsLogger) => {
-    return async (streamEvent: KinesisStreamEvent) => {
-      deps.logger.info({ description: "Received event", streamEvent });
+  return async (streamEvent: KinesisStreamEvent) => {
+    deps.logger.info({ description: "Received event", streamEvent });
+    deps.logger.info({
+      description: "Number of records",
+      count: streamEvent.Records?.length || 0,
+    });
+
+    // Ensure logging by extracting all records first
+    const ddbRecords: DynamoDBRecord[] = streamEvent.Records.map((record) =>
+      extractPayload(record, deps),
+    );
+
+    const cloudEvents: LetterEvent[] = ddbRecords
+      .filter((record) => filterRecord(record, deps))
+      .map((element) => extractNewLetter(element))
+      .map((element) => mapLetterToCloudEvent(element, deps.env.EVENT_SOURCE));
+
+    const eventTypeCount: Map<string, number> =
+      populateEventTypeMap(cloudEvents);
+    for (const batch of generateBatches(cloudEvents)) {
       deps.logger.info({
-        description: "Number of records",
-        count: streamEvent.Records?.length || 0,
+        description: "Publishing batch",
+        size: batch.length,
+        letterEvents: batch,
       });
-
-      // Ensure logging by extracting all records first
-      const ddbRecords: DynamoDBRecord[] = streamEvent.Records.map((record) =>
-        extractPayload(record, deps),
+      await deps.snsClient.send(
+        new PublishBatchCommand({
+          TopicArn: deps.env.EVENTPUB_SNS_TOPIC_ARN,
+          PublishBatchRequestEntries: batch.map((element, index) =>
+            buildMessage(element, index),
+          ),
+        }),
       );
-
-      const cloudEvents: LetterEvent[] = ddbRecords
-        .filter((record) => filterRecord(record, deps))
-        .map((element) => extractNewLetter(element))
-        .map((element) =>
mapLetterToCloudEvent(element, deps.env.EVENT_SOURCE),
-        );
-
-      const eventTypeCount: Map<string, number> =
-        populateEventTypeMap(cloudEvents);
-      for (const batch of generateBatches(cloudEvents)) {
-        deps.logger.info({
-          description: "Publishing batch",
-          size: batch.length,
-          letterEvents: batch,
-        });
-        await deps.snsClient.send(
-          new PublishBatchCommand({
-            TopicArn: deps.env.EVENTPUB_SNS_TOPIC_ARN,
-            PublishBatchRequestEntries: batch.map((element, index) =>
-              buildMessage(element, index),
-            ),
-          }),
-        );
-      }
-      emitMetrics(metrics, eventTypeCount);
-    };
-  });
+    }
+    emitMetrics(deps.logger, eventTypeCount);
+  };
 }
 
 function populateEventTypeMap(cloudEvents: LetterEvent[]) {
@@ -69,18 +66,30 @@ function populateEventTypeMap(cloudEvents: LetterEvent[]) {
   return evtMap;
 }
 
-function emitMetrics(
-  metrics: MetricsLogger,
-  eventTypeCount: Map<string, number>,
-) {
-  metrics.setNamespace(
-    process.env.AWS_LAMBDA_FUNCTION_NAME || "letter-updates-transformer",
-  );
+function emitMetrics(logger: pino.Logger, eventTypeCount: Map<string, number>) {
+  const namespace =
+    process.env.AWS_LAMBDA_FUNCTION_NAME || "letter-updates-transformer";
+
   for (const [type, count] of eventTypeCount) {
-    metrics.putDimensions({
+    const emf = {
+      LogGroup: namespace,
+      ServiceName: namespace,
       eventType: type,
-    });
-    metrics.putMetric("events published", count, Unit.Count);
+      _aws: {
+        Timestamp: Date.now(),
+        CloudWatchMetrics: [
+          {
+            Namespace: namespace,
+            Dimensions: [["eventType", "ServiceName", "LogGroup"]],
+            Metrics: [
+              { Name: "events published", Value: count, Unit: Unit.Count },
+            ],
+          },
+        ],
+      },
+      "events published": count,
+    };
+    logger.info(emf);
   }
 }
diff --git a/lambdas/mi-updates-transformer/src/mi-updates-transformer.ts b/lambdas/mi-updates-transformer/src/mi-updates-transformer.ts
index aa02fcdf..c2524585 100644
--- a/lambdas/mi-updates-transformer/src/mi-updates-transformer.ts
+++ b/lambdas/mi-updates-transformer/src/mi-updates-transformer.ts
@@ -11,7 +11,8 @@
   PublishBatchRequestEntry,
 } from
"@aws-sdk/client-sns";
 import { MISubmittedEvent } from "@nhsdigital/nhs-notify-event-schemas-supplier-api/src";
-import { MetricsLogger, Unit, metricScope } from "aws-embedded-metrics";
+import { Unit } from "aws-embedded-metrics";
+import pino from "pino";
 import { mapMIToCloudEvent } from "./mappers/mi-mapper";
 import { Deps } from "./deps";
@@ -50,49 +51,58 @@
 function extractMIData(record: DynamoDBRecord): MI {
   return MISchema.parse(unmarshall(newImage as any));
 }
 
-function emitMetrics(
-  metrics: MetricsLogger,
-  eventTypeCount: Map<string, number>,
-) {
-  metrics.setNamespace(
-    process.env.AWS_LAMBDA_FUNCTION_NAME || "letter-updates-transformer",
-  );
+function emitMetrics(logger: pino.Logger, eventTypeCount: Map<string, number>) {
+  const namespace =
+    process.env.AWS_LAMBDA_FUNCTION_NAME || "mi-updates-transformer";
   for (const [type, count] of eventTypeCount) {
-    metrics.putDimensions({
+    const emf = {
+      LogGroup: namespace,
+      ServiceName: namespace,
       eventType: type,
-    });
-    metrics.putMetric("events published", count, Unit.Count);
+      _aws: {
+        Timestamp: Date.now(),
+        CloudWatchMetrics: [
+          {
+            Namespace: namespace,
+            Dimensions: [["LogGroup", "ServiceName", "eventType"]],
+            Metrics: [
+              { Name: "events published", Value: count, Unit: Unit.Count },
+            ],
+          },
+        ],
+      },
+      "events published": count,
+    };
+    logger.info(emf);
   }
 }
 
 export default function createHandler(deps: Deps): Handler {
-  return metricScope((metrics: MetricsLogger) => {
-    return async (streamEvent: KinesisStreamEvent) => {
-      deps.logger.info({ description: "Received event", streamEvent });
+  return async (streamEvent: KinesisStreamEvent) => {
+    deps.logger.info({ description: "Received event", streamEvent });
 
-      const cloudEvents: MISubmittedEvent[] = streamEvent.Records.map(
-        (record) => extractPayload(record, deps),
-      )
-        .filter((record) => record.eventName === "INSERT")
-        .map((element) => extractMIData(element))
-        .map((payload) => mapMIToCloudEvent(payload, deps));
+    const cloudEvents: MISubmittedEvent[] =
streamEvent.Records.map((record) => + extractPayload(record, deps), + ) + .filter((record) => record.eventName === "INSERT") + .map((element) => extractMIData(element)) + .map((payload) => mapMIToCloudEvent(payload, deps)); - const eventTypeCount = new Map(); - for (const batch of generateBatches(cloudEvents)) { - await deps.snsClient.send( - new PublishBatchCommand({ - TopicArn: deps.env.EVENTPUB_SNS_TOPIC_ARN, - PublishBatchRequestEntries: batch.map((element) => { - eventTypeCount.set( - element.type, - (eventTypeCount.get(element.type) || 0) + 1, - ); - return buildMessage(element, deps); - }), + const eventTypeCount = new Map(); + for (const batch of generateBatches(cloudEvents)) { + await deps.snsClient.send( + new PublishBatchCommand({ + TopicArn: deps.env.EVENTPUB_SNS_TOPIC_ARN, + PublishBatchRequestEntries: batch.map((element) => { + eventTypeCount.set( + element.type, + (eventTypeCount.get(element.type) || 0) + 1, + ); + return buildMessage(element, deps); }), - ); - } - emitMetrics(metrics, eventTypeCount); - }; - }); + }), + ); + } + emitMetrics(deps.logger, eventTypeCount); + }; }