diff --git a/infrastructure/terraform/components/api/README.md b/infrastructure/terraform/components/api/README.md index c346bf43..8184dc84 100644 --- a/infrastructure/terraform/components/api/README.md +++ b/infrastructure/terraform/components/api/README.md @@ -63,6 +63,8 @@ No requirements. | [post\_mi](#module\_post\_mi) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.29/terraform-lambda.zip | n/a | | [s3bucket\_test\_letters](#module\_s3bucket\_test\_letters) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.26/terraform-s3bucket.zip | n/a | | [sqs\_letter\_updates](#module\_sqs\_letter\_updates) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.26/terraform-sqs.zip | n/a | +| [sqs\_supplier\_allocator](#module\_sqs\_supplier\_allocator) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.26/terraform-sqs.zip | n/a | +| [supplier\_allocator](#module\_supplier\_allocator) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.29/terraform-lambda.zip | n/a | | [supplier\_ssl](#module\_supplier\_ssl) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.26/terraform-ssl.zip | n/a | | [upsert\_letter](#module\_upsert\_letter) | https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.29/terraform-lambda.zip | n/a | ## Outputs diff --git a/infrastructure/terraform/components/api/lambda_event_source_mapping_supplier_allocator.tf b/infrastructure/terraform/components/api/lambda_event_source_mapping_supplier_allocator.tf new file mode 100644 index 00000000..66bfd679 --- /dev/null +++ b/infrastructure/terraform/components/api/lambda_event_source_mapping_supplier_allocator.tf @@ -0,0 +1,9 @@ +resource "aws_lambda_event_source_mapping" "supplier_allocator" { + event_source_arn = module.sqs_supplier_allocator.sqs_queue_arn + function_name = module.supplier_allocator.function_name + batch_size = 10 + maximum_batching_window_in_seconds = 5 + function_response_types = [ + "ReportBatchItemFailures" + ] +} diff --git a/infrastructure/terraform/components/api/module_lambda_supplier_allocator.tf b/infrastructure/terraform/components/api/module_lambda_supplier_allocator.tf new file mode 100644 index 00000000..76cfb22a --- /dev/null +++ b/infrastructure/terraform/components/api/module_lambda_supplier_allocator.tf @@ -0,0 +1,85 @@ +module "supplier_allocator" { + source = "https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.29/terraform-lambda.zip" + + function_name = "supplier-allocator" + description = "Allocate a letter to a supplier" + + aws_account_id = var.aws_account_id + component = var.component + environment = var.environment + project = var.project + region = var.region + group = var.group + + log_retention_in_days = var.log_retention_in_days + kms_key_arn = module.kms.key_arn + + iam_policy_document = { + body = data.aws_iam_policy_document.supplier_allocator_lambda.json + } + + function_s3_bucket = local.acct.s3_buckets["lambda_function_artefacts"]["id"] + function_code_base_path = local.aws_lambda_functions_dir_path + function_code_dir = "supplier-allocator/dist" + function_include_common = true + handler_function_name = "supplierAllocatorHandler" + runtime = "nodejs22.x" + memory = 512 + timeout = 29 + log_level = var.log_level + + force_lambda_code_deploy = var.force_lambda_code_deploy + enable_lambda_insights = false + + log_destination_arn = local.destination_arn + log_subscription_role_arn = local.acct.log_subscription_role_arn + + lambda_env_vars = merge(local.common_lambda_env_vars, { + VARIANT_MAP = jsonencode(var.letter_variant_map) + UPSERT_LETTERS_QUEUE_URL = module.sqs_letter_updates.sqs_queue_url + }) +} + +data "aws_iam_policy_document" "supplier_allocator_lambda" { + statement { + sid = "KMSPermissions" + effect = "Allow" + + actions = [ + "kms:Decrypt", + "kms:GenerateDataKey", + ] + + resources = [ + module.kms.key_arn, + ] + } + + statement { + sid = "AllowSQSRead" + effect = "Allow" + + actions = [ + "sqs:ReceiveMessage", + "sqs:DeleteMessage", + "sqs:GetQueueAttributes" + ] + + resources = [ + module.sqs_supplier_allocator.sqs_queue_arn + ] + } + + statement { + sid = "AllowSQSWrite" + effect = "Allow" + + actions = [ + "sqs:SendMessage" + ] + + resources = [ + module.sqs_letter_updates.sqs_queue_arn + ] + } +} diff --git a/infrastructure/terraform/components/api/module_sqs_supplier_allocator.tf b/infrastructure/terraform/components/api/module_sqs_supplier_allocator.tf new file mode 100644 index 00000000..68b34c8e --- /dev/null +++ b/infrastructure/terraform/components/api/module_sqs_supplier_allocator.tf @@ -0,0 +1,48 @@ +module "sqs_supplier_allocator" { + source = "https://github.com/NHSDigital/nhs-notify-shared-modules/releases/download/v2.0.26/terraform-sqs.zip" + + aws_account_id = var.aws_account_id + component = var.component + environment = var.environment + project = var.project + region = var.region + name = "supplier-allocator" + + sqs_kms_key_arn = module.kms.key_arn + + visibility_timeout_seconds = 60 + + create_dlq = true + sqs_policy_overload = data.aws_iam_policy_document.supplier_allocator_queue_policy.json +} + +data "aws_iam_policy_document" "supplier_allocator_queue_policy" { + version = "2012-10-17" + + statement { + sid = "AllowSNSPermissions" + effect = "Allow" + + principals { + type = "Service" + identifiers = ["sns.amazonaws.com"] + } + + actions = [ + "sqs:SendMessage", + "sqs:ListQueueTags", + "sqs:GetQueueUrl", + "sqs:GetQueueAttributes", + ] + + resources = [ + "arn:aws:sqs:${var.region}:${var.aws_account_id}:${var.project}-${var.environment}-${var.component}-supplier-allocator-queue" + ] + + condition { + test = "ArnEquals" + variable = "aws:SourceArn" + values = [module.eventsub.sns_topic.arn] + } + } +} diff --git a/infrastructure/terraform/components/api/sns_topic_subscription_eventsub_sqs_letter_updates.tf b/infrastructure/terraform/components/api/sns_topic_subscription_eventsub_sqs_letter_updates.tf index 9c232c14..96cba64d 100644 --- a/infrastructure/terraform/components/api/sns_topic_subscription_eventsub_sqs_letter_updates.tf +++ b/infrastructure/terraform/components/api/sns_topic_subscription_eventsub_sqs_letter_updates.tf @@ -1,5 +1,11 @@ resource "aws_sns_topic_subscription" "eventsub_sqs_letter_updates" { - topic_arn = module.eventsub.sns_topic.arn - protocol = "sqs" - endpoint = module.sqs_letter_updates.sqs_queue_arn + topic_arn = module.eventsub.sns_topic.arn + protocol = "sqs" + endpoint = module.sqs_letter_updates.sqs_queue_arn + raw_message_delivery = true + + filter_policy_scope = "MessageBody" + filter_policy = jsonencode({ + type = [{ prefix = "uk.nhs.notify.supplier-api.letter" }] + }) } diff --git a/infrastructure/terraform/components/api/sns_topic_subscription_eventsub_sqs_supplier_allocator.tf b/infrastructure/terraform/components/api/sns_topic_subscription_eventsub_sqs_supplier_allocator.tf new file mode 100644 index 00000000..59afab80 --- /dev/null +++ b/infrastructure/terraform/components/api/sns_topic_subscription_eventsub_sqs_supplier_allocator.tf @@ -0,0 +1,11 @@ +resource "aws_sns_topic_subscription" "eventsub_sqs_supplier_allocator" { + topic_arn = module.eventsub.sns_topic.arn + protocol = "sqs" + endpoint = module.sqs_supplier_allocator.sqs_queue_arn + raw_message_delivery = true + + filter_policy_scope = "MessageBody" + filter_policy = jsonencode({ + type = [{ prefix = "uk.nhs.notify.letter-rendering.letter-request.prepared" }] + }) +} diff --git a/lambdas/supplier-allocator/.eslintignore b/lambdas/supplier-allocator/.eslintignore new file mode 100644 index 00000000..1521c8b7 --- /dev/null +++ b/lambdas/supplier-allocator/.eslintignore @@ -0,0 +1 @@ +dist diff --git a/lambdas/supplier-allocator/.gitignore b/lambdas/supplier-allocator/.gitignore new file mode 100644 index 00000000..80323f7c --- /dev/null +++ b/lambdas/supplier-allocator/.gitignore @@ -0,0 +1,4 @@ +coverage +node_modules +dist +.reports diff --git a/lambdas/supplier-allocator/jest.config.ts b/lambdas/supplier-allocator/jest.config.ts new file mode 100644 index 00000000..87279451 --- /dev/null +++ b/lambdas/supplier-allocator/jest.config.ts @@ -0,0 +1,66 @@ +export const baseJestConfig = { + preset: "ts-jest", + extensionsToTreatAsEsm: [".ts"], + transform: { + "^.+\\.ts$": [ + "ts-jest", + { + useESM: true, + }, + ], + }, + + // Automatically clear mock calls, instances, contexts and results before every test + clearMocks: true, + + // Indicates whether the coverage information should be collected while executing the test + collectCoverage: true, + + // The directory where Jest should output its coverage files + coverageDirectory: "./.reports/unit/coverage", + + // Indicates which provider should be used to instrument code for coverage + coverageProvider: "babel", + + coverageThreshold: { + global: { + branches: 100, + functions: 100, + lines: 100, + statements: -10, + }, + }, + + coveragePathIgnorePatterns: ["/__tests__/"], + testPathIgnorePatterns: [".build"], + testMatch: ["**/?(*.)+(spec|test).[jt]s?(x)"], + + // Use this configuration option to add custom reporters to Jest + reporters: [ + "default", + [ + "jest-html-reporter", + { + pageTitle: "Test Report", + outputPath: "./.reports/unit/test-report.html", + includeFailureMsg: true, + }, + ], + ], + + // The test environment that will be used for testing + testEnvironment: "jsdom", +}; + +const utilsJestConfig = { + ...baseJestConfig, + + testEnvironment: "node", + + coveragePathIgnorePatterns: [ + ...(baseJestConfig.coveragePathIgnorePatterns ?? []), + "zod-validators.ts", + ], +}; + +export default utilsJestConfig; diff --git a/lambdas/supplier-allocator/package.json b/lambdas/supplier-allocator/package.json new file mode 100644 index 00000000..d618994f --- /dev/null +++ b/lambdas/supplier-allocator/package.json @@ -0,0 +1,36 @@ +{ + "dependencies": { + "@aws-sdk/client-dynamodb": "^3.858.0", + "@aws-sdk/client-sqs": "^3.984.0", + "@aws-sdk/lib-dynamodb": "^3.858.0", + "@internal/datastore": "*", + "@internal/helpers": "^0.1.0", + "@nhsdigital/nhs-notify-event-schemas-letter-rendering": "^2.0.1", + "@nhsdigital/nhs-notify-event-schemas-letter-rendering-v1": "npm:@nhsdigital/nhs-notify-event-schemas-letter-rendering@^1.1.5", + "@nhsdigital/nhs-notify-event-schemas-supplier-api": "^1.0.8", + "@types/aws-lambda": "^8.10.148", + "aws-lambda": "^1.0.7", + "esbuild": "^0.27.2", + "pino": "^9.7.0", + "zod": "^4.1.11" + }, + "devDependencies": { + "@tsconfig/node22": "^22.0.2", + "@types/aws-lambda": "^8.10.148", + "@types/jest": "^30.0.0", + "jest": "^30.2.0", + "jest-mock-extended": "^4.0.0", + "ts-jest": "^29.4.0", + "typescript": "^5.8.3" + }, + "name": "nhs-notify-supplier-api-allocate-letter", + "private": true, + "scripts": { + "lambda-build": "rm -rf dist && npx esbuild --bundle --minify --sourcemap --target=es2020 --platform=node --loader:.node=file --entry-names=[name] --outdir=dist src/index.ts", + "lint": "eslint .", + "lint:fix": "eslint . --fix", + "test:unit": "jest", + "typecheck": "tsc --noEmit" + }, + "version": "0.0.1" +} diff --git a/lambdas/supplier-allocator/src/config/__tests__/deps.test.ts b/lambdas/supplier-allocator/src/config/__tests__/deps.test.ts new file mode 100644 index 00000000..b8b7b736 --- /dev/null +++ b/lambdas/supplier-allocator/src/config/__tests__/deps.test.ts @@ -0,0 +1,43 @@ +import type { Deps } from "lambdas/supplier-allocator/src/config/deps"; + +describe("createDependenciesContainer", () => { + const env = { + VARIANT_MAP: { + lv1: { + supplierId: "supplier1", + specId: "spec1", + }, + }, + }; + + beforeEach(() => { + jest.clearAllMocks(); + jest.resetModules(); + + // @internal/helpers - createLogger + jest.mock("@internal/helpers", () => ({ + createLogger: jest.fn(() => ({ + info: jest.fn(), + error: jest.fn(), + warn: jest.fn(), + debug: jest.fn(), + level: "info", + })), + })); + + // Env + jest.mock("../env", () => ({ envVars: env })); + }); + + test("constructs deps and wires repository config correctly", async () => { + // get current mock instances + const { createLogger } = jest.requireMock("@internal/helpers"); + + // eslint-disable-next-line @typescript-eslint/no-require-imports + const { createDependenciesContainer } = require("../deps"); + const deps: Deps = createDependenciesContainer(); + expect(createLogger).toHaveBeenCalledTimes(1); + + expect(deps.env).toEqual(env); + }); +}); diff --git a/lambdas/supplier-allocator/src/config/__tests__/env.test.ts b/lambdas/supplier-allocator/src/config/__tests__/env.test.ts new file mode 100644 index 00000000..dd013aeb --- /dev/null +++ b/lambdas/supplier-allocator/src/config/__tests__/env.test.ts @@ -0,0 +1,42 @@ +import { ZodError } from "zod"; +/* eslint-disable @typescript-eslint/no-require-imports */ +/* Allow require imports to enable re-import of modules */ + +describe("lambdaEnv", () => { + const OLD_ENV = process.env; + + beforeEach(() => { + jest.resetModules(); // Clears cached modules + process.env = { ...OLD_ENV }; // Clone original env + }); + + afterAll(() => { + process.env = OLD_ENV; // Restore + }); + + it("should load all environment variables successfully", () => { + process.env.VARIANT_MAP = `{ + "lv1": { + "supplierId": "supplier1", + "specId": "spec1" + } + }`; + + const { envVars } = require("../env"); + + expect(envVars).toEqual({ + VARIANT_MAP: { + lv1: { + supplierId: "supplier1", + specId: "spec1", + }, + }, + }); + }); + + it("should throw if a required env var is missing", () => { + process.env.VARIANT_MAP = undefined; + + expect(() => require("../env")).toThrow(ZodError); + }); +}); diff --git a/lambdas/supplier-allocator/src/config/deps.ts b/lambdas/supplier-allocator/src/config/deps.ts new file mode 100644 index 00000000..1a5f6ce5 --- /dev/null +++ b/lambdas/supplier-allocator/src/config/deps.ts @@ -0,0 +1,20 @@ +import { SQSClient } from "@aws-sdk/client-sqs"; +import { Logger } from "pino"; +import { createLogger } from "@internal/helpers"; +import { EnvVars, envVars } from "./env"; + +export type Deps = { + logger: Logger; + env: EnvVars; + sqsClient: SQSClient; +}; + +export function createDependenciesContainer(): Deps { + const log = createLogger({ logLevel: envVars.PINO_LOG_LEVEL }); + + return { + logger: log, + env: envVars, + sqsClient: new SQSClient({}), + }; +} diff --git a/lambdas/supplier-allocator/src/config/env.ts b/lambdas/supplier-allocator/src/config/env.ts new file mode 100644 index 00000000..0adc3920 --- /dev/null +++ b/lambdas/supplier-allocator/src/config/env.ts @@ -0,0 +1,22 @@ +import { z } from "zod"; + +const LetterVariantSchema = z.record( + z.string(), + z.object({ + supplierId: z.string(), + specId: z.string(), + }), +); +export type LetterVariant = z.infer; + +const EnvVarsSchema = z.object({ + PINO_LOG_LEVEL: z.coerce.string().optional(), + VARIANT_MAP: z.string().transform((str, _) => { + const parsed = JSON.parse(str); + return LetterVariantSchema.parse(parsed); + }), +}); + +export type EnvVars = z.infer; + +export const envVars = EnvVarsSchema.parse(process.env); diff --git a/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts b/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts new file mode 100644 index 00000000..23fc5981 --- /dev/null +++ b/lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts @@ -0,0 +1,438 @@ +import { SQSEvent, SQSRecord } from "aws-lambda"; +import pino from "pino"; +import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs"; +import { LetterRequestPreparedEventV2 } from "@nhsdigital/nhs-notify-event-schemas-letter-rendering"; +import { LetterRequestPreparedEvent } from "@nhsdigital/nhs-notify-event-schemas-letter-rendering-v1"; +import { + $LetterEvent, + LetterEvent, +} from "@nhsdigital/nhs-notify-event-schemas-supplier-api/src/events/letter-events"; +import createSupplierAllocatorHandler from "../allocate-handler"; +import { Deps } from "../../config/deps"; +import { EnvVars } from "../../config/env"; + +function createSQSEvent(records: SQSRecord[]): SQSEvent { + return { + Records: records, + }; +} + +function createSqsRecord(msgId: string, body: string): SQSRecord { + return { + messageId: msgId, + receiptHandle: "", + body, + attributes: { + ApproximateReceiveCount: "", + SentTimestamp: "", + SenderId: "", + ApproximateFirstReceiveTimestamp: "", + }, + messageAttributes: {}, + md5OfBody: "", + eventSource: "", + eventSourceARN: "", + awsRegion: "", + }; +} + +function createPreparedV1Event( + overrides: Partial = {}, +): LetterRequestPreparedEvent { + const now = new Date().toISOString(); + + return { + specversion: "1.0", + id: overrides.id ?? "7b9a03ca-342a-4150-b56b-989109c45613", + source: "/data-plane/letter-rendering/test", + subject: "client/client1/letter-request/letterRequest1", + type: "uk.nhs.notify.letter-rendering.letter-request.prepared.v1", + time: now, + dataschema: + "https://notify.nhs.uk/cloudevents/schemas/letter-rendering/letter-request.prepared.1.0.0.schema.json", + dataschemaversion: "1.0.0", + data: { + domainId: overrides.domainId ?? "letter1", + letterVariantId: "lv1", + requestId: "request1", + requestItemId: "requestItem1", + requestItemPlanId: "requestItemPlan1", + clientId: "client1", + campaignId: "campaign1", + templateId: "template1", + url: overrides.url ?? "s3://letterDataBucket/letter1.pdf", + sha256Hash: + "3a7bd3e2360a3d29eea436fcfb7e44c735d117c8f2f1d2d1e4f6e8f7e6e8f7e6", + createdAt: now, + pageCount: 1, + status: "PREPARED", + }, + traceparent: "00-0af7651916cd43dd8448eb211c803191-b7ad6b7169203331-01", + recordedtime: now, + severitynumber: 2, + severitytext: "INFO", + plane: "data", + }; +} + +function createPreparedV2Event( + overrides: Partial = {}, +): LetterRequestPreparedEventV2 { + return { + ...createPreparedV1Event(overrides), + type: "uk.nhs.notify.letter-rendering.letter-request.prepared.v2", + dataschema: + "https://notify.nhs.uk/cloudevents/schemas/letter-rendering/letter-request.prepared.2.0.1.schema.json", + dataschemaversion: "2.0.1", + }; +} + +function createSupplierStatusChangeEvent( + overrides: Partial = {}, +): LetterEvent { + const now = new Date().toISOString(); + + return $LetterEvent.parse({ + data: { + domainId: overrides.domainId ?? "f47ac10b-58cc-4372-a567-0e02b2c3d479", + groupId: "client_template", + origin: { + domain: "letter-rendering", + event: "f47ac10b-58cc-4372-a567-0e02b2c3d479", + source: "/data-plane/letter-rendering/prod/render-pdf", + subject: + "client/00f3b388-bbe9-41c9-9e76-052d37ee8988/letter-request/0o5Fs0EELR0fUjHjbCnEtdUwQe4_0o5Fs0EELR0fUjHjbCnEtdUwQe5", + }, + reasonCode: "R07", + reasonText: "No such address", + specificationId: "1y3q9v1zzzz", + billingRef: "1y3q9v1zzzz", + status: "RETURNED", + supplierId: "supplier1", + }, + datacontenttype: "application/json", + dataschema: + "https://notify.nhs.uk/cloudevents/schemas/supplier-api/letter.RETURNED.1.0.0.schema.json", + dataschemaversion: "1.0.0", + id: overrides.id ?? "23f1f09c-a555-4d9b-8405-0b33490bc920", + plane: "data", + recordedtime: now, + severitynumber: 2, + severitytext: "INFO", + source: "/data-plane/supplier-api/prod/update-status", + specversion: "1.0", + subject: + "letter-origin/letter-rendering/letter/f47ac10b-58cc-4372-a567-0e02b2c3d479", + time: now, + traceparent: "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01", + type: "uk.nhs.notify.supplier-api.letter.RETURNED.v1", + }); +} + +describe("createSupplierAllocatorHandler", () => { + let mockSqsClient: jest.Mocked; + let mockedDeps: jest.Mocked; + + beforeEach(() => { + mockSqsClient = { + send: jest.fn(), + } as unknown as jest.Mocked; + + mockedDeps = { + logger: { error: jest.fn(), info: jest.fn() } as unknown as pino.Logger, + env: { + VARIANT_MAP: { + lv1: { + supplierId: "supplier1", + specId: "spec1", + }, + }, + } as EnvVars, + sqsClient: mockSqsClient, + } as jest.Mocked; + + jest.clearAllMocks(); + }); + + test("parses SNS notification and sends message to SQS queue for v2 event", async () => { + const preparedEvent = createPreparedV2Event(); + const evt: SQSEvent = createSQSEvent([ + createSqsRecord("msg1", JSON.stringify(preparedEvent)), + ]); + + process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue"; + + const handler = createSupplierAllocatorHandler(mockedDeps); + const result = await handler(evt, {} as any, {} as any); + + expect(result).toBeDefined(); + if (!result) throw new Error("expected BatchResponse, got void"); + + expect(result.batchItemFailures).toHaveLength(0); + + expect(mockSqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(sendCall).toBeInstanceOf(SendMessageCommand); + + const messageBody = JSON.parse(sendCall.input.MessageBody); + expect(messageBody.letterEvent).toEqual(preparedEvent); + expect(messageBody.supplierSpec).toEqual({ + supplierId: "supplier1", + specId: "spec1", + }); + }); + + test("parses SNS notification and sends message to SQS queue for v1 event", async () => { + const preparedEvent = createPreparedV1Event(); + + const evt: SQSEvent = createSQSEvent([ + createSqsRecord("msg1", JSON.stringify(preparedEvent)), + ]); + + process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue"; + + const handler = createSupplierAllocatorHandler(mockedDeps); + const result = await handler(evt, {} as any, {} as any); + + expect(result).toBeDefined(); + if (!result) throw new Error("expected BatchResponse, got void"); + + expect(result.batchItemFailures).toHaveLength(0); + + expect(mockSqsClient.send).toHaveBeenCalledTimes(1); + const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + const messageBody = JSON.parse(sendCall.input.MessageBody); + expect(messageBody.supplierSpec).toEqual({ + supplierId: "supplier1", + specId: "spec1", + }); + }); + + test("returns batch failure for Update event", async () => { + const preparedEvent = createSupplierStatusChangeEvent(); + + const evt: SQSEvent = createSQSEvent([ + createSqsRecord("invalid-event", JSON.stringify(preparedEvent)), + ]); + + process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue"; + + const handler = createSupplierAllocatorHandler(mockedDeps); + const result = await handler(evt, {} as any, {} as any); + + expect(result).toBeDefined(); + if (!result) throw new Error("expected BatchResponse, got void"); + + expect(result.batchItemFailures).toHaveLength(1); + expect(result.batchItemFailures[0].itemIdentifier).toBe("invalid-event"); + expect((mockedDeps.logger.error as jest.Mock).mock.calls).toHaveLength(1); + }); + + test("unwraps EventBridge envelope and extracts event details", async () => { + const preparedEvent = createPreparedV2Event({ domainId: "letter-test" }); + + const evt: SQSEvent = createSQSEvent([ + createSqsRecord("msg1", JSON.stringify(preparedEvent)), + ]); + + process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue"; + + const handler = createSupplierAllocatorHandler(mockedDeps); + await handler(evt, {} as any, {} as any); + + const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + const messageBody = JSON.parse(sendCall.input.MessageBody); + expect(messageBody.letterEvent.data.domainId).toBe("letter-test"); + }); + + test("resolves correct supplier spec from variant map", async () => { + const preparedEvent = createPreparedV2Event(); + + const evt: SQSEvent = createSQSEvent([ + createSqsRecord("msg1", JSON.stringify(preparedEvent)), + ]); + + process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue"; + + const handler = createSupplierAllocatorHandler(mockedDeps); + await handler(evt, {} as any, {} as any); + + const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + const messageBody = JSON.parse(sendCall.input.MessageBody); + expect(messageBody.supplierSpec.supplierId).toBe("supplier1"); + expect(messageBody.supplierSpec.specId).toBe("spec1"); + }); + + test("processes multiple messages in batch", async () => { + const evt: SQSEvent = createSQSEvent([ + createSqsRecord( + "msg1", + JSON.stringify(createPreparedV2Event({ domainId: "letter1" })), + ), + createSqsRecord( + "msg2", + JSON.stringify(createPreparedV2Event({ domainId: "letter2" })), + ), + ]); + + process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue"; + + const handler = createSupplierAllocatorHandler(mockedDeps); + const result = await handler(evt, {} as any, {} as any); + + expect(result).toBeDefined(); + if (!result) throw new Error("expected BatchResponse, got void"); + + expect(result.batchItemFailures).toHaveLength(0); + expect(mockSqsClient.send).toHaveBeenCalledTimes(2); + }); + + test("returns batch failure for invalid JSON", async () => { + const evt: SQSEvent = createSQSEvent([ + createSqsRecord("bad-json", "this-is-not-json"), + ]); + + process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue"; + + const handler = createSupplierAllocatorHandler(mockedDeps); + const result = await handler(evt, {} as any, {} as any); + + expect(result).toBeDefined(); + if (!result) throw new Error("expected BatchResponse, got void"); + + expect(result.batchItemFailures).toHaveLength(1); + expect(result.batchItemFailures[0].itemIdentifier).toBe("bad-json"); + expect((mockedDeps.logger.error as jest.Mock).mock.calls).toHaveLength(1); + }); + + test("returns batch failure when event type is missing", async () => { + const event = { no: "type" }; + + const evt: SQSEvent = createSQSEvent([ + createSqsRecord("no-type", JSON.stringify(event)), + ]); + + process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue"; + + const handler = createSupplierAllocatorHandler(mockedDeps); + const result = await handler(evt, {} as any, {} as any); + if (!result) throw new Error("expected BatchResponse, got void"); + + expect(result.batchItemFailures).toHaveLength(1); + expect(result.batchItemFailures[0].itemIdentifier).toBe("no-type"); + }); + + test("returns batch failure when UPSERT_LETTERS_QUEUE_URL is not set", async () => { + const preparedEvent = createPreparedV2Event(); + + const evt: SQSEvent = createSQSEvent([ + createSqsRecord("msg1", JSON.stringify(preparedEvent)), + ]); + + delete process.env.UPSERT_LETTERS_QUEUE_URL; + + const handler = createSupplierAllocatorHandler(mockedDeps); + const result = await handler(evt, {} as any, {} as any); + if (!result) throw new Error("expected BatchResponse, got void"); + + expect(result.batchItemFailures).toHaveLength(1); + expect(result.batchItemFailures[0].itemIdentifier).toBe("msg1"); + expect((mockedDeps.logger.error as jest.Mock).mock.calls[0][0].err).toEqual( + expect.objectContaining({ + message: "UPSERT_LETTERS_QUEUE_URL not configured", + }), + ); + }); + + test("returns batch failure when variant mapping is missing", async () => { + const preparedEvent = createPreparedV2Event(); + preparedEvent.data.letterVariantId = "missing-variant"; + + const evt: SQSEvent = createSQSEvent([ + createSqsRecord("msg1", JSON.stringify(preparedEvent)), + ]); + + process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue"; + + // Override variant map to be empty for this test + mockedDeps.env.VARIANT_MAP = {} as any; + + const handler = createSupplierAllocatorHandler(mockedDeps); + const result = await handler(evt, {} as any, {} as any); + if (!result) throw new Error("expected BatchResponse, got void"); + + expect(result.batchItemFailures).toHaveLength(1); + expect(result.batchItemFailures[0].itemIdentifier).toBe("msg1"); + expect( + (mockedDeps.logger.error as jest.Mock).mock.calls.length, + ).toBeGreaterThan(0); + expect((mockedDeps.logger.error as jest.Mock).mock.calls[0][0]).toEqual( + expect.objectContaining({ + description: "No supplier mapping found for variant", + }), + ); + }); + + test("handles SQS send errors and returns batch failure", async () => { + const preparedEvent = createPreparedV2Event(); + + const evt: SQSEvent = createSQSEvent([ + createSqsRecord("msg1", JSON.stringify(preparedEvent)), + ]); + + process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue"; + + const sqsError = new Error("SQS send failed"); + (mockSqsClient.send as jest.Mock).mockRejectedValueOnce(sqsError); + + const handler = createSupplierAllocatorHandler(mockedDeps); + const result = await handler(evt, {} as any, {} as any); + if (!result) throw new Error("expected BatchResponse, got void"); + + expect(result.batchItemFailures).toHaveLength(1); + expect(result.batchItemFailures[0].itemIdentifier).toBe("msg1"); + expect((mockedDeps.logger.error as jest.Mock).mock.calls).toHaveLength(1); + }); + + test("processes mixed batch with successes and failures", async () => { + const evt: SQSEvent = createSQSEvent([ + createSqsRecord( + "ok-msg", + JSON.stringify(createPreparedV2Event({ domainId: "letter1" })), + ), + createSqsRecord("fail-msg", "invalid-json"), + createSqsRecord( + "ok-msg-2", + JSON.stringify(createPreparedV2Event({ domainId: "letter2" })), + ), + ]); + + process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue"; + + const handler = createSupplierAllocatorHandler(mockedDeps); + const result = await handler(evt, {} as any, {} as any); + if (!result) throw new Error("expected BatchResponse, got void"); + + expect(result.batchItemFailures).toHaveLength(1); + expect(result.batchItemFailures[0].itemIdentifier).toBe("fail-msg"); + + expect(mockSqsClient.send).toHaveBeenCalledTimes(2); + }); + + test("sends correct queue URL in SQS message command", async () => { + const preparedEvent = createPreparedV2Event(); + + const evt: SQSEvent = createSQSEvent([ + createSqsRecord("msg1", JSON.stringify(preparedEvent)), + ]); + + const queueUrl = "https://sqs.eu-west-2.amazonaws.com/123456789/test-queue"; + process.env.UPSERT_LETTERS_QUEUE_URL = queueUrl; + + const handler = createSupplierAllocatorHandler(mockedDeps); + await handler(evt, {} as any, {} as any); + + const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0]; + expect(sendCall.input.QueueUrl).toBe(queueUrl); + }); +}); diff --git a/lambdas/supplier-allocator/src/handler/allocate-handler.ts b/lambdas/supplier-allocator/src/handler/allocate-handler.ts new file mode 100644 index 00000000..0402e284 --- /dev/null +++ b/lambdas/supplier-allocator/src/handler/allocate-handler.ts @@ -0,0 +1,113 @@ +import { SQSBatchItemFailure, SQSEvent, SQSHandler } from "aws-lambda"; +import { SendMessageCommand } from "@aws-sdk/client-sqs"; +import { LetterRequestPreparedEvent } from "@nhsdigital/nhs-notify-event-schemas-letter-rendering-v1"; + +import { LetterRequestPreparedEventV2 } from "@nhsdigital/nhs-notify-event-schemas-letter-rendering"; +import z from "zod"; +import { Deps } from "../config/deps"; + +type SupplierSpec = { supplierId: string; specId: string }; +type PreparedEvents = LetterRequestPreparedEventV2 | LetterRequestPreparedEvent; + +// small envelope that must exist in all inputs +const TypeEnvelope = z.object({ type: z.string().min(1) }); + +function resolveSupplierForVariant( + variantId: string, + deps: Deps, +): SupplierSpec { + deps.logger.info({ + description: "Resolving supplier for letter variant", + variantId, + }); + const supplier = deps.env.VARIANT_MAP[variantId]; + if (!supplier) { + deps.logger.error({ + description: "No supplier mapping found for variant", + variantId, + }); + throw new Error(`No supplier mapping for variantId: ${variantId}`); + } + + return supplier; +} + +function validateType(event: unknown) { + const env = TypeEnvelope.safeParse(event); + if (!env.success) { + throw new Error("Missing or invalid envelope.type field"); + } + if ( + !env.data.type.startsWith( + "uk.nhs.notify.letter-rendering.letter-request.prepared", + ) + ) { + throw new Error(`Unexpected event type: ${env.data.type}`); + } +} + +function getSupplier(letterEvent: PreparedEvents, deps: Deps): SupplierSpec { + return resolveSupplierForVariant(letterEvent.data.letterVariantId, deps); +} + +export default function createSupplierAllocatorHandler(deps: Deps): SQSHandler { + return async (event: SQSEvent) => { + const batchItemFailures: SQSBatchItemFailure[] = []; + + const tasks = event.Records.map(async (record) => { + try { + const letterEvent: unknown = JSON.parse(record.body); + + deps.logger.info({ + description: "Extracted letter event", + messageId: record.messageId, + }); + + validateType(letterEvent); + + const supplierSpec = getSupplier(letterEvent as PreparedEvents, deps); + + deps.logger.info({ + description: "Resolved supplier spec", + supplierSpec, + }); + + // Send to allocated letters queue + const queueUrl = process.env.UPSERT_LETTERS_QUEUE_URL; + if (!queueUrl) { + throw new Error("UPSERT_LETTERS_QUEUE_URL not configured"); + } + + const queueMessage = { + letterEvent, + supplierSpec, + }; + + deps.logger.info({ + description: "Sending message to upsert letter queue", + msg: queueMessage, + url: queueUrl, + }); + + await deps.sqsClient.send( + new SendMessageCommand({ + QueueUrl: queueUrl, + MessageBody: JSON.stringify(queueMessage), + }), + ); + } catch (error) { + deps.logger.error({ + description: "Error processing allocation of record", + err: error, + messageId: record.messageId, + message: record.body, + }); + batchItemFailures.push({ itemIdentifier: record.messageId }); + } + }); + + await Promise.all(tasks); + + return { batchItemFailures }; + }; +} diff --git a/lambdas/supplier-allocator/src/index.ts b/lambdas/supplier-allocator/src/index.ts new file mode 100644 index 00000000..56d6b8e1 --- /dev/null +++ b/lambdas/supplier-allocator/src/index.ts @@ -0,0 +1,8 @@ +import { createDependenciesContainer } from "./config/deps"; +import createSupplierAllocatorHandler from "./handler/allocate-handler"; + +const container = createDependenciesContainer(); + +// eslint-disable-next-line import-x/prefer-default-export +export const supplierAllocatorHandler = + createSupplierAllocatorHandler(container); diff --git a/lambdas/supplier-allocator/tsconfig.json b/lambdas/supplier-allocator/tsconfig.json new file mode 100644 index 00000000..528c0c8b --- /dev/null +++ b/lambdas/supplier-allocator/tsconfig.json @@ -0,0 +1,13 @@ +{ + "compilerOptions": { + "types": [ + "jest", + "node" + ] + }, + "extends": "../../tsconfig.base.json", + "include": [ + "src/**/*", + "jest.config.ts" + ] +} diff --git a/lambdas/upsert-letter/src/config/__tests__/env.test.ts b/lambdas/upsert-letter/src/config/__tests__/env.test.ts index 3c3de230..7f346279 100644 --- a/lambdas/upsert-letter/src/config/__tests__/env.test.ts +++ b/lambdas/upsert-letter/src/config/__tests__/env.test.ts @@ -17,31 +17,17 @@ describe("lambdaEnv", () => { it("should load all environment variables successfully", () => { process.env.LETTERS_TABLE_NAME = "letters-table"; process.env.LETTER_TTL_HOURS = "12960"; - process.env.VARIANT_MAP = `{ - "lv1": { - "supplierId": "supplier1", - "specId": "spec1" - } - }`; const { envVars } = require("../env"); expect(envVars).toEqual({ LETTERS_TABLE_NAME: "letters-table", LETTER_TTL_HOURS: 12_960, - VARIANT_MAP: { - lv1: { - supplierId: "supplier1", - specId: "spec1", - }, - }, }); }); it("should throw if a required env var is missing", () => { process.env.LETTERS_TABLE_NAME = "table"; - process.env.LETTER_TTL_HOURS = "12960"; - process.env.VARIANT_MAP = undefined; expect(() => require("../env")).toThrow(ZodError); }); diff --git a/lambdas/upsert-letter/src/config/env.ts b/lambdas/upsert-letter/src/config/env.ts index 8230088b..fd205813 100644 --- a/lambdas/upsert-letter/src/config/env.ts +++ b/lambdas/upsert-letter/src/config/env.ts @@ -1,21 +1,8 @@ import { z } from "zod"; -const LetterVariantSchema = z.record( - z.string(), - z.object({ - supplierId: z.string(), - specId: z.string(), - }), -); -export type LetterVariant = z.infer; - const EnvVarsSchema = z.object({ LETTERS_TABLE_NAME: z.string(), LETTER_TTL_HOURS: z.coerce.number().int(), - VARIANT_MAP: z.string().transform((str, _) => { - const parsed = JSON.parse(str); - return LetterVariantSchema.parse(parsed); - }), PINO_LOG_LEVEL: z.coerce.string().optional(), }); diff --git a/lambdas/upsert-letter/src/handler/__tests__/upsert-handler.test.ts b/lambdas/upsert-letter/src/handler/__tests__/upsert-handler.test.ts index a94942d0..6292de0f 100644 --- a/lambdas/upsert-letter/src/handler/__tests__/upsert-handler.test.ts +++ b/lambdas/upsert-letter/src/handler/__tests__/upsert-handler.test.ts @@ -1,4 +1,4 @@ -import { SNSMessage, SQSEvent, SQSRecord } from "aws-lambda"; +import { SQSEvent, SQSRecord } from "aws-lambda"; import pino from "pino"; import { LetterRepository } from "internal/datastore/src"; import { LetterRequestPreparedEventV2 } from "@nhsdigital/nhs-notify-event-schemas-letter-rendering"; @@ -36,63 +36,6 @@ function createSqsRecord(msgId: string, body: string): SQSRecord { }; } -type SupportedEvent = - | LetterRequestPreparedEventV2 - | LetterRequestPreparedEvent - | LetterEvent; - -function createEventBridgeNotification( - event: SupportedEvent, -): Partial { - return { - SignatureVersion: "", - Timestamp: "", - Signature: "", - SigningCertUrl: "", - MessageId: "", - Message: createEventBridgeEvent(event), - MessageAttributes: {}, - Type: "Notification", - UnsubscribeUrl: "", - TopicArn: "", - Subject: "", - Token: "", - }; -} - -function createNotification(event: SupportedEvent): Partial { - return { - SignatureVersion: "", - Timestamp: "", - Signature: "", - SigningCertUrl: "", - MessageId: "", - Message: JSON.stringify(event), - MessageAttributes: {}, - Type: "Notification", - UnsubscribeUrl: "", - TopicArn: "", - Subject: "", - Token: "", - }; -} - -function createEventBridgeEvent(event: SupportedEvent): string { - const now = new Date().toISOString(); - const eventBridgeEnvelope = { - version: "0", - id: "4f28e649-6832-18e8-7261-4b63e6dcd3b5", - "detail-type": event.type, - source: "custom.event", - account: "815490582396", - time: now, - region: "eu-west-2", - resources: [], - detail: event, - }; - return JSON.stringify(eventBridgeEnvelope); -} - function createPreparedV1Event( overrides: Partial = {}, ): LetterRequestPreparedEvent { @@ -132,6 +75,48 @@ function createPreparedV1Event( }; } +function createSupplierStatusChangeEventWithoutSupplier( + overrides: Partial = {}, +): LetterEvent { + const now = new Date().toISOString(); + + return $LetterEvent.parse({ + data: { + domainId: overrides.domainId ?? "f47ac10b-58cc-4372-a567-0e02b2c3d479", + groupId: "client_template", + origin: { + domain: "letter-rendering", + event: "f47ac10b-58cc-4372-a567-0e02b2c3d479", + source: "/data-plane/letter-rendering/prod/render-pdf", + subject: + "client/00f3b388-bbe9-41c9-9e76-052d37ee8988/letter-request/0o5Fs0EELR0fUjHjbCnEtdUwQe4_0o5Fs0EELR0fUjHjbCnEtdUwQe5", + }, + reasonCode: "R07", + reasonText: "No such address", + specificationId: "1y3q9v1zzzz", + billingRef: "1y3q9v1zzzz", + status: "RETURNED", + supplierId: "", + }, + datacontenttype: "application/json", + dataschema: + "https://notify.nhs.uk/cloudevents/schemas/supplier-api/letter.RETURNED.1.0.0.schema.json", + dataschemaversion: "1.0.0", + id: overrides.id ?? "23f1f09c-a555-4d9b-8405-0b33490bc920", + plane: "data", + recordedtime: now, + severitynumber: 2, + severitytext: "INFO", + source: "/data-plane/supplier-api/prod/update-status", + specversion: "1.0", + subject: + "letter-origin/letter-rendering/letter/f47ac10b-58cc-4372-a567-0e02b2c3d479", + time: now, + traceparent: "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01", + type: "uk.nhs.notify.supplier-api.letter.RETURNED.v1", + }); +} + function createPreparedV2Event( overrides: Partial = {}, ): LetterRequestPreparedEventV2 { @@ -186,6 +171,26 @@ function createSupplierStatusChangeEvent( }); } +// Mock aws-embedded-metrics +let mockMetrics: any; +jest.mock("aws-embedded-metrics", () => ({ + metricScope: ( + handler: (metrics: any) => (event: SQSEvent) => Promise, + ) => { + return async (event: SQSEvent) => { + mockMetrics = { + setNamespace: jest.fn(), + putDimensions: jest.fn(), + putMetric: jest.fn(), + }; + return handler(mockMetrics)(event); + }; + }, + Unit: { + Count: "Count", + }, +})); + describe("createUpsertLetterHandler", () => { const mockedDeps: jest.Mocked = { letterRepo: { @@ -196,12 +201,6 @@ describe("createUpsertLetterHandler", () => { env: { LETTERS_TABLE_NAME: "LETTERS_TABLE_NAME", LETTER_TTL_HOURS: 12_960, - VARIANT_MAP: { - lv1: { - supplierId: "supplier1", - specId: "spec1", - }, - }, } as EnvVars, } as Deps; @@ -210,14 +209,21 @@ describe("createUpsertLetterHandler", () => { }); test("processes all records successfully and returns no batch failures", async () => { + const v2message = { + letterEvent: createPreparedV2Event(), + supplierSpec: { supplierId: "supplier1", specId: "spec1" }, + }; + const v1message = { + letterEvent: createPreparedV1Event(), + supplierSpec: { supplierId: "supplier1", specId: "spec1" }, + }; + const evt: SQSEvent = createSQSEvent([ + createSqsRecord("msg1", JSON.stringify(v2message)), + createSqsRecord("msg2", JSON.stringify(v1message)), createSqsRecord( - "msg1", - JSON.stringify(createEventBridgeNotification(createPreparedV2Event())), - ), - createSqsRecord( - "msg2", - JSON.stringify(createNotification(createSupplierStatusChangeEvent())), + "msg3", + JSON.stringify(createSupplierStatusChangeEvent()), ), ]); @@ -231,83 +237,70 @@ describe("createUpsertLetterHandler", () => { if (!result) throw new Error("expected BatchResponse, got void"); expect(result.batchItemFailures).toHaveLength(0); - expect(mockedDeps.letterRepo.putLetter).toHaveBeenCalledTimes(1); + expect(mockedDeps.letterRepo.putLetter).toHaveBeenCalledTimes(2); expect(mockedDeps.letterRepo.updateLetterStatus).toHaveBeenCalledTimes(1); - - const firstArg = (mockedDeps.letterRepo.putLetter as jest.Mock).mock + const insertedV2Letter = (mockedDeps.letterRepo.putLetter as jest.Mock).mock .calls[0][0]; - expect(firstArg.id).toBe("letter1"); - expect(firstArg.eventId).toBe("7b9a03ca-342a-4150-b56b-989109c45613"); - expect(firstArg.supplierId).toBe("supplier1"); - expect(firstArg.specificationId).toBe("spec1"); - expect(firstArg.url).toBe("s3://letterDataBucket/letter1.pdf"); - expect(firstArg.status).toBe("PENDING"); - expect(firstArg.groupId).toBe("client1campaign1template1"); - expect(firstArg.source).toBe("/data-plane/letter-rendering/test"); - - const secondArg = (mockedDeps.letterRepo.updateLetterStatus as jest.Mock) - .mock.calls[0][0]; - expect(secondArg.id).toBe("f47ac10b-58cc-4372-a567-0e02b2c3d479"); - expect(secondArg.supplierId).toBe("supplier1"); - expect(secondArg.status).toBe("RETURNED"); - expect(secondArg.reasonCode).toBe("R07"); - expect(secondArg.reasonText).toBe("No such address"); + expect(insertedV2Letter.id).toBe("letter1"); + expect(insertedV2Letter.supplierId).toBe("supplier1"); + expect(insertedV2Letter.specificationId).toBe("spec1"); + expect(insertedV2Letter.billingRef).toBe("spec1"); + expect(insertedV2Letter.url).toBe("s3://letterDataBucket/letter1.pdf"); + expect(insertedV2Letter.status).toBe("PENDING"); + expect(insertedV2Letter.groupId).toBe("client1campaign1template1"); + expect(insertedV2Letter.source).toBe("/data-plane/letter-rendering/test"); + + const insertedV1Letter = (mockedDeps.letterRepo.putLetter as jest.Mock).mock + .calls[1][0]; + expect(insertedV1Letter.id).toBe("letter1"); + expect(insertedV1Letter.supplierId).toBe("supplier1"); + expect(insertedV1Letter.specificationId).toBe("spec1"); + expect(insertedV1Letter.billingRef).toBe("spec1"); + expect(insertedV1Letter.url).toBe("s3://letterDataBucket/letter1.pdf"); + expect(insertedV1Letter.status).toBe("PENDING"); + expect(insertedV1Letter.groupId).toBe("client1campaign1template1"); + expect(insertedV1Letter.source).toBe("/data-plane/letter-rendering/test"); + + const updatedLetter = ( + mockedDeps.letterRepo.updateLetterStatus as jest.Mock + ).mock.calls[0][0]; + expect(updatedLetter.id).toBe("f47ac10b-58cc-4372-a567-0e02b2c3d479"); + expect(updatedLetter.status).toBe("RETURNED"); + expect(updatedLetter.reasonCode).toBe("R07"); + expect(updatedLetter.reasonText).toBe("No such address"); + expect(updatedLetter.supplierId).toBe("supplier1"); + expect(mockMetrics.setNamespace).toHaveBeenCalledWith("upsertLetter"); + expect(mockMetrics.putDimensions).toHaveBeenCalledWith({ + Supplier: "supplier1", + }); + expect(mockMetrics.putMetric).toHaveBeenCalledWith( + "MessagesProcessed", + 3, + "Count", + ); }); - test("processes all v1 records successfully and returns no batch failures", async () => { + test("unknown supplier has metric emitted with 'unknown' supplier dimension", async () => { + const letterEvent = createSupplierStatusChangeEventWithoutSupplier(); + const evt: SQSEvent = createSQSEvent([ - createSqsRecord( - "msg1", - JSON.stringify(createNotification(createPreparedV1Event())), - ), - createSqsRecord( - "msg2", - JSON.stringify( - createNotification( - createPreparedV1Event({ - id: "7b9a03ca-342a-4150-b56b-989109c45614", - domainId: "letter2", - url: "s3://letterDataBucket/letter2.pdf", - }), - ), - ), - ), + createSqsRecord("unknown-supplier", JSON.stringify(letterEvent)), ]); - const result = await createUpsertLetterHandler(mockedDeps)( - evt, - {} as any, - {} as any, - ); - - expect(result).toBeDefined(); - if (!result) throw new Error("expected BatchResponse, got void"); - expect(result.batchItemFailures).toHaveLength(0); - - expect(mockedDeps.letterRepo.putLetter).toHaveBeenCalledTimes(2); + await createUpsertLetterHandler(mockedDeps)(evt, {} as any, {} as any); - const firstArg = (mockedDeps.letterRepo.putLetter as jest.Mock).mock - .calls[0][0]; - expect(firstArg.id).toBe("letter1"); - expect(firstArg.supplierId).toBe("supplier1"); - expect(firstArg.specificationId).toBe("spec1"); - expect(firstArg.url).toBe("s3://letterDataBucket/letter1.pdf"); - expect(firstArg.status).toBe("PENDING"); - expect(firstArg.groupId).toBe("client1campaign1template1"); - expect(firstArg.source).toBe("/data-plane/letter-rendering/test"); - - const secondArg = (mockedDeps.letterRepo.putLetter as jest.Mock).mock - .calls[1][0]; - expect(secondArg.id).toBe("letter2"); - expect(secondArg.supplierId).toBe("supplier1"); - expect(secondArg.specificationId).toBe("spec1"); - expect(secondArg.url).toBe("s3://letterDataBucket/letter2.pdf"); - expect(secondArg.status).toBe("PENDING"); - expect(secondArg.groupId).toBe("client1campaign1template1"); - expect(firstArg.source).toBe("/data-plane/letter-rendering/test"); + expect(mockMetrics.setNamespace).toHaveBeenCalledWith("upsertLetter"); + expect(mockMetrics.putDimensions).toHaveBeenCalledWith({ + Supplier: "unknown", + }); + expect(mockMetrics.putMetric).toHaveBeenCalledWith( + "MessagesProcessed", + 1, + "Count", + ); }); - test("invalid JSON body produces batch failure and logs error", async () => { + test("invalid JSON produces batch failure and logs error", async () => { const evt: SQSEvent = createSQSEvent([ createSqsRecord("bad-json", "this-is-not-json"), ]); @@ -330,14 +323,24 @@ describe("createUpsertLetterHandler", () => { }), ); expect(mockedDeps.letterRepo.putLetter).not.toHaveBeenCalled(); + expect(mockMetrics.setNamespace).toHaveBeenCalledWith("upsertLetter"); + expect(mockMetrics.putDimensions).toHaveBeenCalledWith({ + Supplier: "unknown", + }); + expect(mockMetrics.putMetric).toHaveBeenCalledWith( + "MessageFailed", + 1, + "Count", + ); }); - test("invalid notification schema produces batch failure and logs error", async () => { + test("invalid event type produces batch failure and logs error", async () => { + const message = { + letterEvent: { type: "unexpected type" }, + }; + const evt: SQSEvent = createSQSEvent([ - createSqsRecord( - "bad-notification-schema", - JSON.stringify({ not: "unexpected notification shape" }), - ), + createSqsRecord("bad-event-type", JSON.stringify(message)), ]); const result = await createUpsertLetterHandler(mockedDeps)( @@ -349,28 +352,24 @@ describe("createUpsertLetterHandler", () => { expect(result).toBeDefined(); if (!result) throw new Error("expected BatchResponse, got void"); expect(result.batchItemFailures).toHaveLength(1); - expect(result.batchItemFailures[0].itemIdentifier).toBe( - "bad-notification-schema", - ); - + expect(result.batchItemFailures[0].itemIdentifier).toBe("bad-event-type"); + expect(mockedDeps.letterRepo.putLetter).not.toHaveBeenCalled(); + expect(mockedDeps.letterRepo.updateLetterStatus).not.toHaveBeenCalled(); expect((mockedDeps.logger.error as jest.Mock).mock.calls[0][0]).toEqual( expect.objectContaining({ description: "Error processing upsert of record", - messageId: "bad-notification-schema", + messageId: "bad-event-type", }), ); - expect(mockedDeps.letterRepo.putLetter).not.toHaveBeenCalled(); }); test("no event type produces batch failure and logs error", async () => { + const message = { + letterEvent: { no: "type" }, + }; + const evt: SQSEvent = createSQSEvent([ - createSqsRecord( - "bad-event-type", - JSON.stringify({ - Type: "Notification", - Message: JSON.stringify({ no: "type" }), - }), - ), + createSqsRecord("bad-event-type", JSON.stringify(message)), ]); const result = await createUpsertLetterHandler(mockedDeps)( @@ -393,15 +392,13 @@ describe("createUpsertLetterHandler", () => { ); }); - test("invalid event type produces batch failure and logs error", async () => { + test("invalid event produces batch failure and logs error", async () => { + const message = { + letterEvent: { someField: "invalid" }, + supplierSpec: { supplierId: "supplier1", specId: "spec1" }, + }; const evt: SQSEvent = createSQSEvent([ - createSqsRecord( - "bad-event-type", - JSON.stringify({ - Type: "Notification", - Message: JSON.stringify({ type: "unexpected type" }), - }), - ), + createSqsRecord("bad-event", JSON.stringify(message)), ]); const result = await createUpsertLetterHandler(mockedDeps)( @@ -413,29 +410,36 @@ describe("createUpsertLetterHandler", () => { expect(result).toBeDefined(); if (!result) throw new Error("expected BatchResponse, got void"); expect(result.batchItemFailures).toHaveLength(1); - expect(result.batchItemFailures[0].itemIdentifier).toBe("bad-event-type"); + expect(result.batchItemFailures[0].itemIdentifier).toBe("bad-event"); expect(mockedDeps.letterRepo.putLetter).not.toHaveBeenCalled(); expect(mockedDeps.letterRepo.updateLetterStatus).not.toHaveBeenCalled(); expect((mockedDeps.logger.error as jest.Mock).mock.calls[0][0]).toEqual( expect.objectContaining({ description: "Error processing upsert of record", - messageId: "bad-event-type", + messageId: "bad-event", }), ); + expect(mockMetrics.setNamespace).toHaveBeenCalledWith("upsertLetter"); + expect(mockMetrics.putDimensions).toHaveBeenCalledWith({ + Supplier: "unknown", + }); + expect(mockMetrics.putMetric).toHaveBeenCalledWith( + "MessageFailed", + 1, + "Count", + ); }); test("valid event type and invalid schema produces batch failure and logs error", async () => { + const message = { + letterEvent: { + type: "uk.nhs.notify.letter-rendering.letter-request.prepared", + some: "unexpected shape", + }, + }; + const evt: SQSEvent = createSQSEvent([ - createSqsRecord( - "bad-event-schema", - JSON.stringify({ - Type: "Notification", - Message: JSON.stringify({ - type: "uk.nhs.notify.letter-rendering.letter-request.prepared", - some: "unexpected shape", - }), - }), - ), + createSqsRecord("bad-event-schema", JSON.stringify(message)), ]); const result = await createUpsertLetterHandler(mockedDeps)( @@ -463,29 +467,23 @@ describe("createUpsertLetterHandler", () => { .mockResolvedValueOnce({}) .mockRejectedValueOnce(new Error("ddb error")); + const message1 = { + letterEvent: createPreparedV2Event({ + id: "7b9a03ca-342a-4150-b56b-989109c45615", + domainId: "ok", + }), + supplierSpec: { supplierId: "supplier1", specId: "spec1" }, + }; + const message2 = { + letterEvent: createPreparedV2Event({ + id: "7b9a03ca-342a-4150-b56b-989109c45616", + domainId: "fail", + }), + supplierSpec: { supplierId: "supplier1", specId: "spec1" }, + }; const evt: SQSEvent = createSQSEvent([ - createSqsRecord( - "ok-msg", - JSON.stringify( - createNotification( - createPreparedV2Event({ - id: "7b9a03ca-342a-4150-b56b-989109c45615", - domainId: "ok", - }), - ), - ), - ), - createSqsRecord( - "fail-msg", - JSON.stringify( - createNotification( - createPreparedV2Event({ - id: "7b9a03ca-342a-4150-b56b-989109c45616", - domainId: "fail", - }), - ), - ), - ), + createSqsRecord("ok-msg", JSON.stringify(message1)), + createSqsRecord("fail-msg", JSON.stringify(message2)), ]); const result = await createUpsertLetterHandler(mockedDeps)( diff --git a/lambdas/upsert-letter/src/handler/upsert-handler.ts b/lambdas/upsert-letter/src/handler/upsert-handler.ts index 67893ff2..ada416ec 100644 --- a/lambdas/upsert-letter/src/handler/upsert-handler.ts +++ b/lambdas/upsert-letter/src/handler/upsert-handler.ts @@ -1,10 +1,4 @@ -import { - SNSMessage, - SQSBatchItemFailure, - SQSEvent, - SQSHandler, - SQSRecord, -} from "aws-lambda"; +import { SQSBatchItemFailure, SQSEvent, SQSHandler } from "aws-lambda"; import { InsertLetter, UpdateLetter } from "@internal/datastore"; import { $LetterRequestPreparedEvent, @@ -24,26 +18,46 @@ import { Deps } from "../config/deps"; type SupplierSpec = { supplierId: string; specId: string }; type PreparedEvents = LetterRequestPreparedEventV2 | LetterRequestPreparedEvent; + +const SupplierSpecSchema = z.object({ + supplierId: z.string().min(1), + specId: z.string().min(1), +}); + +const PreparedEventUnionSchema = z.discriminatedUnion("type", [ + $LetterRequestPreparedEventV2, + $LetterRequestPreparedEvent, +]); + +const QueueMessageSchema = z.union([ + $LetterEvent, + z.object({ + letterEvent: PreparedEventUnionSchema, + supplierSpec: SupplierSpecSchema, + }), +]); + +type QueueMessage = z.infer; + type UpsertOperation = { name: "Insert" | "Update"; schemas: z.ZodSchema[]; - handler: (request: unknown, deps: Deps) => Promise; + handler: ( + request: unknown, + supplierSpec: SupplierSpec, + deps: Deps, + ) => Promise; }; -// small envelope that must exist in all inputs -const TypeEnvelope = z.object({ type: z.string().min(1) }); - function getOperationFromType(type: string): UpsertOperation { - if (type.startsWith("uk.nhs.notify.letter-rendering.letter-request.prepared")) + if ( + type.startsWith("uk.nhs.notify.letter-rendering.letter-request.prepared") + ) { return { name: "Insert", schemas: [$LetterRequestPreparedEventV2, $LetterRequestPreparedEvent], - handler: async (request, deps) => { + handler: async (request, supplierSpec, deps) => { const preparedRequest = request as PreparedEvents; - const supplierSpec: SupplierSpec = resolveSupplierForVariant( - preparedRequest.data.letterVariantId, - deps, - ); const letterToInsert: InsertLetter = mapToInsertLetter( preparedRequest, supplierSpec.supplierId, @@ -60,24 +74,24 @@ function getOperationFromType(type: string): UpsertOperation { }); }, }; - if (type.startsWith("uk.nhs.notify.supplier-api.letter")) - return { - name: "Update", - schemas: [$LetterEvent], - handler: async (request, deps) => { - const supplierEvent = request as LetterEvent; - const letterToUpdate: UpdateLetter = mapToUpdateLetter(supplierEvent); - await deps.letterRepo.updateLetterStatus(letterToUpdate); + } + // if it's not an insert type, it must be an update as we've already parsed the message, but we want to have a separate operation for better logging and metrics + return { + name: "Update", + schemas: [$LetterEvent], + handler: async (request, supplierSpec, deps) => { + const supplierEvent = request as LetterEvent; + const letterToUpdate: UpdateLetter = mapToUpdateLetter(supplierEvent); + await deps.letterRepo.updateLetterStatus(letterToUpdate); - deps.logger.info({ - description: "Updated letter", - eventId: supplierEvent.id, - letterId: letterToUpdate.id, - supplierId: letterToUpdate.supplierId, - }); - }, - }; - throw new Error(`Unknown operation from type=${type}`); + deps.logger.info({ + description: "Updated letter", + eventId: supplierEvent.id, + letterId: letterToUpdate.id, + supplierId: letterToUpdate.supplierId, + }); + }, + }; } function mapToInsertLetter( @@ -117,56 +131,19 @@ function mapToUpdateLetter(upsertRequest: LetterEvent): UpdateLetter { }; } -function resolveSupplierForVariant( - variantId: string, - deps: Deps, -): SupplierSpec { - return deps.env.VARIANT_MAP[variantId]; -} - -function parseSNSNotification(record: SQSRecord) { - const notification = JSON.parse(record.body) as Partial; - if ( - notification.Type !== "Notification" || - typeof notification.Message !== "string" - ) { - throw new Error( - "SQS record does not contain SNS Notification with string Message", - ); - } - return notification.Message; -} - -function removeEventBridgeWrapper(event: any) { - const maybeEventBridge = event; - if (maybeEventBridge.source && maybeEventBridge.detail) { - return maybeEventBridge.detail; - } - return event; -} - -function getType(event: unknown) { - const env = TypeEnvelope.safeParse(event); - if (!env.success) { - throw new Error("Missing or invalid envelope.type field"); - } - return env.data.type; -} - async function runUpsert( operation: UpsertOperation, letterEvent: unknown, + supplierSpec: SupplierSpec, deps: Deps, ) { for (const schema of operation.schemas) { const r = schema.safeParse(letterEvent); if (r.success) { - await operation.handler(r.data, deps); + await operation.handler(r.data, supplierSpec, deps); return; } } - // none matched - throw new Error("No matching schema for received message"); } async function emitMetrics( @@ -191,13 +168,26 @@ async function emitMetrics( } } -function getSupplierId(snsEvent: any): string { - if (snsEvent && snsEvent.data && snsEvent.data.supplierId) { - return snsEvent.data.supplierId; +function getSupplierIdFromEvent(letterEvent: any): string { + if (letterEvent && letterEvent.data && letterEvent.data.supplierId) { + return letterEvent.data.supplierId; } return "unknown"; } +function parseQueueMessage(queueMessage: string): QueueMessage { + const result = QueueMessageSchema.safeParse(queueMessage); + + if (!result.success) { + throw new Error( + `Message did not match an expected schema: ${JSON.stringify( + result.error.issues, + )}`, + ); + } + return result.data; +} + export default function createUpsertLetterHandler(deps: Deps): SQSHandler { return metricScope((metrics: MetricsLogger) => { return async (event: SQSEvent) => { @@ -208,20 +198,46 @@ export default function createUpsertLetterHandler(deps: Deps): SQSHandler { const tasks = event.Records.map(async (record) => { let supplier = "unknown"; try { - const message: string = parseSNSNotification(record); - const snsEvent = JSON.parse(message); - supplier = getSupplierId(snsEvent); - const letterEvent: unknown = removeEventBridgeWrapper(snsEvent); + deps.logger.info({ + description: "Processing record", + messageId: record.messageId, + message: record.body, + }); + const sqsMessage = JSON.parse(record.body); + + const queueMessage: QueueMessage = parseQueueMessage(sqsMessage); + + let letterEvent: LetterEvent | PreparedEvents; + let supplierSpec: SupplierSpec | undefined; + + if ("letterEvent" in queueMessage) { + letterEvent = queueMessage.letterEvent; + supplierSpec = queueMessage.supplierSpec; + } else { + letterEvent = queueMessage; + supplierSpec = undefined; + } deps.logger.info({ description: "Extracted letter event", messageId: record.messageId, + type: letterEvent.type, + supplier: supplierSpec, }); - const type = getType(letterEvent); - const operation = getOperationFromType(type); + supplier = + !supplierSpec || !supplierSpec.supplierId + ? getSupplierIdFromEvent(letterEvent) + : supplierSpec.supplierId; + + const operation = getOperationFromType(letterEvent.type); - await runUpsert(operation, letterEvent, deps); + await runUpsert( + operation, + letterEvent, + supplierSpec ?? { supplierId: "unknown", specId: "unknown" }, + deps, + ); perSupplierSuccess.set( supplier, diff --git a/package-lock.json b/package-lock.json index 535b003c..15679de3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -490,6 +490,102 @@ "@esbuild/win32-x64": "0.27.3" } }, + "lambdas/supplier-allocator": { + "name": "nhs-notify-supplier-api-allocate-letter", + "version": "0.0.1", + "dependencies": { + "@aws-sdk/client-dynamodb": "^3.858.0", + "@aws-sdk/client-sqs": "^3.984.0", + "@aws-sdk/lib-dynamodb": "^3.858.0", + "@internal/datastore": "*", + "@internal/helpers": "^0.1.0", + "@nhsdigital/nhs-notify-event-schemas-letter-rendering": "^2.0.1", + "@nhsdigital/nhs-notify-event-schemas-letter-rendering-v1": "npm:@nhsdigital/nhs-notify-event-schemas-letter-rendering@^1.1.5", + "@nhsdigital/nhs-notify-event-schemas-supplier-api": "^1.0.8", + "@types/aws-lambda": "^8.10.148", + "aws-lambda": "^1.0.7", + "esbuild": "^0.27.2", + "pino": "^9.7.0", + "zod": "^4.1.11" + }, + "devDependencies": { + "@tsconfig/node22": "^22.0.2", + "@types/aws-lambda": "^8.10.148", + "@types/jest": "^30.0.0", + "jest": "^30.2.0", + "jest-mock-extended": "^4.0.0", + "ts-jest": "^29.4.0", + "typescript": "^5.8.3" + } + }, + "lambdas/supplier-allocator/node_modules/aws-lambda": { + "version": "1.0.7", + "resolved": "https://registry.npmjs.org/aws-lambda/-/aws-lambda-1.0.7.tgz", + "integrity": "sha512-9GNFMRrEMG5y3Jvv+V4azWvc+qNWdWLTjDdhf/zgMlz8haaaLWv0xeAIWxz9PuWUBawsVxy0zZotjCdR3Xq+2w==", + "license": "MIT", + "dependencies": { + "aws-sdk": "^2.814.0", + "commander": "^3.0.2", + "js-yaml": "^3.14.1", + "watchpack": "^2.0.0-beta.10" + }, + "bin": { + "lambda": "bin/lambda" + } + }, + "lambdas/supplier-allocator/node_modules/js-yaml": { + "version": "3.14.2", + "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.14.2.tgz", + "integrity": "sha512-PMSmkqxr106Xa156c2M265Z+FTrPl+oxd/rgOQy2tijQeK5TxQ43psO1ZCwhVOSdnn+RzkzlRz/eY4BgJBYVpg==", + "license": "MIT", + "dependencies": { + "argparse": "^1.0.7", + "esprima": "^4.0.0" + }, + "bin": { + "js-yaml": "bin/js-yaml.js" + } + }, + "lambdas/supplier-allocator/node_modules/pino": { + "version": "9.14.0", + "resolved": "https://registry.npmjs.org/pino/-/pino-9.14.0.tgz", + "integrity": "sha512-8OEwKp5juEvb/MjpIc4hjqfgCNysrS94RIOMXYvpYCdm/jglrKEiAYmiumbmGhCvs+IcInsphYDFwqrjr7398w==", + "license": "MIT", + "dependencies": { + "@pinojs/redact": "^0.4.0", + "atomic-sleep": "^1.0.0", + "on-exit-leak-free": "^2.1.0", + "pino-abstract-transport": "^2.0.0", + "pino-std-serializers": "^7.0.0", + "process-warning": "^5.0.0", + "quick-format-unescaped": "^4.0.3", + "real-require": "^0.2.0", + "safe-stable-stringify": "^2.3.1", + "sonic-boom": "^4.0.1", + "thread-stream": "^3.0.0" + }, + "bin": { + "pino": "bin.js" + } + }, + "lambdas/supplier-allocator/node_modules/pino-abstract-transport": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/pino-abstract-transport/-/pino-abstract-transport-2.0.0.tgz", + "integrity": "sha512-F63x5tizV6WCh4R6RHyi2Ml+M70DNRXt/+HANowMflpgGFMAym/VKm6G7ZOQRjqN7XbGxK1Lg9t6ZrtzOaivMw==", + "license": "MIT", + "dependencies": { + "split2": "^4.0.0" + } + }, + "lambdas/supplier-allocator/node_modules/thread-stream": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/thread-stream/-/thread-stream-3.1.0.tgz", + "integrity": "sha512-OqyPZ9u96VohAyMfJykzmivOrY2wfMSf3C5TtFJVgN+Hm6aj+voFhlK+kZEIv2FBh1X6Xp3DlnCOfEQ3B2J86A==", + "license": "MIT", + "dependencies": { + "real-require": "^0.2.0" + } + }, "lambdas/upsert-letter": { "name": "nhs-notify-supplier-api-upsert-letter", "version": "0.0.1", @@ -16632,6 +16728,10 @@ "resolved": "docs", "link": true }, + "node_modules/nhs-notify-supplier-api-allocate-letter": { + "resolved": "lambdas/supplier-allocator", + "link": true + }, "node_modules/nhs-notify-supplier-api-handler": { "resolved": "lambdas/api-handler", "link": true